I had never used the SQS client API myself, so I took this opportunity to try implementing against it. The sample covers the basic SQS operations: creating a queue and a dead-letter queue, purging a queue, and sending, receiving, and deleting messages. I hope it is useful as a reference.
package com.amazonaws.samples;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ListQueuesRequest;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.IntStream;
/**
 * Walk-through of the basic Amazon SQS operations using the AWS SDK for Java (v1):
 * creating a queue and a dead-letter queue, wiring the redrive policy, purging,
 * sending messages, and receiving/deleting them on a timer.
 *
 * <p>Most methods report failures by printing (or returning) the exception message
 * instead of throwing; this keeps the sample linear but means callers cannot
 * distinguish success from failure by exception. See the individual methods.
 */
public class SQSLambdaSample {

    // The STS-related fields only hold the intermediate objects produced while the
    // constructor obtains temporary session credentials.
    AWSSecurityTokenServiceClient sts_client;
    GetSessionTokenRequest session_token_request;
    GetSessionTokenResult session_token_result;
    Credentials session_creds;
    BasicSessionCredentials sessionCredentials;

    /** SQS client built with the temporary session credentials. */
    final AmazonSQS sqs;

    /**
     * Requests a session token from STS and builds an SQS client that uses the
     * resulting temporary credentials.
     */
    // The no-argument AWSSecurityTokenServiceClient constructor is deprecated in the
    // SDK; the suppression belongs here, on the code that actually calls it.
    @SuppressWarnings("deprecation")
    public SQSLambdaSample() {
        sts_client = new AWSSecurityTokenServiceClient();
        session_token_request = new GetSessionTokenRequest();
        session_token_result = sts_client.getSessionToken(session_token_request);
        session_creds = session_token_result.getCredentials();
        sessionCredentials = new BasicSessionCredentials(
                session_creds.getAccessKeyId(),
                session_creds.getSecretAccessKey(),
                session_creds.getSessionToken());
        sqs = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(sessionCredentials))
                .build();
    }

    /**
     * Runs the demo: creates the "sample" queue and its dead-letter queue
     * "sample_dead", links them, purges both, sends ten messages, then polls the
     * source queue every 3 seconds (first poll after 1 second) until a poll
     * returns no messages.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SQSLambdaSample sls = new SQSLambdaSample();
        String srcQueue = "sample";
        String deadletterQueue = "sample_dead";

        sls.createQueue(srcQueue);
        String queueURL = sls.getQueueURL(srcQueue);
        sls.createQueue(deadletterQueue);
        String deadletterQueueUrl = sls.getQueueURL(deadletterQueue);

        String deadLetterQueueARN = sls.getQueueAttributes(deadletterQueueUrl);
        sls.setQueueAttribute(queueURL, deadLetterQueueARN);

        // NOTE(review): per the SQS PurgeQueue documentation a purge may take up to
        // 60 seconds and can also remove messages sent while it is in progress, so
        // the messages sent below are not guaranteed to survive. Confirm whether a
        // wait is wanted here.
        sls.purgeQueue(queueURL);
        sls.purgeQueue(deadletterQueueUrl);

        IntStream.rangeClosed(1, 10)
                .forEach(i -> sls.sendMessage(queueURL, "SQS Message" + i + "!!"));

        Timer timer = new Timer();
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                // Stop polling as soon as one receive call comes back empty.
                if (!sls.rcvMessage(queueURL)) {
                    timer.cancel();
                    timer.purge();
                }
            }
        };
        timer.scheduleAtFixedRate(task, 1000, 3000);
    }

    /**
     * Creates the queue unless a queue with exactly this name is already listed.
     * An {@link AmazonSQSException} raised by the create call is printed, not
     * rethrown.
     *
     * @param QueueName name of the queue to create
     */
    public void createQueue(String QueueName) {
        if (queueExists(QueueName)) {
            System.out.println(QueueName + " already exist, skipped create queue.");
            return;
        }
        try {
            this.sqs.createQueue(new CreateQueueRequest(QueueName));
            System.out.println(QueueName + " has been created.");
        } catch (AmazonSQSException e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * Reports whether a queue named exactly {@code queueName} is listed.
     *
     * <p>The listing is narrowed with a name prefix and each URL's last path
     * segment is compared exactly, so "sample" is not mistaken for "sample_dead".
     */
    private boolean queueExists(String queueName) {
        ListQueuesRequest listRequest = new ListQueuesRequest()
                .withQueueNamePrefix(queueName);
        ListQueuesResult listing = this.sqs.listQueues(listRequest);
        String urlSuffix = "/" + queueName;
        for (String url : listing.getQueueUrls()) {
            if (url.endsWith(urlSuffix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the URL of a queue by name.
     *
     * @param QueueName name of the queue
     * @return the queue URL; <strong>on failure the exception message is returned
     *         instead</strong>, so callers that need to detect failure must
     *         validate the returned string themselves
     */
    public String getQueueURL(String QueueName) {
        GetQueueUrlRequest request = new GetQueueUrlRequest()
                .withQueueName(QueueName);
        try {
            GetQueueUrlResult result = this.sqs.getQueueUrl(request);
            return result.getQueueUrl();
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    /**
     * Sends one message with a 5-second delivery delay. Failures are printed, not
     * rethrown.
     *
     * @param QueueURL    URL of the destination queue
     * @param BodyMessage message body
     */
    public void sendMessage(String QueueURL, String BodyMessage) {
        SendMessageRequest request = new SendMessageRequest()
                .withQueueUrl(QueueURL)
                .withMessageBody(BodyMessage)
                .withDelaySeconds(5);
        try {
            this.sqs.sendMessage(request);
            System.out.println("Message:" + BodyMessage + "has been sent.");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * Purges all messages from the queue.
     *
     * @param queueURl URL of the queue to purge
     * @return the string form of the purge result, or the exception message if
     *         the call failed
     */
    public String purgeQueue(String queueURl) {
        PurgeQueueRequest request = new PurgeQueueRequest()
                .withQueueUrl(queueURl);
        try {
            String representation = this.sqs.purgeQueue(request).toString();
            System.out.println("Purged all messages.");
            return representation;
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    /**
     * Fetches the ARN of a queue. Unlike the other methods, exceptions from the
     * SDK call propagate to the caller.
     *
     * @param deadLetterQueueUrl URL of the queue whose ARN is wanted
     * @return the value of the queue's "QueueArn" attribute
     */
    public String getQueueAttributes(String deadLetterQueueUrl) {
        GetQueueAttributesRequest request = new GetQueueAttributesRequest(deadLetterQueueUrl)
                .withAttributeNames("QueueArn");
        GetQueueAttributesResult deadLetterQueueAttributes = this.sqs.getQueueAttributes(request);
        return deadLetterQueueAttributes.getAttributes().get("QueueArn");
    }

    /**
     * Sets the redrive policy of a queue so that it uses the given dead-letter
     * queue with a {@code maxReceiveCount} of 10. Failures are printed, not
     * rethrown.
     *
     * @param queueURL           URL of the source queue
     * @param deadLetterQueueARN ARN of the dead-letter queue
     */
    public void setQueueAttribute(String queueURL, String deadLetterQueueARN) {
        String redrivePolicy = "{\"maxReceiveCount\":\"10\", "
                + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}";
        SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest()
                .withQueueUrl(queueURL)
                .addAttributesEntry("RedrivePolicy", redrivePolicy);
        try {
            this.sqs.setQueueAttributes(queueAttributesRequest);
            // println (not print) so this line does not run into the next log line.
            System.out.println("Set dead letter queue.");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * Receives up to 10 messages (wait time 5 seconds) and deletes each one,
     * printing its id and body. A failed delete prints the exception message
     * instead of the "has been deleted" line.
     *
     * @param queueURL URL of the queue to read from
     * @return {@code true} if the receive call returned at least one message,
     *         {@code false} if it returned none
     */
    public boolean rcvMessage(String queueURL) {
        ReceiveMessageRequest request = new ReceiveMessageRequest()
                .withQueueUrl(queueURL)
                .withWaitTimeSeconds(5)
                .withMaxNumberOfMessages(10);
        ReceiveMessageResult result = this.sqs.receiveMessage(request);

        if (result.getMessages().isEmpty()) {
            System.out.println("There is no message which has completed.");
            return false;
        }

        for (Message message : result.getMessages()) {
            try {
                // Call the client directly: the public deleteMessage wrapper swallows
                // exceptions, which would make the success line below unconditional.
                this.sqs.deleteMessage(queueURL, message.getReceiptHandle());
                System.out.println(message.getMessageId() + " has been deleted.\n" + message.getBody());
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
        return true;
    }

    /**
     * Deletes a single message.
     *
     * @param queueURL      URL of the queue that holds the message
     * @param receiptHandle receipt handle obtained when the message was received
     * @return the string form of the delete result, or the exception message if
     *         the call failed
     */
    public String deleteMessage(String queueURL, String receiptHandle) {
        DeleteMessageRequest request = new DeleteMessageRequest()
                .withQueueUrl(queueURL)
                .withReceiptHandle(receiptHandle);
        try {
            return this.sqs.deleteMessage(request).toString();
        } catch (Exception e) {
            return e.getMessage();
        }
    }
}
Recommended Posts