Hallo, ich heiße Kurtosis. Ich mache SRE.
Ich habe Azure Eventhubs verwendet und konnte überhaupt keine Dokumentation finden. Daher steckte ich fest. Dieser Artikel richtet sich an Personen, die bereits über eine Java-Umgebung verfügen. (Maven, Gradle usw.)
Ich möchte mit der Shared Access-Signatur auf Eventhubs zugreifen. 1 Wenn Sie jedoch über SAS verfügen, können Sie semi-permanent auf Eventhubs zugreifen. Die Platzierung auf der Client-Seite ist aus Sicherheitsgründen ungewiss. Als ich nach einem guten Weg suchte, bekam ich Informationen, dass Token aus Sas generiert werden kann. Verwenden Sie also SasToken, um Daten an Eventhubs zu senden. Da es einen verstopften Teil gab, werde ich ihn als Memorandum schreiben.
Azure-Portal → Eventhubs-Namespace → Eventhubs-Instanz → Richtlinie für gemeinsamen Zugriff → Name der Richtlinie für gemeinsamen Zugriff → Primärschlüssel & Verbindungszeichenfolge - Primärschlüssel Wird genutzt. Verbindungszeichenfolge - Primärschlüssel Endpoint = sb: // (Name des Eventhubs) .servicebus.windows.net /; SharedAccessKeyName = .......; EntityPath = (Einstiegspunkt) Transformiere dies und benutze es für das folgende [RESOURCEURI]
Zu verwendende Variablen
[RESOURCEURI]=sb://(Name des Ereignis-Hubs).servicebus.windows.net/(Einstiegspunkt)
[KEYNAME]=Name der Richtlinie für freigegebenen Zugriff
[KEY]=Primärschlüssel für den Namen der Richtlinie für den gemeinsamen Zugriff
SASToken.java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
public class SASToken
{
public static void main(String[] args) {
long epoch = System.currentTimeMillis() / 1000L;
int week = 60 * 60 * 24 * 7;
String resourceUri ="[RESOURCEURI]";
String keyName = "[KEYNAME]";
String key = "[KEY]";
String expiry = Long.toString(epoch + week);
String sasToken = null;
try {
String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
String signature = getHMAC256(key, stringToSign);
sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig=" +
URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(sasToken);
}
//Funktion, die einen Hashwert zurückgibt
public static String getHMAC256 (String key, String input){
Mac sha256_HMAC = null;
String hash = null;
try {
sha256_HMAC = Mac.getInstance("HmacSHA256");
SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
sha256_HMAC.init(secret_key);
Base64.Encoder encoder = Base64.getEncoder();
hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return hash;
}
}
Wenn Sie dies tun, erhalten Sie ein sasToken.
Zu verwendende Variablen
[SasToken erhalten in 1]=Ergebnisse oben
[NAMESPACENAME]=Event Hub-Namespace
[EVENTHUBNAME]=Der Name der Event Hub-Instanz
SendToEventhubs.java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class SendToEventhubs {
public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException {
String sas = "[SasToken erhalten in 1]";
//Anmeldeinformationen festlegen
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("[NAMESPACENAME]")
.setEventHubName("[EVENTHUBNAME]")
.setSharedAccessSignature(sas);
final Gson gson = new GsonBuilder().create();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
//Beschreiben des Event Hub-Clients
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
try {
for (int i = 0; i < 1; i++) {
String payload = "Hello, Eventhubs !! (Geben Sie hier eine Nachricht ein)";
byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
EventData sendEvent = EventData.create(payloadBytes);
System.out.println(sendEvent);
//An Event Hub senden
ehClient.sendSync(sendEvent);
}
System.out.println(Instant.now() + ": Send Complete...");
System.out.println("Press Enter to stop.");
System.in.read();
} finally {
ehClient.closeSync();
executorService.shutdown();
}
}
}
Damit ist das Senden des Ereignisses abgeschlossen. Da dies jedoch nicht bestätigt werden kann, werde ich auch eine Methode zum Empfangen einschließen.
Zu verwendende Variablen
String consumerGroupName = "$Default";
String namespaceName = "[NAMESPACENAME]";
String eventHubName = "[EVENTHUBNAME]";
String sasKeyName = "[KEYNAME]";
String sasKey = "[KEY]";
String storageConnectionString = "[STORAGECONNECTION]"; //Portal → Speicherkonto → Zugriffsschlüssel
String storageContainerName = "[STORAGECONTAINERNAME]"; //Portal → Speicherkonto → Container
String hostNamePrefix = "";
EventProcessorSample.java
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
public class EventProcessorSample
{
public static void main(String args[]) throws InterruptedException, ExecutionException
{
String consumerGroupName = "$Default";
String namespaceName = "[NAMESPACENAME]";
String eventHubName = "[EVENTHUBNAME]";
String sasKeyName = "[KEYNAME]";
String sasKey = "[KEY]";
String storageConnectionString = "[STORAGECONNECTION]";
String storageContainerName = "[STORAGECONTAINERNAME]";
String hostNamePrefix = "";
ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
.setNamespaceName(namespaceName)
.setEventHubName(eventHubName)
.setSasKeyName(sasKeyName)
.setSasKey(sasKey);
EventProcessorHost host = new EventProcessorHost(
EventProcessorHost.createHostName(hostNamePrefix),
eventHubName,
consumerGroupName,
eventHubConnectionString.toString(),
storageConnectionString,
storageContainerName);
System.out.println("Registering host named " + host.getHostName());
EventProcessorOptions options = new EventProcessorOptions();
options.setExceptionNotification(new ErrorNotificationHandler());
host.registerEventProcessor(EventProcessor.class, options)
.whenComplete((unused, e) ->
{
if (e != null)
{
System.out.println("Failure while registering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
}
})
.thenAccept((unused) ->
{
System.out.println("Press enter to stop.");
try
{
System.in.read();
}
catch (Exception e)
{
System.out.println("Keyboard read failed: " + e.toString());
}
})
.thenCompose((unused) ->
{
return host.unregisterEventProcessor();
})
.exceptionally((e) ->
{
System.out.println("Failure while unregistering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
return null;
})
.get(); // Wait for everything to finish before exiting main!
System.out.println("End of sample");
}
// The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
// as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
// was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
{
@Override
public void accept(ExceptionReceivedEventArgs t)
{
System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
}
}
public static class EventProcessor implements IEventProcessor
{
private int checkpointBatchingCount = 0;
// OnOpen is called when a new event processor instance is created by the host.
@Override
public void onOpen(PartitionContext context) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
}
// OnClose is called when an event processor instance is being shut down.
@Override
public void onClose(PartitionContext context, CloseReason reason) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
}
// onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
@Override
public void onError(PartitionContext context, Throwable error)
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
}
// onEvents is called when events are received on this partition of the Event Hub.
@Override
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
int eventCount = 0;
for (EventData data : events)
{
try
{
System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
eventCount++;
// Checkpointing persists the current position in the event stream for this partition and means that the next
// time any host opens an event processor on this event hub+consumer group+partition combination, it will start
// receiving at the event after this one.
this.checkpointBatchingCount++;
if ((checkpointBatchingCount % 5) == 0)
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
// Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
// before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
context.checkpoint(data).get();
}
}
catch (Exception e)
{
System.out.println("Processing failed for an event: " + e.toString());
}
}
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
}
}
}
Durch Ausführen dieses Codes können Sie die an Eventhubs gesendeten Daten in Echtzeit empfangen.
Es hat Zeit und Mühe gekostet, da bei jeder Kombination dieser Elemente ein Testfehler gemacht werden musste. Ich hoffe, es hilft allen, die diesen Artikel lesen.
Senden und Empfangen von Ereignissen an und von Azure Event Hubs mithilfe von Java (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send) ) Authentifizieren Sie den Zugriff auf Event Hubs-Ressourcen mithilfe der Shared Access Signature (SAS) (https://docs.microsoft.com/en-us/azure/event-hubs/authenticate-shared-access-signature).
Vielen Dank für das Lesen bis zum Ende! Wenn Sie Fragen haben, freuen wir uns über Ihre Kommentare.
Recommended Posts