concurrentConsumers 值会影响 Camel 处理器线程数吗?
Does concurrentConsumers value affect number of Camel processor threads?
我有一个 Apache Camel 路由和处理器从 ActiveMQ 代理消费。
航线代码-
@Component
public class MyRoute extends RouteBuilder {
private String mySubscription;
private MyProcessor myProcessor;
public MyRoute(@Value("${my.topic}") String mySubscription, MyProcessor myProcessor) {
this.mySubscription = mySubscription;
this.myProcessor = myProcessor;
}
@Override
public void configure() {
from(mySubscription)
.unmarshal().json(JsonLibrary.Jackson, MyDTO.class)
.bean(myProcessor, "process(${body})")
.end();
}
}
处理器代码-
@Slf4j
@Component
@AllArgsConstructor
public class MyProcessor {
public void process(MyDTO dto) {
//code that calls HTTP URLs
}
}
配置如下-
spring:
application:
name: my_listener
//Bean prefixes
pooledConnectionFactory:
maxConnections: 10
connectionFactory:
brokerURL: ${brokerURL}
redeliveryPolicy:
backOffMultiplier: 2.0
useExponentialBackOff: true
redeliveryDelay: 60000
maximumRedeliveries: 5
component:
forceSendOriginalMessage: true
concurrentConsumers: 15
//A bunch of HTTP URLs
brokerURL: <brokerURL>
在本地 运行 侦听器上并将 VisualVM 指向本地 ActiveMQ 代理,我可以在 VisualVM 线程选项卡下看到 15 个名称包含订阅名称的线程。如果我发送了 4 条消息,我会看到 4 个不同的线程进入 运行 状态。我没有看到任何可识别为处理器 class 对象的线程,尽管在 MBean 选项卡中的处理器下有一个 bean。在该 bean 上调用 getTotalExchanges() 显示 4 = 没有发送的消息。
concurrentConsumers设置(此处为15)是否只创建15个消费线程?通过处理器 class 的处理是否仍然按顺序进行?还是每个订阅线程都在自己的线程中调用处理器对象,从而使处理器逻辑成为多线程?
concurrentConsumers设置(此处为15)是否只创建15个消费线程?
Ans - 是的,懒惰地。您可以通过提供自己的线程池来控制它。
通过处理器 class 的处理是否仍然按顺序进行?还是每个订阅线程都在自己的线程中调用处理器对象,从而使处理器逻辑成为多线程?
答案 - 不,它是并行发生的,就像任何多线程应用程序一样,这意味着如果使用任何共享数据,处理方法应该是线程安全的。
我有一个 Apache Camel 路由和处理器从 ActiveMQ 代理消费。
航线代码-
@Component
public class MyRoute extends RouteBuilder {
private String mySubscription;
private MyProcessor myProcessor;
public MyRoute(@Value("${my.topic}") String mySubscription, MyProcessor myProcessor) {
this.mySubscription = mySubscription;
this.myProcessor = myProcessor;
}
@Override
public void configure() {
from(mySubscription)
.unmarshal().json(JsonLibrary.Jackson, MyDTO.class)
.bean(myProcessor, "process(${body})")
.end();
}
}
处理器代码-
@Slf4j
@Component
@AllArgsConstructor
public class MyProcessor {
public void process(MyDTO dto) {
//code that calls HTTP URLs
}
}
配置如下-
spring:
application:
name: my_listener
//Bean prefixes
pooledConnectionFactory:
maxConnections: 10
connectionFactory:
brokerURL: ${brokerURL}
redeliveryPolicy:
backOffMultiplier: 2.0
useExponentialBackOff: true
redeliveryDelay: 60000
maximumRedeliveries: 5
component:
forceSendOriginalMessage: true
concurrentConsumers: 15
//A bunch of HTTP URLs
brokerURL: <brokerURL>
在本地 运行 侦听器上并将 VisualVM 指向本地 ActiveMQ 代理,我可以在 VisualVM 线程选项卡下看到 15 个名称包含订阅名称的线程。如果我发送了 4 条消息,我会看到 4 个不同的线程进入 运行 状态。我没有看到任何可识别为处理器 class 对象的线程,尽管在 MBean 选项卡中的处理器下有一个 bean。在该 bean 上调用 getTotalExchanges() 显示 4 = 没有发送的消息。
concurrentConsumers设置(此处为15)是否只创建15个消费线程?通过处理器 class 的处理是否仍然按顺序进行?还是每个订阅线程都在自己的线程中调用处理器对象,从而使处理器逻辑成为多线程?
concurrentConsumers设置(此处为15)是否只创建15个消费线程?
Ans - 是的,懒惰地。您可以通过提供自己的线程池来控制它。
通过处理器 class 的处理是否仍然按顺序进行?还是每个订阅线程都在自己的线程中调用处理器对象,从而使处理器逻辑成为多线程?
答案 - 不,它是并行发生的,就像任何多线程应用程序一样,这意味着如果使用任何共享数据,处理方法应该是线程安全的。