How to use SAS tokens for Azure Event hubs (Java)

Introduction

Hello, My name is Kurtosis. I am doing SRE.

Why i wrote

I'm using Azure Eventhubs and I can't find any documentation at all, so I'm stuck. This article is intended for people who already have a Java environment. (maven, gradle, etc.)

Main subject

Thing you want to do

Send data from android app to Azure Event hubs

I want to access Eventhubs using the Shared Access Signature. 1 However, if you have SAS, you will be able to access Eventhubs semi-permanently. Placing it on the client side is uneasy from a security point of view. When I was looking for a good way, I got information that Token can be generated from Sas. So use SasToken to send data to Eventhubs. Since there was a clogged part, I will write it as a memorandum.

Try

  1. Create a SasToken
  2. Send a message to Eventhubs using SasToken
  3. Check if the message has arrived properly
1. Create a SasToken

Azure portal → Eventhubs namespace → Eventhubs instance → Shared access policy → Shared access policy name → Primary key & connection string – Primary key Is used. Connection string – of the primary key Endpoint = sb: // (Eventhubs name) .servicebus.windows.net /; SharedAccessKeyName = .......; EntityPath = (entry point) This is transformed and used for the following [RESOURCEURI]

スクリーンショット 2020-01-24 9.55.05.png

Variables to use


[RESOURCEURI]=sb://(Event hubs name).servicebus.windows.net/(Entry point)
[KEYNAME]=Shared access policy name
[KEY]=Primary key for shared access policy name

SASToken.java



import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SASToken
{
    public static void main(String[] args) {
        long epoch = System.currentTimeMillis() / 1000L;
        int week = 60 * 60 * 24 * 7;
        String resourceUri ="[RESOURCEURI]";
        String keyName = "[KEYNAME]";
        String key = "[KEY]";
        String expiry = Long.toString(epoch + week);
        String sasToken = null;
        try {
            String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
            String signature = getHMAC256(key, stringToSign);
            sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig=" +
                    URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println(sasToken);
    }

    //Function that returns a hash value
    public static String getHMAC256 (String key, String input){
        Mac sha256_HMAC = null;
        String hash = null;
        try {
            sha256_HMAC = Mac.getInstance("HmacSHA256");
            SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
            sha256_HMAC.init(secret_key);
            Base64.Encoder encoder = Base64.getEncoder();
            hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));

        } catch (InvalidKeyException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return hash;
    }
}

Doing this will give you a sasToken.

2. Send a message to Eventhubs using SasToken

Variables to use


[SasToken obtained in 1]=Results above
[NAMESPACENAME]=Event hub namespace
[EVENTHUBNAME]=The name of the event hub instance

SendToEventhubs.java



import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


public class SendToEventhubs {
    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException {

        String sas = "[SasToken obtained in 1]";
//Set credentials
        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("[NAMESPACENAME]")
                .setEventHubName("[EVENTHUBNAME]")
                .setSharedAccessSignature(sas);
        final Gson gson = new GsonBuilder().create();
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
//Describe the event hub client
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);

        try {
            for (int i = 0; i < 1; i++) {

                String payload = "Hello, Eventhubs !! (Put a message here)";
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());

                EventData sendEvent = EventData.create(payloadBytes);
                System.out.println(sendEvent);
//Send to event hub
                ehClient.sendSync(sendEvent);
            }
            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }
    }
}

This completes sending the event. However, since this cannot be confirmed, I will also include a method for receiving it.

3. Check if the message has arrived properly

Variables to use


        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]"; //Portal → Storage account → Access key
        String storageContainerName = "[STORAGECONTAINERNAME]"; //Portal → Storage account → Container
        String hostNamePrefix = "";

EventProcessorSample.java


import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

