数量并行处理简单队列服务(SQS)
Amount parallel processing Simple Queue Service (SQS)
我正在使用 Spring 云来使用简单队列服务 (SQS)。我有以下并行处理配置:
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setConcurrencyLimit(50);
return simpleAsyncTaskExecutor;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAutoStartup(true);
factory.setTaskExecutor(simpleAsyncTaskExecutor);
factory.setWaitTimeOut(20);
factory.setMaxNumberOfMessages(10);
return factory;
}
我需要在 50 个线程中处理 50 条消息(bean SimpleAsyncTaskExecutor 中的配置),但并行处理的只有 10 条消息(从 SQS 返回的 maxNumberOfMessages)
如何处理 50 条消息而不是 10 条消息?
我不会过多关注特定数字(例如 50 个线程的 50 条消息)。尝试对其进行性能测试(构建一些东西以在高峰时段将预期数量的消息推送到队列,并让您的服务处理它们,以查看它是否存在瓶颈)。
根据你的实际问题,你不能。 AWS SQS 根本不支持获取超过 10 条消息 pr。要求。请参阅 http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html 以供参考。 (在第 1 段)。
我找到了解决方案。
方法需要注解@Async
,将deletionPolicy
改为NEVER
,最后执行时删除消息
这样,队列消耗将遵守配置的线程数。例如,如果您有 50 个线程,将在 SQS 队列中发出 5 个请求(每个请求 10 条消息),从而并行处理总共 50 条消息。
代码如下所示:
@Async
@SqsListener(value = "sqsName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void consume(String message, Acknowledgment acknowledgment) throws InterruptedException, ExecutionException {
//your code
acknowledgment.acknowledge().get(); //To delete message from queue
}
我正在使用 Spring 云来使用简单队列服务 (SQS)。我有以下并行处理配置:
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setConcurrencyLimit(50);
return simpleAsyncTaskExecutor;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAutoStartup(true);
factory.setTaskExecutor(simpleAsyncTaskExecutor);
factory.setWaitTimeOut(20);
factory.setMaxNumberOfMessages(10);
return factory;
}
我需要在 50 个线程中处理 50 条消息(bean SimpleAsyncTaskExecutor 中的配置),但并行处理的只有 10 条消息(从 SQS 返回的 maxNumberOfMessages)
如何处理 50 条消息而不是 10 条消息?
我不会过多关注特定数字(例如 50 个线程的 50 条消息)。尝试对其进行性能测试(构建一些东西以在高峰时段将预期数量的消息推送到队列,并让您的服务处理它们,以查看它是否存在瓶颈)。
根据你的实际问题,你不能。 AWS SQS 根本不支持获取超过 10 条消息 pr。要求。请参阅 http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html 以供参考。 (在第 1 段)。
我找到了解决方案。
方法需要注解@Async
,将deletionPolicy
改为NEVER
,最后执行时删除消息
这样,队列消耗将遵守配置的线程数。例如,如果您有 50 个线程,将在 SQS 队列中发出 5 个请求(每个请求 10 条消息),从而并行处理总共 50 条消息。
代码如下所示:
@Async
@SqsListener(value = "sqsName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void consume(String message, Acknowledgment acknowledgment) throws InterruptedException, ExecutionException {
//your code
acknowledgment.acknowledge().get(); //To delete message from queue
}