Comment utiliser les jetons SAS pour les hubs d'événements Azure (Java)

introduction

Bonjour, je m'appelle Kurtosis. Je fais du SRE.

Pourquoi j'ai écrit

J'utilisais Azure Eventhubs et je ne trouvais aucune documentation du tout, donc j'étais coincé. Cet article est destiné aux personnes disposant déjà d'un environnement Java. (maven, gradle, etc.)

Sujet principal

Chose que tu veux faire

Envoyer des données depuis l'application Android vers les hubs d'événements Azure

Je souhaite accéder aux Eventhubs à l'aide de la signature d'accès partagé. 1 Cependant, si vous disposez de SAS, vous pourrez accéder à Eventhubs de manière semi-permanente. Le placer côté client est incertain du point de vue de la sécurité. Lorsque je cherchais un bon moyen, j'ai obtenu des informations selon lesquelles Token peut être généré à partir de Sas. Utilisez donc SasToken pour envoyer des données à Eventhubs. Puisqu'il y avait une partie obstruée, je vais l'écrire sous forme de mémorandum.

Essayer

  1. Créez un SasToken
  2. Envoyez un message à Eventhubs en utilisant SasToken
  3. Vérifiez si le message est bien arrivé
1. Créez un SasToken

Portail Azure → Espace de noms Eventhubs → Instance Eventhubs → Politique d'accès partagé → Nom de la politique d'accès partagé → Clé primaire et chaîne de connexion - Clé primaire Est utilisé. Chaîne de connexion - clé primaire Endpoint = sb: // (nom Eventhubs) .servicebus.windows.net /; SharedAccessKeyName = .......; EntityPath = (point d'entrée) Transformez-le et utilisez-le pour les [RESOURCEURI] suivantes

スクリーンショット 2020-01-24 9.55.05.png

Variables à utiliser


[RESOURCEURI]=sb://(Nom des hubs d'événements).servicebus.windows.net/(Point d'accès)
[KEYNAME]=Nom de la stratégie d'accès partagé
[KEY]=Clé primaire pour le nom de la stratégie d'accès partagé

SASToken.java



import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SASToken
{
    public static void main(String[] args) {
        long epoch = System.currentTimeMillis() / 1000L;
        int week = 60 * 60 * 24 * 7;
        String resourceUri ="[RESOURCEURI]";
        String keyName = "[KEYNAME]";
        String key = "[KEY]";
        String expiry = Long.toString(epoch + week);
        String sasToken = null;
        try {
            String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
            String signature = getHMAC256(key, stringToSign);
            sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig=" +
                    URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println(sasToken);
    }

    //Fonction qui renvoie une valeur de hachage
    public static String getHMAC256 (String key, String input){
        Mac sha256_HMAC = null;
        String hash = null;
        try {
            sha256_HMAC = Mac.getInstance("HmacSHA256");
            SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
            sha256_HMAC.init(secret_key);
            Base64.Encoder encoder = Base64.getEncoder();
            hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));

        } catch (InvalidKeyException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return hash;
    }
}

Cela vous donnera un sasToken.

2. Envoyez un message à Eventhubs en utilisant SasToken

Variables à utiliser


[SasToken obtenu en 1]=Résultats ci-dessus
[NAMESPACENAME]=Espace de noms Event Hub
[EVENTHUBNAME]=Le nom de l'instance du hub d'événements

SendToEventhubs.java



import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


public class SendToEventhubs {
    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException {

        String sas = "[SasToken obtenu en 1]";
//Définir les informations d'identification
        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("[NAMESPACENAME]")
                .setEventHubName("[EVENTHUBNAME]")
                .setSharedAccessSignature(sas);
        final Gson gson = new GsonBuilder().create();
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
//Décrire le client Event Hub
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);

        try {
            for (int i = 0; i < 1; i++) {

                String payload = "Hello, Eventhubs !! (Mettez un message ici)";
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());

                EventData sendEvent = EventData.create(payloadBytes);
                System.out.println(sendEvent);
//Envoyer au hub d'événements
                ehClient.sendSync(sendEvent);
            }
            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }
    }
}

Ceci termine l'envoi de l'événement. Cependant, comme cela ne peut pas être confirmé, je vais également inclure une méthode pour le recevoir.

3. Vérifiez si le message est bien arrivé

Variables à utiliser


        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]"; //Portail → Compte de stockage → Clé d'accès
        String storageContainerName = "[STORAGECONTAINERNAME]"; //Portail → Compte de stockage → Conteneur
        String hostNamePrefix = "";

EventProcessorSample.java


import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

