如何使用 AWS 2.0 SDK 重新处理传输中的 SQS 消息
How to reprocess in-flight SQS messages with AWS 2.0 SDK
我正在用 SQS 处理器替换 JMS 处理器。当我收到消息时,我需要对多个第三方系统进行更新调用。我不确定如何处理成功检索 SQS 消息的情况,但有一个或多个更新第三方系统的调用失败。在 JMS 世界中,我们会抛出一个异常,它会在重试 4 次后最终进入 DLQ 之前重新发送相同的 JMS 消息并进行增量退避。这是我目前的代码。
我有以下辅助方法来整合消息检索
public static List<Message> receiveMessage(SqsClient sqs, String queueUrl) throws AwsException {
try {
ReceiveMessageRequest req = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.waitTimeSeconds(LONG_POLL_DURATION)
.build();
ReceiveMessageResponse resp = sqs.receiveMessage(req);
if (resp != null) {
if (!Collections.isNullOrEmpty(resp.messages())) {
return resp.messages();
} else {
return new ArrayList<>();
}
} else {
return new ArrayList<>();
}
} catch (SdkException e) {
throw new AwsException("An error occurred receiving SQS message: " + e.getMessage(), e);
}
}
我处理 SQS 消息的代码
try (SqsClient client = SqsUtil.getClient()) {
while(!shutdown) {
List<Message> messages = SqsUtil.receiveMessage(client, queueUrl);
if (!messages.isEmpty()) {
for(Message msg : messages) {
boolean errorOccurred = false;
try {
//Convert SQS message to System 1 Request
System1Request req1 = convert(msg);
system1Client.process(req1);
} catch (System1Exception e) {
//log error
errorOccurred = true;
}
try {
//Convert SQS message to System 2 Request
System2Request req2 = convert(msg);
system2Client.process(req2);
} catch (System1Exception e) {
//log error
errorOccurred = true;
}
if (!errorOccurred) {
//delete SQS message
} else {
//TODO: how do I re-process the message using SQS
}
}
}
}
}
根据我的理解,SQS 客户端在 JDK 中内置了重试功能,但我认为这不适用于传输中的消息?我在 API 中看不到如何将消息添加回 SQS 队列。我犹豫要不要在我的应用程序中构建重试逻辑,以防 pod 重新启动并且消息丢失。
我知道如果我不删除邮件,它最终会转移到 DLQ,但我想在将邮件发送到 DLQ 之前重试几次。 SQS Example code 没有说明如何处理这个问题,而且 AWS 文档似乎非常零散。
我是否需要建立第二个排队机制来处理这种情况?我是不是误读了重试机制的工作原理?
我认为您缺少的关键概念是 SQS visibility timeout
When a consumer receives and processes a message from a queue, the message remains in the queue. Amazon SQS doesn't automatically delete the message. Because Amazon SQS is a distributed system, there's no guarantee that the consumer actually receives the message (for example, due to a connectivity issue, or due to an issue in the consumer application). Thus, the consumer must delete the message from the queue after receiving and processing it.
可见性超时(可在设置中配置)是为了防止多个消费者尝试同时处理同一条消息(请记住,它还没有真正从队列中删除),同时确保您不会
丢失消息。
如果消息未被删除,它的可见性超时最终会过期,SQS 可以在以后的 receiveMessage
调用中再次 return 它(重新驱动消息)。如果这种情况发生太多次(通常是在消息处理过程中出现未捕获的异常或其他情况时),则 SQS 会将消息传递到 SQS Dead-Letter-Queue (DLQ)(如果您已配置它)。
记住——这里的关键循环本质上是:
while (keepProcessing()) {
Message m = receiveMessage(); // call SQS to get message
processMessage(m); // your own logic to process
deleteMessage(m); // call SQS to ACK the message
}
...如果其中某处抛出异常,则该消息不会丢失 - 它将被重新驱动(基于 DLQ 策略和可见性超时)
(与重新驱动消息不同)用于调用 SDK(即:receiveMessage、deleteMessage 等)。这是为了自动处理间歇性节流、网络或服务问题等问题。
处理你的消息是你自己的逻辑,所以如果你想管理围绕它的任何重试(并确定什么类型的问题可以通过简单的重试来解决),这是你的工作。
我正在用 SQS 处理器替换 JMS 处理器。当我收到消息时,我需要对多个第三方系统进行更新调用。我不确定如何处理成功检索 SQS 消息的情况,但有一个或多个更新第三方系统的调用失败。在 JMS 世界中,我们会抛出一个异常,它会在重试 4 次后最终进入 DLQ 之前重新发送相同的 JMS 消息并进行增量退避。这是我目前的代码。
我有以下辅助方法来整合消息检索
public static List<Message> receiveMessage(SqsClient sqs, String queueUrl) throws AwsException {
try {
ReceiveMessageRequest req = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.waitTimeSeconds(LONG_POLL_DURATION)
.build();
ReceiveMessageResponse resp = sqs.receiveMessage(req);
if (resp != null) {
if (!Collections.isNullOrEmpty(resp.messages())) {
return resp.messages();
} else {
return new ArrayList<>();
}
} else {
return new ArrayList<>();
}
} catch (SdkException e) {
throw new AwsException("An error occurred receiving SQS message: " + e.getMessage(), e);
}
}
我处理 SQS 消息的代码
try (SqsClient client = SqsUtil.getClient()) {
while(!shutdown) {
List<Message> messages = SqsUtil.receiveMessage(client, queueUrl);
if (!messages.isEmpty()) {
for(Message msg : messages) {
boolean errorOccurred = false;
try {
//Convert SQS message to System 1 Request
System1Request req1 = convert(msg);
system1Client.process(req1);
} catch (System1Exception e) {
//log error
errorOccurred = true;
}
try {
//Convert SQS message to System 2 Request
System2Request req2 = convert(msg);
system2Client.process(req2);
} catch (System1Exception e) {
//log error
errorOccurred = true;
}
if (!errorOccurred) {
//delete SQS message
} else {
//TODO: how do I re-process the message using SQS
}
}
}
}
}
根据我的理解,SQS 客户端在 JDK 中内置了重试功能,但我认为这不适用于传输中的消息?我在 API 中看不到如何将消息添加回 SQS 队列。我犹豫要不要在我的应用程序中构建重试逻辑,以防 pod 重新启动并且消息丢失。
我知道如果我不删除邮件,它最终会转移到 DLQ,但我想在将邮件发送到 DLQ 之前重试几次。 SQS Example code 没有说明如何处理这个问题,而且 AWS 文档似乎非常零散。
我是否需要建立第二个排队机制来处理这种情况?我是不是误读了重试机制的工作原理?
我认为您缺少的关键概念是 SQS visibility timeout
When a consumer receives and processes a message from a queue, the message remains in the queue. Amazon SQS doesn't automatically delete the message. Because Amazon SQS is a distributed system, there's no guarantee that the consumer actually receives the message (for example, due to a connectivity issue, or due to an issue in the consumer application). Thus, the consumer must delete the message from the queue after receiving and processing it.
可见性超时(可在设置中配置)是为了防止多个消费者尝试同时处理同一条消息(请记住,它还没有真正从队列中删除),同时确保您不会 丢失消息。
如果消息未被删除,它的可见性超时最终会过期,SQS 可以在以后的 receiveMessage
调用中再次 return 它(重新驱动消息)。如果这种情况发生太多次(通常是在消息处理过程中出现未捕获的异常或其他情况时),则 SQS 会将消息传递到 SQS Dead-Letter-Queue (DLQ)(如果您已配置它)。
记住——这里的关键循环本质上是:
while (keepProcessing()) {
Message m = receiveMessage(); // call SQS to get message
processMessage(m); // your own logic to process
deleteMessage(m); // call SQS to ACK the message
}
...如果其中某处抛出异常,则该消息不会丢失 - 它将被重新驱动(基于 DLQ 策略和可见性超时)
处理你的消息是你自己的逻辑,所以如果你想管理围绕它的任何重试(并确定什么类型的问题可以通过简单的重试来解决),这是你的工作。