Verwendung von SAS-Token für Azure Event Hubs (Java)

Einführung

Hallo, ich heiße Kurtosis. Ich mache SRE.

Warum ich geschrieben habe

Ich habe Azure Eventhubs verwendet und konnte überhaupt keine Dokumentation finden. Daher steckte ich fest. Dieser Artikel richtet sich an Personen, die bereits über eine Java-Umgebung verfügen. (Maven, Gradle usw.)

Hauptthema

Was du machen willst

Senden Sie Daten von der Android-App an Azure Event-Hubs

Ich möchte mit der Shared Access-Signatur auf Eventhubs zugreifen. 1 Wenn Sie jedoch über SAS verfügen, können Sie semi-permanent auf Eventhubs zugreifen. Die Platzierung auf der Client-Seite ist aus Sicherheitsgründen ungewiss. Als ich nach einem guten Weg suchte, bekam ich Informationen, dass Token aus Sas generiert werden kann. Verwenden Sie also SasToken, um Daten an Eventhubs zu senden. Da es einen verstopften Teil gab, werde ich ihn als Memorandum schreiben.

Versuchen

  1. Erstellen Sie ein SasToken
  2. Senden Sie mit SasToken eine Nachricht an Eventhubs
  3. Überprüfen Sie, ob die Nachricht richtig angekommen ist
1. Erstellen Sie ein SasToken

Azure-Portal → Eventhubs-Namespace → Eventhubs-Instanz → Richtlinie für gemeinsamen Zugriff → Name der Richtlinie für gemeinsamen Zugriff → Primärschlüssel & Verbindungszeichenfolge - Primärschlüssel Wird genutzt. Verbindungszeichenfolge - Primärschlüssel Endpoint = sb: // (Name des Eventhubs) .servicebus.windows.net /; SharedAccessKeyName = .......; EntityPath = (Einstiegspunkt) Transformiere dies und benutze es für das folgende [RESOURCEURI]

スクリーンショット 2020-01-24 9.55.05.png

Zu verwendende Variablen


[RESOURCEURI]=sb://(Name des Ereignis-Hubs).servicebus.windows.net/(Einstiegspunkt)
[KEYNAME]=Name der Richtlinie für freigegebenen Zugriff
[KEY]=Primärschlüssel für den Namen der Richtlinie für den gemeinsamen Zugriff

SASToken.java



import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SASToken
{
    public static void main(String[] args) {
        long epoch = System.currentTimeMillis() / 1000L;
        int week = 60 * 60 * 24 * 7;
        String resourceUri ="[RESOURCEURI]";
        String keyName = "[KEYNAME]";
        String key = "[KEY]";
        String expiry = Long.toString(epoch + week);
        String sasToken = null;
        try {
            String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
            String signature = getHMAC256(key, stringToSign);
            sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig=" +
                    URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println(sasToken);
    }

    //Funktion, die einen Hashwert zurückgibt
    public static String getHMAC256 (String key, String input){
        Mac sha256_HMAC = null;
        String hash = null;
        try {
            sha256_HMAC = Mac.getInstance("HmacSHA256");
            SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
            sha256_HMAC.init(secret_key);
            Base64.Encoder encoder = Base64.getEncoder();
            hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));

        } catch (InvalidKeyException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return hash;
    }
}

Wenn Sie dies tun, erhalten Sie ein sasToken.

2. Senden Sie mit SasToken eine Nachricht an Eventhubs

Zu verwendende Variablen


[SasToken erhalten in 1]=Ergebnisse oben
[NAMESPACENAME]=Event Hub-Namespace
[EVENTHUBNAME]=Der Name der Event Hub-Instanz

SendToEventhubs.java



import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


public class SendToEventhubs {
    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException {

        String sas = "[SasToken erhalten in 1]";
//Anmeldeinformationen festlegen
        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("[NAMESPACENAME]")
                .setEventHubName("[EVENTHUBNAME]")
                .setSharedAccessSignature(sas);
        final Gson gson = new GsonBuilder().create();
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
//Beschreiben des Event Hub-Clients
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);

        try {
            for (int i = 0; i < 1; i++) {

                String payload = "Hello, Eventhubs !! (Geben Sie hier eine Nachricht ein)";
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());

                EventData sendEvent = EventData.create(payloadBytes);
                System.out.println(sendEvent);
//An Event Hub senden
                ehClient.sendSync(sendEvent);
            }
            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }
    }
}

Damit ist das Senden des Ereignisses abgeschlossen. Da dies jedoch nicht bestätigt werden kann, werde ich auch eine Methode zum Empfangen einschließen.

3. Überprüfen Sie, ob die Nachricht richtig angekommen ist

Zu verwendende Variablen


        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]"; //Portal → Speicherkonto → Zugriffsschlüssel
        String storageContainerName = "[STORAGECONTAINERNAME]"; //Portal → Speicherkonto → Container
        String hostNamePrefix = "";

EventProcessorSample.java


import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