public class EventProcessorSample
{
    public static void main(String args[]) throws InterruptedException, ExecutionException
    {
        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]";
        String storageContainerName = "[STORAGECONTAINERNAME]";
        String hostNamePrefix = "";


        ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey);

        EventProcessorHost host = new EventProcessorHost(
                EventProcessorHost.createHostName(hostNamePrefix),
                eventHubName,
                consumerGroupName,
                eventHubConnectionString.toString(),
                storageConnectionString,
                storageContainerName);

        System.out.println("Registering host named " + host.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());

        host.registerEventProcessor(EventProcessor.class, options)
                .whenComplete((unused, e) ->
                {
                    if (e != null)
                    {
                        System.out.println("Failure while registering: " + e.toString());
                        if (e.getCause() != null)
                        {
                            System.out.println("Inner exception: " + e.getCause().toString());
                        }
                    }
                })
                .thenAccept((unused) ->
                {
                    System.out.println("Press enter to stop.");
                    try
                    {
                        System.in.read();
                    }
                    catch (Exception e)
                    {
                        System.out.println("Keyboard read failed: " + e.toString());
                    }
                })
                .thenCompose((unused) ->
                {
                    return host.unregisterEventProcessor();
                })
                .exceptionally((e) ->
                {
                    System.out.println("Failure while unregistering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                    return null;
                })
                .get(); // Wait for everything to finish before exiting main!

        System.out.println("End of sample");
    }

    // The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
    // as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
    // was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
    public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;

        // OnOpen is called when a new event processor instance is created by the host.
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }

        // OnClose is called when an event processor instance is being shut down.
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }

        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }

        // onEvents is called when events are received on this partition of the Event Hub.
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;

                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one.
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                                data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
}

En exécutant ce code, vous pouvez recevoir les données envoyées à Eventhubs en temps réel.

Où je suis resté coincé

--Je ne savais pas sur quoi ResourceUri pointait lors de la création du SASToken.

Il a fallu du temps et des efforts car il était nécessaire de faire une erreur d'essai avec chaque combinaison de ceux-ci. J'espère que cela aidera tous ceux qui liront cet article.

Les références

Envoyez et recevez des événements vers et depuis Azure Event Hubs à l'aide de Java (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send ) Authentifiez l'accès aux ressources Event Hubs à l'aide de la signature d'accès partagé (SAS) (https://docs.microsoft.com/en-us/azure/event-hubs/authenticate-shared-access-signature)

finalement

Merci d'avoir lu jusqu'au bout! Si vous avez des questions, nous apprécions vos commentaires.

Recommended Posts

Comment utiliser les jetons SAS pour les hubs d'événements Azure (Java)
Comment utiliser java Facultatif
Comment utiliser la classe Java
[Java] Comment utiliser removeAll ()
Comment utiliser Java Map
Comment utiliser les variables Java
Comment utiliser Truth (bibliothèque d'assertions pour Java / Android)
Comment utiliser HttpClient de Java (Get)
[À voir absolument pour l'apprenti ingénieur Java] Comment utiliser l'API Stream
Comment utiliser HttpClient de Java (Post)
[Java] Comment utiliser la méthode de jointure
[Traitement × Java] Comment utiliser les variables
[JavaFX] [Java8] Comment utiliser GridPane
Comment utiliser les méthodes de classe [Java]
[Java] Comment utiliser List [ArrayList]
Comment utiliser les classes en Java?
[Traitement × Java] Comment utiliser les tableaux
Comment utiliser les expressions Java lambda
[Java] Comment utiliser la classe Math
Comment utiliser le type enum Java
Prise en charge multilingue de Java Comment utiliser les paramètres régionaux
[Java] Comment utiliser la classe File
Comment utiliser la méthode de soumission (Java Silver)
[Explication facile à comprendre! ] Comment utiliser l'instance Java
[Java] Comment utiliser la méthode toString ()
Etudier comment utiliser le constructeur (java)
[Traitement × Java] Comment utiliser la boucle
Comment utiliser et définir les classes Java, importer
[Ruby] Comment utiliser slice pour les débutants
[Explication facile à comprendre! ] Comment utiliser le polymorphisme Java
[Java] [Maven3] Résumé de l'utilisation de Maven3
[Traitement × Java] Comment utiliser la classe
Comment utiliser la classe Java Scanner (Remarque)
[Traitement × Java] Comment utiliser la fonction
[Explication facile à comprendre! ] Comment utiliser ArrayList [Java]
[Java] Comment utiliser la classe Calendar
[Java] Découvrez comment utiliser correctement Optional
[Explication facile à comprendre! ] Comment utiliser la surcharge Java
gestion des exceptions try-catch-finally Comment utiliser java
[Explication facile à comprendre! ] Comment utiliser l'encapsulation Java
[Java] Comment tester s'il est nul dans JUnit
[Java] Comment utiliser la classe FileReader et la classe BufferedReader
Comment utiliser un tableau pour les clés HashMap
Comment utiliser le framework Java avec AWS Lambda! ??
Comment utiliser l'API Java avec des expressions lambda
[Java] (pour MacOS) Méthode de définition du chemin de classe
[Explication facile à comprendre! ] Comment utiliser l'héritage Java [Explication de remplacement]
Comment utiliser la méthode replace () (Java Silver)
Comment utiliser LOG Java Producer d'Alibaba Cloud
[Pour les super débutants] Comment utiliser l'autofocus: vrai
[Java] Comment rendre plusieurs boucles for uniques