Les exemples d'utilisation de l'API cliente SQS étant assez rares, j'ai profité de l'occasion pour en écrire un. Le programme met en œuvre les opérations de base de SQS : la création d'une file d'attente et d'une file d'attente de lettres mortes, la purge d'une file d'attente (suppression de ses messages), ainsi que l'envoi, la réception et la suppression de messages. J'espère qu'il vous servira de référence.
package com.amazonaws.samples;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ListQueuesRequest;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.IntStream;
/**
 * Sample that walks through the basic Amazon SQS operations with the AWS SDK
 * for Java: creating a queue and a dead-letter queue, wiring the redrive
 * policy, purging, sending messages, and receiving/deleting them.
 *
 * <p>The SQS client is built from temporary session credentials obtained
 * from STS in the constructor.
 */
public class SQSLambdaSample {
    // STS objects used while building the SQS client. They are only read in
    // the constructor but stay package-visible fields, as in the original.
    AWSSecurityTokenServiceClient sts_client;
    GetSessionTokenRequest session_token_request;
    GetSessionTokenResult session_token_result;
    Credentials session_creds;
    BasicSessionCredentials sessionCredentials;

    /** SQS client used by every operation of this sample. */
    final AmazonSQS sqs;

    /**
     * Requests a session token from STS and builds an SQS client that uses
     * the resulting temporary credentials.
     */
    // The suppression belongs here: the client is instantiated in this
    // constructor, and an annotation on a field without initializer
    // suppresses nothing.
    @SuppressWarnings("deprecation")
    public SQSLambdaSample() {
        sts_client = new AWSSecurityTokenServiceClient();
        session_token_request = new GetSessionTokenRequest();
        session_token_result = sts_client.getSessionToken(session_token_request);
        session_creds = session_token_result.getCredentials();
        sessionCredentials = new BasicSessionCredentials(
                session_creds.getAccessKeyId(),
                session_creds.getSecretAccessKey(),
                session_creds.getSessionToken());
        sqs = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(sessionCredentials))
                .build();
    }

    /**
     * Runs the sample: creates the "sample" queue and its dead-letter queue
     * "sample_dead", links them, purges both, sends ten messages and then
     * polls every three seconds until a poll returns no message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SQSLambdaSample sls = new SQSLambdaSample();
        String srcQueue = "sample";
        String deadletterQueue = "sample_dead";

        sls.createQueue(srcQueue);
        String queueURL = sls.getQueueURL(srcQueue);
        sls.createQueue(deadletterQueue);
        String deadletterQueueUrl = sls.getQueueURL(deadletterQueue);
        String deadLetterQueueARN = sls.getQueueAttributes(deadletterQueueUrl);
        sls.setQueueAttribute(queueURL, deadLetterQueueARN);

        // NOTE(review): the SQS documentation states that a purge can take up
        // to 60 seconds and may remove messages sent while it is in progress,
        // so messages sent right after purging may be lost - confirm whether
        // a pause is wanted here.
        sls.purgeQueue(queueURL);
        sls.purgeQueue(deadletterQueueUrl);

        IntStream.rangeClosed(1, 10)
                .forEach(i -> sls.sendMessage(queueURL, "SQS Message" + i + "!!"));

        Timer timer = new Timer();
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                // Stop polling as soon as a receive call comes back empty.
                if (!sls.rcvMessage(queueURL)) {
                    timer.cancel();
                }
            }
        };
        timer.scheduleAtFixedRate(task, 1000, 3000);
    }

    /**
     * Creates the queue unless a queue with exactly this name already exists.
     *
     * <p>Existence is decided by comparing the last path segment of each
     * listed queue URL with the name, so "sample" is not mistaken for
     * "sample_dead". Creation errors reported by SQS are printed, not thrown.
     *
     * @param QueueName name of the queue to create
     */
    public void createQueue(String QueueName) {
        // The prefix filter only narrows the listing; the exact match is done below.
        ListQueuesRequest listRequest = new ListQueuesRequest().withQueueNamePrefix(QueueName);
        ListQueuesResult listResult = this.sqs.listQueues(listRequest);
        String urlSuffix = "/" + QueueName;
        boolean alreadyExists = listResult.getQueueUrls().stream()
                .anyMatch(url -> url.endsWith(urlSuffix));

        if (alreadyExists) {
            System.out.println(QueueName + " already exist, skipped create queue.");
            return;
        }
        try {
            this.sqs.createQueue(new CreateQueueRequest(QueueName));
            System.out.println(QueueName + " has been created.");
        } catch (AmazonSQSException e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * Looks up the URL of a queue by name.
     *
     * <p>Failures are propagated instead of being returned as the "URL":
     * an error message handed back as a queue URL would only make every
     * later call fail in a less obvious way.
     *
     * @param QueueName name of the queue
     * @return the queue URL reported by SQS
     * @throws RuntimeException whatever the SQS client throws when the lookup
     *         fails (for example when the queue does not exist)
     */
    public String getQueueURL(String QueueName) {
        GetQueueUrlRequest request = new GetQueueUrlRequest()
                .withQueueName(QueueName);
        GetQueueUrlResult result = this.sqs.getQueueUrl(request);
        return result.getQueueUrl();
    }

    /**
     * Sends one message with a delivery delay of five seconds. Errors are
     * printed, not thrown.
     *
     * @param QueueURL    URL of the target queue
     * @param BodyMessage message body to send
     */
    public void sendMessage(String QueueURL, String BodyMessage) {
        SendMessageRequest request = new SendMessageRequest()
                .withQueueUrl(QueueURL)
                .withMessageBody(BodyMessage)
                .withDelaySeconds(5);
        try {
            this.sqs.sendMessage(request);
            System.out.println("Message:" + BodyMessage + "has been sent.");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * Purges all messages of a queue.
     *
     * @param queueURl URL of the queue to purge
     * @return the string form of the purge result, or the error message when
     *         the purge failed (the error is also printed)
     */
    public String purgeQueue(String queueURl) {
        PurgeQueueRequest request = new PurgeQueueRequest()
                .withQueueUrl(queueURl);
        try {
            String representation = this.sqs.purgeQueue(request).toString();
            System.out.println("Purged all messages.");
            return representation;
        } catch (Exception e) {
            // Callers such as main ignore the return value, so report the failure here too.
            System.out.println(e.getMessage());
            return e.getMessage();
        }
    }

    /**
     * Reads the ARN of a queue; despite the general name only the
     * {@code QueueArn} attribute is requested.
     *
     * @param deadLetterQueueUrl URL of the queue whose ARN is wanted
     * @return the value of the {@code QueueArn} attribute, or {@code null}
     *         when the response does not contain it
     */
    public String getQueueAttributes(String deadLetterQueueUrl) {
        GetQueueAttributesRequest request = new GetQueueAttributesRequest(deadLetterQueueUrl)
                .withAttributeNames("QueueArn");
        GetQueueAttributesResult deadLetterQueueAttributes = this.sqs.getQueueAttributes(request);
        return deadLetterQueueAttributes.getAttributes().get("QueueArn");
    }

    /**
     * Sets the redrive policy of a queue so that it uses the given
     * dead-letter queue with a maximum receive count of 10. Errors are
     * printed, not thrown.
     *
     * @param queueURL           URL of the source queue
     * @param deadLetterQueueARN ARN of the dead-letter queue
     */
    public void setQueueAttribute(String queueURL, String deadLetterQueueARN) {
        String redrivePolicy = "{\"maxReceiveCount\":\"10\", "
                + "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}";
        SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest()
                .withQueueUrl(queueURL)
                .addAttributesEntry("RedrivePolicy", redrivePolicy);
        try {
            this.sqs.setQueueAttributes(queueAttributesRequest);
            System.out.println("Set dead letter queue.");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    /**
     * Receives up to ten messages (waiting up to five seconds for them) and
     * deletes each one, printing its id and body once the deletion succeeded.
     *
     * @param queueURL URL of the queue to read from
     * @return {@code true} when at least one message was received,
     *         {@code false} when the receive call came back empty
     */
    public boolean rcvMessage(String queueURL) {
        ReceiveMessageRequest request = new ReceiveMessageRequest()
                .withQueueUrl(queueURL)
                .withWaitTimeSeconds(5)
                .withMaxNumberOfMessages(10);
        ReceiveMessageResult result = this.sqs.receiveMessage(request);

        if (result.getMessages().isEmpty()) {
            System.out.println("There is no message which has completed.");
            return false;
        }
        for (Message message : result.getMessages()) {
            try {
                // Delete through the client directly so that a failure reaches
                // the catch below instead of being reported as a success.
                this.sqs.deleteMessage(newDeleteRequest(queueURL, message.getReceiptHandle()));
                System.out.println(message.getMessageId() + " has been deleted.\n" + message.getBody());
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
        return true;
    }

    /**
     * Deletes one message from a queue.
     *
     * @param queueURL      URL of the queue holding the message
     * @param receiptHandle receipt handle obtained when the message was received
     * @return the string form of the delete result, or the error message when
     *         the deletion failed
     */
    public String deleteMessage(String queueURL, String receiptHandle) {
        try {
            return this.sqs.deleteMessage(newDeleteRequest(queueURL, receiptHandle)).toString();
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    /** Builds the delete request shared by rcvMessage and deleteMessage. */
    private DeleteMessageRequest newDeleteRequest(String queueURL, String receiptHandle) {
        return new DeleteMessageRequest()
                .withQueueUrl(queueURL)
                .withReceiptHandle(receiptHandle);
    }
}
Recommended Posts