Hello, My name is Kurtosis. I am doing SRE.
I'm using Azure Eventhubs and I can't find any documentation at all, so I'm stuck. This article is intended for people who already have a Java environment. (maven, gradle, etc.)
I want to access Eventhubs using the Shared Access Signature. 1 However, if you have SAS, you will be able to access Eventhubs semi-permanently. Placing it on the client side is uneasy from a security point of view. When I was looking for a good way, I got information that Token can be generated from Sas. So use SasToken to send data to Eventhubs. Since there was a clogged part, I will write it as a memorandum.
Azure portal → Eventhubs namespace → Eventhubs instance → Shared access policy → Shared access policy name → Primary key & connection string – Primary key Is used. Connection string – of the primary key Endpoint = sb: // (Eventhubs name) .servicebus.windows.net /; SharedAccessKeyName = .......; EntityPath = (entry point) This is transformed and used for the following [RESOURCEURI]
Variables to use
[RESOURCEURI]=sb://(Event hubs name).servicebus.windows.net/(Entry point)
[KEYNAME]=Shared access policy name
[KEY]=Primary key for shared access policy name
SASToken.java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
public class SASToken
{
public static void main(String[] args) {
long epoch = System.currentTimeMillis() / 1000L;
int week = 60 * 60 * 24 * 7;
String resourceUri ="[RESOURCEURI]";
String keyName = "[KEYNAME]";
String key = "[KEY]";
String expiry = Long.toString(epoch + week);
String sasToken = null;
try {
String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
String signature = getHMAC256(key, stringToSign);
sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig=" +
URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println(sasToken);
}
//Function that returns a hash value
public static String getHMAC256 (String key, String input){
Mac sha256_HMAC = null;
String hash = null;
try {
sha256_HMAC = Mac.getInstance("HmacSHA256");
SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
sha256_HMAC.init(secret_key);
Base64.Encoder encoder = Base64.getEncoder();
hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return hash;
}
}
Doing this will give you a sasToken.
Variables to use
[SasToken obtained in 1]=Results above
[NAMESPACENAME]=Event hub namespace
[EVENTHUBNAME]=The name of the event hub instance
SendToEventhubs.java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class SendToEventhubs {
public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException {
String sas = "[SasToken obtained in 1]";
//Set credentials
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("[NAMESPACENAME]")
.setEventHubName("[EVENTHUBNAME]")
.setSharedAccessSignature(sas);
final Gson gson = new GsonBuilder().create();
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
//Describe the event hub client
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
try {
for (int i = 0; i < 1; i++) {
String payload = "Hello, Eventhubs !! (Put a message here)";
byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
EventData sendEvent = EventData.create(payloadBytes);
System.out.println(sendEvent);
//Send to event hub
ehClient.sendSync(sendEvent);
}
System.out.println(Instant.now() + ": Send Complete...");
System.out.println("Press Enter to stop.");
System.in.read();
} finally {
ehClient.closeSync();
executorService.shutdown();
}
}
}
This completes sending the event. However, since this cannot be confirmed, I will also include a method for receiving it.
Variables to use
String consumerGroupName = "$Default";
String namespaceName = "[NAMESPACENAME]";
String eventHubName = "[EVENTHUBNAME]";
String sasKeyName = "[KEYNAME]";
String sasKey = "[KEY]";
String storageConnectionString = "[STORAGECONNECTION]"; //Portal → Storage account → Access key
String storageContainerName = "[STORAGECONTAINERNAME]"; //Portal → Storage account → Container
String hostNamePrefix = "";
EventProcessorSample.java
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
public class EventProcessorSample
{
public static void main(String args[]) throws InterruptedException, ExecutionException
{
String consumerGroupName = "$Default";
String namespaceName = "[NAMESPACENAME]";
String eventHubName = "[EVENTHUBNAME]";
String sasKeyName = "[KEYNAME]";
String sasKey = "[KEY]";
String storageConnectionString = "[STORAGECONNECTION]";
String storageContainerName = "[STORAGECONTAINERNAME]";
String hostNamePrefix = "";
ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
.setNamespaceName(namespaceName)
.setEventHubName(eventHubName)
.setSasKeyName(sasKeyName)
.setSasKey(sasKey);
EventProcessorHost host = new EventProcessorHost(
EventProcessorHost.createHostName(hostNamePrefix),
eventHubName,
consumerGroupName,
eventHubConnectionString.toString(),
storageConnectionString,
storageContainerName);
System.out.println("Registering host named " + host.getHostName());
EventProcessorOptions options = new EventProcessorOptions();
options.setExceptionNotification(new ErrorNotificationHandler());
host.registerEventProcessor(EventProcessor.class, options)
.whenComplete((unused, e) ->
{
if (e != null)
{
System.out.println("Failure while registering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
}
})
.thenAccept((unused) ->
{
System.out.println("Press enter to stop.");
try
{
System.in.read();
}
catch (Exception e)
{
System.out.println("Keyboard read failed: " + e.toString());
}
})
.thenCompose((unused) ->
{
return host.unregisterEventProcessor();
})
.exceptionally((e) ->
{
System.out.println("Failure while unregistering: " + e.toString());
if (e.getCause() != null)
{
System.out.println("Inner exception: " + e.getCause().toString());
}
return null;
})
.get(); // Wait for everything to finish before exiting main!
System.out.println("End of sample");
}
// The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
// as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
// was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
{
@Override
public void accept(ExceptionReceivedEventArgs t)
{
System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
}
}
public static class EventProcessor implements IEventProcessor
{
private int checkpointBatchingCount = 0;
// OnOpen is called when a new event processor instance is created by the host.
@Override
public void onOpen(PartitionContext context) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
}
// OnClose is called when an event processor instance is being shut down.
@Override
public void onClose(PartitionContext context, CloseReason reason) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
}
// onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
@Override
public void onError(PartitionContext context, Throwable error)
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
}
// onEvents is called when events are received on this partition of the Event Hub.
@Override
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
int eventCount = 0;
for (EventData data : events)
{
try
{
System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
eventCount++;
// Checkpointing persists the current position in the event stream for this partition and means that the next
// time any host opens an event processor on this event hub+consumer group+partition combination, it will start
// receiving at the event after this one.
this.checkpointBatchingCount++;
if ((checkpointBatchingCount % 5) == 0)
{
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
// Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
// before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
context.checkpoint(data).get();
}
}
catch (Exception e)
{
System.out.println("Processing failed for an event: " + e.toString());
}
}
System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
}
}
}
By executing this code, you can receive the data sent to Eventhubs in real time.
--I didn't know what ResourceUri was pointing to when creating the SASToken. --I didn't know where the SasToken was. Is it from sr, after = or the whole? (SharedAccessSignature sr=...) --What information do you need for the Sas token and other information when sending a message?
It took a lot of time and effort because it was necessary to make a trial error with each combination of these. I hope it helps everyone who read this article.
Send and receive events to and from Azure Event Hubs using Java (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send ) Authenticate access to Event Hubs resources using Shared Access Signature (SAS) (https://docs.microsoft.com/en-us/azure/event-hubs/authenticate-shared-access-signature)
Thank you for reading to the end! If you have any questions, we welcome your comments.
Recommended Posts