连续侦听 AWS SQS 消息的模式
Pattern to continuously listen to AWS SQS messages
我有一个名为 QueueService
的简单 class,其中一些方法包装了 Java 的 AWS SQS SDK 中的方法。例如:
public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();
ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
for(Message message : messages) {
Hashtable<String, String> resultItem = new Hashtable<String, String>();
resultItem.put("MessageId", message.getMessageId());
resultItem.put("ReceiptHandle", message.getReceiptHandle());
resultItem.put("Body", message.getBody());
resultList.add(resultItem);
}
return resultList;
}
我有另一个名为 App
的另一个 class,它有一个 main
并创建了一个 QueueService
的实例。
我正在寻找 "pattern" 来使 App
中的 main
监听队列中的新消息。现在我有一个 while(true)
循环,我在其中调用 receiveMessages
方法:
while(true) {
messages = queueService.receiveMessages(queueURL);
for(Hashtable<String, String> message: messages) {
String receiptHandle = message.get("ReceiptHandle");
String messageBody = message.get("MessageBody");
System.out.println(messageBody);
queueService.deleteMessage(queueURL, receiptHandle);
}
}
这是正确的方法吗?我应该使用SQS SDK中的异步消息接收方法吗?
您缺少一些东西:
- 使用
receiveMessages(ReceiveMessageRequest)
并设置等待时间以启用长轮询。
- 将您的 AWS 调用包装在 try/catch 块中。特别要注意
OverLimitException
,如果你有太多的动态消息,它可以从 receiveMessages()
中抛出。
- 将
while
循环的整个主体包裹在它自己的 try/catch 块中,记录捕获到的任何异常(不应该存在 - 这是为了确保您的应用程序不会'崩溃是因为 AWS 更改了它们的 API 或者您忽略了处理预期的异常)。
有关长轮询和可能的异常的详细信息,请参阅 doc。
关于使用异步客户端:你有什么特别的理由使用它吗?如果没有,那就不要:单个接收器线程更容易管理。
如果您想使用 SQS 然后使用 lambda 来处理请求,您可以按照 link 中给出的步骤进行操作,或者您始终使用 lambda 而不是 SQS 并为每个请求调用 lambda。
据我所知,Amazon SQS 无法支持主动侦听器模型,其中 Amazon SQS 会 "push" 向您的侦听器发送消息,或者在有消息时调用您的消息侦听器。
因此,您总是需要轮询消息。轮询支持两种轮询机制 - 短轮询和长轮询。每个都有自己的优点和缺点,但在大多数情况下,您通常最终会使用长轮询,尽管默认的轮询是短轮询。长轮询机制在网络流量方面绝对更有效,更具成本效益(因为亚马逊根据请求的数量向您收费),并且当您希望以时间敏感的方式处理消息时,它也是首选机制( ~= 尽快处理)。
关于长轮询和短轮询还有更多值得了解的错综复杂的内容,在这里解释所有这些有点困难,但如果您愿意,可以通过以下博客阅读更多关于此的详细信息。它还有一些代码示例,应该会有帮助。
http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/
就 while(true) 循环而言,我会说这取决于。
如果您正在使用长轮询,并且可以将等待时间设置为(最大)20 秒,这样如果没有消息,您轮询 SQS 的频率不会超过 20 秒。如果有消息,您可以决定是频繁轮询(消息一到达就处理)还是总是按时间间隔(比如每 n 秒)处理它们。
另一点需要注意的是,您可以在单个 receiveMessages 请求中读取多达 10 条消息,因此这也将减少您对 SQS 的调用次数,从而降低成本。而且正如上面的博客详细解释的那样,你可能会请求阅读 10 条消息,但即使队列中有那么多消息,它也可能不会 return 你 10。
但总的来说,如果您希望在运行时关闭轮询,我会说您需要构建适当的挂钩和异常处理,以防您使用 while(true) 类型的结构。
要考虑的另一个方面是您是想在主应用程序线程中轮询 SQS 还是想生成另一个线程。因此,另一种选择是在主线程中创建一个带有单个线程的 ScheduledThreadPoolExecutor,以安排一个线程定期(每隔几秒)轮询 SQS,并且您可能不需要 while(true) 结构。
截至 2019 年,SQS 可以触发 lambda:
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
我有一个名为 QueueService
的简单 class,其中一些方法包装了 Java 的 AWS SQS SDK 中的方法。例如:
public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();
ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
for(Message message : messages) {
Hashtable<String, String> resultItem = new Hashtable<String, String>();
resultItem.put("MessageId", message.getMessageId());
resultItem.put("ReceiptHandle", message.getReceiptHandle());
resultItem.put("Body", message.getBody());
resultList.add(resultItem);
}
return resultList;
}
我有另一个名为 App
的另一个 class,它有一个 main
并创建了一个 QueueService
的实例。
我正在寻找 "pattern" 来使 App
中的 main
监听队列中的新消息。现在我有一个 while(true)
循环,我在其中调用 receiveMessages
方法:
while(true) {
messages = queueService.receiveMessages(queueURL);
for(Hashtable<String, String> message: messages) {
String receiptHandle = message.get("ReceiptHandle");
String messageBody = message.get("MessageBody");
System.out.println(messageBody);
queueService.deleteMessage(queueURL, receiptHandle);
}
}
这是正确的方法吗?我应该使用SQS SDK中的异步消息接收方法吗?
您缺少一些东西:
- 使用
receiveMessages(ReceiveMessageRequest)
并设置等待时间以启用长轮询。 - 将您的 AWS 调用包装在 try/catch 块中。特别要注意
OverLimitException
,如果你有太多的动态消息,它可以从receiveMessages()
中抛出。 - 将
while
循环的整个主体包裹在它自己的 try/catch 块中,记录捕获到的任何异常(不应该存在 - 这是为了确保您的应用程序不会'崩溃是因为 AWS 更改了它们的 API 或者您忽略了处理预期的异常)。
有关长轮询和可能的异常的详细信息,请参阅 doc。
关于使用异步客户端:你有什么特别的理由使用它吗?如果没有,那就不要:单个接收器线程更容易管理。
如果您想使用 SQS 然后使用 lambda 来处理请求,您可以按照 link 中给出的步骤进行操作,或者您始终使用 lambda 而不是 SQS 并为每个请求调用 lambda。
据我所知,Amazon SQS 无法支持主动侦听器模型,其中 Amazon SQS 会 "push" 向您的侦听器发送消息,或者在有消息时调用您的消息侦听器。
因此,您总是需要轮询消息。轮询支持两种轮询机制 - 短轮询和长轮询。每个都有自己的优点和缺点,但在大多数情况下,您通常最终会使用长轮询,尽管默认的轮询是短轮询。长轮询机制在网络流量方面绝对更有效,更具成本效益(因为亚马逊根据请求的数量向您收费),并且当您希望以时间敏感的方式处理消息时,它也是首选机制( ~= 尽快处理)。
关于长轮询和短轮询还有更多值得了解的错综复杂的内容,在这里解释所有这些有点困难,但如果您愿意,可以通过以下博客阅读更多关于此的详细信息。它还有一些代码示例,应该会有帮助。
http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/
就 while(true) 循环而言,我会说这取决于。 如果您正在使用长轮询,并且可以将等待时间设置为(最大)20 秒,这样如果没有消息,您轮询 SQS 的频率不会超过 20 秒。如果有消息,您可以决定是频繁轮询(消息一到达就处理)还是总是按时间间隔(比如每 n 秒)处理它们。
另一点需要注意的是,您可以在单个 receiveMessages 请求中读取多达 10 条消息,因此这也将减少您对 SQS 的调用次数,从而降低成本。而且正如上面的博客详细解释的那样,你可能会请求阅读 10 条消息,但即使队列中有那么多消息,它也可能不会 return 你 10。
但总的来说,如果您希望在运行时关闭轮询,我会说您需要构建适当的挂钩和异常处理,以防您使用 while(true) 类型的结构。
要考虑的另一个方面是您是想在主应用程序线程中轮询 SQS 还是想生成另一个线程。因此,另一种选择是在主线程中创建一个带有单个线程的 ScheduledThreadPoolExecutor,以安排一个线程定期(每隔几秒)轮询 SQS,并且您可能不需要 while(true) 结构。
截至 2019 年,SQS 可以触发 lambda: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html