public class EventProcessorSample
{
    public static void main(String args[]) throws InterruptedException, ExecutionException
    {
        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]";
        String storageContainerName = "[STORAGECONTAINERNAME]";
        String hostNamePrefix = "";


        ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey);

        EventProcessorHost host = new EventProcessorHost(
                EventProcessorHost.createHostName(hostNamePrefix),
                eventHubName,
                consumerGroupName,
                eventHubConnectionString.toString(),
                storageConnectionString,
                storageContainerName);

        System.out.println("Registering host named " + host.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());

        host.registerEventProcessor(EventProcessor.class, options)
                .whenComplete((unused, e) ->
                {
                    if (e != null)
                    {
                        System.out.println("Failure while registering: " + e.toString());
                        if (e.getCause() != null)
                        {
                            System.out.println("Inner exception: " + e.getCause().toString());
                        }
                    }
                })
                .thenAccept((unused) ->
                {
                    System.out.println("Press enter to stop.");
                    try
                    {
                        System.in.read();
                    }
                    catch (Exception e)
                    {
                        System.out.println("Keyboard read failed: " + e.toString());
                    }
                })
                .thenCompose((unused) ->
                {
                    return host.unregisterEventProcessor();
                })
                .exceptionally((e) ->
                {
                    System.out.println("Failure while unregistering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                    return null;
                })
                .get(); // Wait for everything to finish before exiting main!

        System.out.println("End of sample");
    }

    // The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
    // as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
    // was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
    public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;

        // OnOpen is called when a new event processor instance is created by the host.
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }

        // OnClose is called when an event processor instance is being shut down.
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }

        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }

        // onEvents is called when events are received on this partition of the Event Hub.
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;

                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one.
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                                data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
}

By executing this code, you can receive the data sent to Eventhubs in real time.

Where i got stuck

--I didn't know what ResourceUri was pointing to when creating the SASToken. --I didn't know where the SasToken was. Is it from sr, after = or the whole? (SharedAccessSignature sr=...) --What information do you need for the Sas token and other information when sending a message?

It took a lot of time and effort because it was necessary to make a trial error with each combination of these. I hope it helps everyone who read this article.

References

Send and receive events to and from Azure Event Hubs using Java (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send ) Authenticate access to Event Hubs resources using Shared Access Signature (SAS) (https://docs.microsoft.com/en-us/azure/event-hubs/authenticate-shared-access-signature)

Finally

Thank you for reading to the end! If you have any questions, we welcome your comments.

Recommended Posts

How to use SAS tokens for Azure Event hubs (Java)
[Java] How to use Map
How to use java Optional
How to use java class
[Java] How to use Optional ②
[Java] How to use removeAll ()
[Java] How to use string.format
How to use Java Map
How to use Java variables
[Java] How to use Optional ①
How to use Truth (assertion library for Java / Android)
How to use Java HttpClient (Get)
[Must-see for apprentice java engineer] How to use Stream API
How to use Java HttpClient (Post)
[Java] How to use join method
[Processing × Java] How to use variables
[Java] How to use LinkedHashMap class
[JavaFX] [Java8] How to use GridPane
How to use class methods [Java]
[Java] How to use List [ArrayList]
How to use classes in Java?
[Processing × Java] How to use arrays
How to use Java lambda expressions
[Java] How to use Math class
How to use Java enum type
Multilingual Locale in Java How to use Locale
[Java] How to use the File class
[Java] How to use the hasNext function
How to use submit method (Java Silver)
[Java] How to use the HashMap class
[Easy-to-understand explanation! ] How to use Java instance
[Java] How to use the toString () method
Studying how to use the constructor (java)
[Processing × Java] How to use the loop
How to use Java classes, definitions, import
[Ruby] How to use slice for beginners
[Easy-to-understand explanation! ] How to use Java polymorphism
[Java] [Maven3] Summary of how to use Maven3
[Processing × Java] How to use the class
How to use Java Scanner class (Note)
[Processing × Java] How to use the function
[Easy-to-understand explanation! ] How to use ArrayList [Java]
[Java] How to use the Calendar class
[Java] Learn how to use Optional correctly
[Easy-to-understand explanation! ] How to use Java overload
try-catch-finally exception handling How to use java
[Easy-to-understand explanation! ] How to use Java encapsulation
[Java] How to test for null with JUnit
[Java] How to use FileReader class and BufferedReader class
[Java] How to use Thread.sleep to pause the program
How to use an array for HashMap keys
How to use Java framework with AWS Lambda! ??
How to use Java API with lambda expression
[Java] (for MacOS) How to set the classpath
[Easy-to-understand explanation! ] How to use Java inheritance [Override explanation]
How to use the replace () method (Java Silver)
[Rails] How to use Gem'rails-i18n' for Japanese support
How to use Alibaba Cloud LOG Java Producer
How to use nginx-ingress-controller with Docker for Mac
[For super beginners] How to use autofocus: true
[Java] How to make multiple for loops single