public class EventProcessorSample
{
    public static void main(String args[]) throws InterruptedException, ExecutionException
    {
        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]";
        String storageContainerName = "[STORAGECONTAINERNAME]";
        String hostNamePrefix = "";


        ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey);

        EventProcessorHost host = new EventProcessorHost(
                EventProcessorHost.createHostName(hostNamePrefix),
                eventHubName,
                consumerGroupName,
                eventHubConnectionString.toString(),
                storageConnectionString,
                storageContainerName);

        System.out.println("Registering host named " + host.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());

        host.registerEventProcessor(EventProcessor.class, options)
                .whenComplete((unused, e) ->
                {
                    if (e != null)
                    {
                        System.out.println("Failure while registering: " + e.toString());
                        if (e.getCause() != null)
                        {
                            System.out.println("Inner exception: " + e.getCause().toString());
                        }
                    }
                })
                .thenAccept((unused) ->
                {
                    System.out.println("Press enter to stop.");
                    try
                    {
                        System.in.read();
                    }
                    catch (Exception e)
                    {
                        System.out.println("Keyboard read failed: " + e.toString());
                    }
                })
                .thenCompose((unused) ->
                {
                    return host.unregisterEventProcessor();
                })
                .exceptionally((e) ->
                {
                    System.out.println("Failure while unregistering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                    return null;
                })
                .get(); // Wait for everything to finish before exiting main!

        System.out.println("End of sample");
    }

    // The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
    // as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
    // was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
    public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;

        // OnOpen is called when a new event processor instance is created by the host.
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }

        // OnClose is called when an event processor instance is being shut down.
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }

        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }

        // onEvents is called when events are received on this partition of the Event Hub.
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;

                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one.
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                                data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
}

Durch Ausführen dieses Codes können Sie die an Eventhubs gesendeten Daten in Echtzeit empfangen.

Wo ich feststeckte

Es hat Zeit und Mühe gekostet, da bei jeder Kombination dieser Elemente ein Testfehler gemacht werden musste. Ich hoffe, es hilft allen, die diesen Artikel lesen.

Verweise

Senden und Empfangen von Ereignissen an und von Azure Event Hubs mithilfe von Java (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send) ) Authentifizieren Sie den Zugriff auf Event Hubs-Ressourcen mithilfe der Shared Access Signature (SAS) (https://docs.microsoft.com/en-us/azure/event-hubs/authenticate-shared-access-signature).

Schließlich

Vielen Dank für das Lesen bis zum Ende! Wenn Sie Fragen haben, freuen wir uns über Ihre Kommentare.

Recommended Posts

Verwendung von SAS-Token für Azure Event Hubs (Java)
Verwendung von Java Optional
Verwendung der Java-Klasse
[Java] Verwendung von removeAll ()
Verwendung von Java Map
Verwendung von Java-Variablen
Verwendung von Truth (Assertion Library für Java / Android)
Verwendung von HttpClient (Get) von Java
[Ein Muss für einen Java-Ingenieurlehrling] Verwendung der Stream-API
Verwendung von HttpClient (Post) von Java
[Java] Verwendung der Join-Methode
[Verarbeitung × Java] Verwendung von Variablen
[JavaFX] [Java8] Verwendung von GridPane
Verwendung von Klassenmethoden [Java]
[Java] Verwendung von List [ArrayList]
Wie verwende ich Klassen in Java?
[Verarbeitung × Java] Verwendung von Arrays
Verwendung von Java-Lambda-Ausdrücken
[Java] Verwendung der Math-Klasse
Verwendung des Java-Aufzählungstyps
Mehrsprachige Unterstützung für Java Verwendung des Gebietsschemas
[Java] Verwendung der File-Klasse
Verwendung der Submit-Methode (Java Silver)
[Leicht verständliche Erklärung! ] Verwendung der Java-Instanz
[Java] Verwendung der toString () -Methode
Studieren der Verwendung des Konstruktors (Java)
[Verarbeitung × Java] Verwendung der Schleife
Verwendung und Definition von Java-Klassen, Importieren
[Ruby] Wie man Slice für Anfänger benutzt
[Leicht verständliche Erklärung! ] Verwendung des Java-Polymorphismus
[Java] [Maven3] Zusammenfassung der Verwendung von Maven3
[Verarbeitung × Java] Verwendung der Klasse
Verwendung der Java Scanner-Klasse (Hinweis)
[Verarbeitung × Java] Verwendung der Funktion
[Leicht verständliche Erklärung! ] Verwendung von ArrayList [Java]
[Java] Verwendung der Calendar-Klasse
[Java] Erfahren Sie, wie Sie Optional richtig verwenden
[Leicht verständliche Erklärung! ] Verwendung von Java-Überladung
try-catch-finally Ausnahmebehandlung Verwendung von Java
[Leicht verständliche Erklärung! ] Verwendung der Java-Kapselung
[Java] So testen Sie, ob es in JUnit null ist
[Java] Verwendung der FileReader-Klasse und der BufferedReader-Klasse
So verwenden Sie ein Array für HashMap-Schlüssel
Verwendung des Java-Frameworks mit AWS Lambda! ??
Verwendung der Java-API mit Lambda-Ausdrücken
[Java] (für MacOS) Methode zur Einstellung des Klassenpfads
[Leicht verständliche Erklärung! ] Verwendung der Java-Vererbung [Erklärung überschreiben]
Verwendung der replace () -Methode (Java Silver)
Verwendung des LOG Java Producer von Alibaba Cloud
[Für Super-Anfänger] Verwendung des Autofokus: true
[Java] Wie man mehrere for-Schleifen einzeln macht