Hello, my name is Kurtosis. I work as an SRE.
I was using Azure Event Hubs and got stuck because I could not find any documentation for what I needed. This article is for readers who already have a Java environment set up (Maven, Gradle, etc.).
I want to access Event Hubs using a shared access signature (SAS). However, anyone who holds the SAS key can access Event Hubs more or less permanently, so placing the key on the client side is risky from a security standpoint. While looking for a better approach, I learned that a token can be generated from the SAS key. So this article uses a SAS token to send data to Event Hubs. I ran into a few stumbling blocks along the way, so I am writing this up as a memo.
Azure portal → Event Hubs namespace → Event Hubs instance → Shared access policies → (name of the shared access policy). From there, use the Primary key and the Connection string-primary key. The connection string has the form Endpoint=sb://(Event Hubs namespace name).servicebus.windows.net/;SharedAccessKeyName=.......;EntityPath=(event hub name). Rearrange it into the [RESOURCEURI] shown below.
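As a rough sketch of that rearrangement, the snippet below splits a connection string into its Key=Value parts and joins Endpoint and EntityPath into a resource URI. The class name, method names, and the sample connection string are all hypothetical and exist only for illustration; it assumes the connection string contains both Endpoint and EntityPath.

```java
import java.util.HashMap;
import java.util.Map;

final class ResourceUriFromConnectionString {

    // Splits "Key=Value;Key=Value" pairs on the first '=' only,
    // because Base64 key values can themselves contain '='.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new HashMap<>();
        for (String pair : connectionString.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                parts.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return parts;
    }

    // Joins Endpoint and EntityPath into sb://<namespace>.servicebus.windows.net/<event hub>.
    static String toResourceUri(String connectionString) {
        Map<String, String> parts = parse(connectionString);
        String endpoint = parts.get("Endpoint");
        String entityPath = parts.get("EntityPath");
        if (endpoint == null || entityPath == null) {
            throw new IllegalArgumentException("Endpoint and EntityPath are both required");
        }
        if (!endpoint.endsWith("/")) {
            endpoint = endpoint + "/";
        }
        return endpoint + entityPath;
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not a real namespace or key.
        String sample = "Endpoint=sb://my-namespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=send-policy;SharedAccessKey=abc123=;EntityPath=my-hub";
        System.out.println(toResourceUri(sample));
    }
}
```

With the sample value, the result has the same shape as the [RESOURCEURI] used in this article: the namespace endpoint followed by the event hub name.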
Variables to use
[RESOURCEURI]=sb://(Event Hubs namespace name).servicebus.windows.net/(event hub name)
[KEYNAME]=Name of the shared access policy
[KEY]=Primary key of the shared access policy
SASToken.java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
public class SASToken
{
public static void main(String[] args) {
long epoch = System.currentTimeMillis() / 1000L;
int week = 60 * 60 * 24 * 7;
String resourceUri ="[RESOURCEURI]";
String keyName = "[KEYNAME]";
String key = "[KEY]";
String expiry = Long.toString(epoch + week);
String sasToken = null;
try {
String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
String signature = getHMAC256(key, stringToSign);
sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig=" +
URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(sasToken);
}
// Returns the Base64-encoded HMAC-SHA256 of input, keyed with key
public static String getHMAC256 (String key, String input){
Mac sha256_HMAC = null;
String hash = null;
try {
sha256_HMAC = Mac.getInstance("HmacSHA256");
// Use an explicit charset so the key bytes do not depend on the platform default
SecretKeySpec secret_key = new SecretKeySpec(key.getBytes("UTF-8"), "HmacSHA256");
sha256_HMAC.init(secret_key);
Base64.Encoder encoder = Base64.getEncoder();
hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return hash;
}
}
Running this prints a SAS token that is valid for one week.
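Before pasting the token anywhere, it can help to check what it contains. The following is a minimal sketch, assuming the token has the same `SharedAccessSignature sr=...&sig=...&se=...&skn=...` layout produced by the code above; the class and method names are hypothetical. It decodes the fields and reports how many seconds remain until the `se` expiry.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

final class SasTokenInspector {
    private static final String PREFIX = "SharedAccessSignature ";

    // Splits the token into its URL-decoded fields (sr, sig, se, skn).
    static Map<String, String> fields(String token) {
        if (!token.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a SharedAccessSignature token");
        }
        Map<String, String> out = new LinkedHashMap<>();
        for (String pair : token.substring(PREFIX.length()).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                out.put(pair.substring(0, eq), decode(pair.substring(eq + 1)));
            }
        }
        return out;
    }

    // Seconds left before the expiry in "se"; zero or negative means the token has expired.
    static long secondsRemaining(String token, long nowEpochSeconds) {
        String expiry = fields(token).get("se");
        if (expiry == null) {
            throw new IllegalArgumentException("Token has no se field");
        }
        return Long.parseLong(expiry) - nowEpochSeconds;
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    public static void main(String[] args) {
        // Pass the generated token as the first argument.
        long now = System.currentTimeMillis() / 1000L;
        System.out.println(fields(args[0]));
        System.out.println("Seconds remaining: " + secondsRemaining(args[0], now));
    }
}
```

The decoded `sr` value should match the [RESOURCEURI] you signed; if it does not, the token was built for a different resource.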
Variables to use
[SasToken obtained in step 1]=The token printed by SASToken.java above
[NAMESPACENAME]=Event Hubs namespace name
[EVENTHUBNAME]=Name of the Event Hubs instance
SendToEventhubs.java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class SendToEventhubs {
public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException {
String sas = "[SasToken obtained in step 1]"; // the token printed by SASToken.java
// Set the credentials
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("[NAMESPACENAME]")
.setEventHubName("[EVENTHUBNAME]")
.setSharedAccessSignature(sas);
final Gson gson = new GsonBuilder().create();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
// Create the Event Hubs client
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
try {
for (int i = 0; i < 1; i++) {
String payload = "Hello, Eventhubs !! (put your message here)";
// Encode as UTF-8 explicitly so the bytes do not depend on the platform default charset
byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.forName("UTF-8"));
EventData sendEvent = EventData.create(payloadBytes);
System.out.println(sendEvent);
// Send to the event hub
ehClient.sendSync(sendEvent);
}
System.out.println(Instant.now() + ": Send Complete...");
System.out.println("Press Enter to stop.");
System.in.read();
} finally {
ehClient.closeSync();
executorService.shutdown();
}
}
}
This completes sending the event. However, sending alone gives no way to confirm that the event arrived, so I will also include code to receive it.
Variables to use
String consumerGroupName = "$Default";
String namespaceName = "[NAMESPACENAME]";
String eventHubName = "[EVENTHUBNAME]";
String sasKeyName = "[KEYNAME]";
String sasKey = "[KEY]";
String storageConnectionString = "[STORAGECONNECTION]"; // Portal → Storage account → Access keys
String storageContainerName = "[STORAGECONTAINERNAME]"; // Portal → Storage account → Containers
String hostNamePrefix = "";
EventProcessorSample.java
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
public class EventProcessorSample
{
public static void main(String args[]) throws InterruptedException, ExecutionException
{
String consumerGroupName = "$Default";
String namespaceName = "[NAMESPACENAME]";
String eventHubName = "[EVENTHUBNAME]";
String sasKeyName = "[KEYNAME]";
String sasKey = "[KEY]";
String storageConnectionString = "[STORAGECONNECTION]";
String storageContainerName = "[STORAGECONTAINERNAME]";
String hostNamePrefix = "";
ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
.setNamespaceName(namespaceName)
.setEventHubName(eventHubName)
.setSasKeyName(sasKeyName)
.setSasKey(sasKey);
EventProcessorHost host = new EventProcessorHost(
EventProcessorHost.createHostName(hostNamePrefix),
eventHubName,
consumerGroupName,
eventHubConnectionString.toString(),
storageConnectionString,
storageContainerName);
System.out.println("Registering host named " + host.getHostName());
EventProcessorOptions options = new EventProcessorOptions();
options.setExceptionNotification(new ErrorNotificationHandler());
host.registerEventProcessor(EventProcessor.class, options)
.whenComplete((unused, e) ->
{
if (e != null)
{
System.out.println("Failure while registering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
}
})
.thenAccept((unused) ->
{
System.out.println("Press enter to stop.");
try
{
System.in.read();
}
catch (Exception e)
{
System.out.println("Keyboard read failed: " + e.toString());
}
})
.thenCompose((unused) ->
{
return host.unregisterEventProcessor();
})
.exceptionally((e) ->
{
System.out.println("Failure while unregistering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
return null;
})
.get(); // Wait for everything to finish before exiting main!
System.out.println("End of sample");
}
// The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
// as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
// was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
{
@Override
public void accept(ExceptionReceivedEventArgs t)
{
System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
}
}
public static class EventProcessor implements IEventProcessor
{
private int checkpointBatchingCount = 0;
// OnOpen is called when a new event processor instance is created by the host.
@Override
public void onOpen(PartitionContext context) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
}
// OnClose is called when an event processor instance is being shut down.
@Override
public void onClose(PartitionContext context, CloseReason reason) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
}
// onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
@Override
public void onError(PartitionContext context, Throwable error)
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
}
// onEvents is called when events are received on this partition of the Event Hub.
@Override
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
int eventCount = 0;
for (EventData data : events)
{
try
{
System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
eventCount++;
// Checkpointing persists the current position in the event stream for this partition and means that the next
// time any host opens an event processor on this event hub+consumer group+partition combination, it will start
// receiving at the event after this one.
this.checkpointBatchingCount++;
if ((checkpointBatchingCount % 5) == 0)
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
// Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
// before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
context.checkpoint(data).get();
}
}
catch (Exception e)
{
System.out.println("Processing failed for an event: " + e.toString());
}
}
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
}
}
}
Running this code lets you receive the data sent to Event Hubs in real time.
- I did not know what ResourceUri should point to when creating the SAS token.
It took time and effort because I had to work through each combination by trial and error. I hope this helps everyone who reads this article.
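On the ResourceUri point, here is a small sketch of the form that worked in this article: the namespace endpoint followed by the event hub name, percent-encoded when it goes into the `sr` field and the string to sign. The class name, method names, and the sample namespace and hub names are hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class ResourceUriSketch {

    // Entity-level resource URI: namespace endpoint plus the event hub name.
    static String resourceUri(String namespaceName, String eventHubName) {
        return "sb://" + namespaceName + ".servicebus.windows.net/" + eventHubName;
    }

    // Percent-encoded form, as used in the sr field and in the string to sign.
    static String encoded(String resourceUri) {
        try {
            return URLEncoder.encode(resourceUri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    public static void main(String[] args) {
        String uri = resourceUri("my-namespace", "my-hub"); // hypothetical names
        System.out.println(uri);
        System.out.println(encoded(uri));
    }
}
```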
Send events to and receive events from Azure Event Hubs using Java (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send)
Authenticate access to Event Hubs resources using shared access signatures (SAS) (https://docs.microsoft.com/en-us/azure/event-hubs/authenticate-shared-access-signature)
Thank you for reading to the end! If you have any questions, I would appreciate your comments.