带有 ThreadpoolExecutor 的 SQSListener
SQSListener with ThreadpoolExecutor
在下面的示例中,我将最大和核心池大小设置为 1。但是没有消息正在处理。当我启用调试日志时,我能够看到从 SQS 中提取的消息,但我猜它没有被处理/删除。但是,当我将核心和最大池大小增加到 2 时,消息似乎已被处理。
编辑
我相信 Spring 可能为从队列中读取数据的接收器分配线程,因此它无法为正在处理消息的侦听器分配线程。当我将 corepoolsize 增加到 2 时,我看到正在从队列中读取消息。当我添加另一个侦听器(用于死信队列)时,我遇到了同样的问题 - 2 个线程不够用,因为消息没有被处理。当我将 corepoolsize 增加到 3 时,它开始处理消息。我假设在这种情况下,分配了 1 个线程来读取队列中的消息,并为 2 个侦听器分配了 1 个线程。
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
return credentialsProvider;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard().
withCredentials(awsCredentialsProvider()).
withClientConfiguration(clientConfiguration()).
build();
}
@Bean
@ConfigurationProperties(prefix = "aws.queue")
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
return simpleMessageListenerContainer;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("oaoQueueExecutor");
executor.initialize();
return executor;
}
@Bean
public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}
侦听器配置
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
repository.execute(serviceData);
}
通过将 corePoolSize
和 maximumPoolSize
设置为相同,您创建了一个 fixed-size thread pool
。记录了对规则的很好解释 here
设置 maxPoolSize
隐式允许删除任务。
但是,默认队列容量为 Integer.MAX_VALUE
,在实际应用中为无穷大。
需要注意的是 ThreadPoolTaskExecutor
在下面使用了 ThreadPoolExecutor
,它有一种不寻常的排队方法,在 the docs:
中有描述
If corePoolSize
or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
这意味着 maxPoolSize
仅在队列已满时才相关,否则线程数永远不会超过 corePoolSize
。
例如,如果我们提交任务 永远不会完成 到线程池:
- 前
corePoolSize
个提交将分别启动一个新线程;
- 之后,所有提交都进入队列;
- 如果队列有限且其容量已用尽,则每次提交都会启动一个新线程,最多
maxPoolSize
;
- 当池和队列都已满时,新的提交将被拒绝。
排队 - 阅读docs
任何 BlockingQueue
可用于传输和保留提交的任务。此队列的使用与池大小交互:
- 如果少于 corePoolSize 个线程 运行ning,Executor 总是
更喜欢添加新线程而不是排队。
- 如果 corePoolSize 或更多线程处于 运行ning 状态,Executor 总是
更喜欢排队请求而不是添加新线程。
- 如果请求无法排队,除非创建一个新线程
这将超过 maximumPoolSize,在这种情况下,任务将
被拒绝了。
Unbounded queues
. Using an unbounded queue (for example a
LinkedBlockingQueue
without a predefined capacity) will cause new
tasks to be queued in cases where all corePoolSize threads are busy.
Thus, no more than corePoolSize
threads will ever be created. (And the
value of the maximumPoolSize
therefore doesn't have any effect.)
- 如果线程数小于
corePoolSize
,新建一个
线程到 运行 一个新任务。
- 如果线程数等于(或大于)
corePoolSize
,将任务放入队列。
- 如果队列已满,线程数小于
maxPoolSize
,为 运行 中的任务创建一个新线程。
- 如果队列已满,且线程数大于或
等于
maxPoolSize
,拒绝任务
在下面的示例中,我将最大和核心池大小设置为 1。但是没有消息正在处理。当我启用调试日志时,我能够看到从 SQS 中提取的消息,但我猜它没有被处理/删除。但是,当我将核心和最大池大小增加到 2 时,消息似乎已被处理。
编辑
我相信 Spring 可能为从队列中读取数据的接收器分配线程,因此它无法为正在处理消息的侦听器分配线程。当我将 corepoolsize 增加到 2 时,我看到正在从队列中读取消息。当我添加另一个侦听器(用于死信队列)时,我遇到了同样的问题 - 2 个线程不够用,因为消息没有被处理。当我将 corepoolsize 增加到 3 时,它开始处理消息。我假设在这种情况下,分配了 1 个线程来读取队列中的消息,并为 2 个侦听器分配了 1 个线程。
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
return credentialsProvider;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard().
withCredentials(awsCredentialsProvider()).
withClientConfiguration(clientConfiguration()).
build();
}
@Bean
@ConfigurationProperties(prefix = "aws.queue")
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
return simpleMessageListenerContainer;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("oaoQueueExecutor");
executor.initialize();
return executor;
}
@Bean
public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}
侦听器配置
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
repository.execute(serviceData);
}
通过将 corePoolSize
和 maximumPoolSize
设置为相同,您创建了一个 fixed-size thread pool
。记录了对规则的很好解释 here
设置 maxPoolSize
隐式允许删除任务。
但是,默认队列容量为 Integer.MAX_VALUE
,在实际应用中为无穷大。
需要注意的是 ThreadPoolTaskExecutor
在下面使用了 ThreadPoolExecutor
,它有一种不寻常的排队方法,在 the docs:
If
corePoolSize
or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
这意味着 maxPoolSize
仅在队列已满时才相关,否则线程数永远不会超过 corePoolSize
。
例如,如果我们提交任务 永远不会完成 到线程池:
- 前
corePoolSize
个提交将分别启动一个新线程; - 之后,所有提交都进入队列;
- 如果队列有限且其容量已用尽,则每次提交都会启动一个新线程,最多
maxPoolSize
; - 当池和队列都已满时,新的提交将被拒绝。
排队 - 阅读docs
任何 BlockingQueue
可用于传输和保留提交的任务。此队列的使用与池大小交互:
- 如果少于 corePoolSize 个线程 运行ning,Executor 总是 更喜欢添加新线程而不是排队。
- 如果 corePoolSize 或更多线程处于 运行ning 状态,Executor 总是 更喜欢排队请求而不是添加新线程。
- 如果请求无法排队,除非创建一个新线程 这将超过 maximumPoolSize,在这种情况下,任务将 被拒绝了。
Unbounded queues
. Using an unbounded queue (for example aLinkedBlockingQueue
without a predefined capacity) will cause new tasks to be queued in cases where all corePoolSize threads are busy. Thus, no more thancorePoolSize
threads will ever be created. (And the value of themaximumPoolSize
therefore doesn't have any effect.)
- 如果线程数小于
corePoolSize
,新建一个 线程到 运行 一个新任务。 - 如果线程数等于(或大于)
corePoolSize
,将任务放入队列。 - 如果队列已满,线程数小于
maxPoolSize
,为 运行 中的任务创建一个新线程。 - 如果队列已满,且线程数大于或
等于
maxPoolSize
,拒绝任务