在 Spring Integration 中阻止执行发送通道,直到拆分器完成处理产生的执行通道
Prevent execution of sender channel until spawned executor channels from splitter finish processing in Spring Integration
目前,我的程序以 single-threaded 方式执行,它通过上下文文件并发出各种 HTTP 请求,来自 HTTP 请求的响应用于使用 XSLT 创建最终响应。我现在正在尝试 multi-thread 其中一些请求,因为在后续请求中只需要使用第一个 HTTP 请求中的数据。
到目前为止,我已经创建了一个生成 5 条不同消息的拆分器。输出通道是一个线程池大小为 5 的 ExecutorChannel。输出通道是一个 header 值路由器,它使用在拆分器中添加的 header 来决定上下文文件的哪一部分将消息发送到。
<splitter input-channel="spliter-input" output-channel="splitter-output" ref="MultiThreadedSplitter" method="split" />
<channel id="splitterRouter">
<dispatcher task-executor="splitterExecutor" />
<channel/>
<task:executor id="splitterExecutor" pool-size="5" />
<header-value-router input-channel="splitter-output" header-name="splitHeader">
<mapping value="httpRequest1" channel="httpRequest1" />
...
<mapping value="httpRequest5" channel="httpRequest5" />
<header-value-router/>
正如我在生成的日志中看到的那样,以上所有内容都在工作,请求是以 multi-threaded 方式发出的,所有请求都具有不同的线程上下文。除了我遇到的问题是,原始线程似乎正在尝试继续执行上下文文件并完成消息的发送和接收,因此我创建的扩展 MessagingGatewaySupport 的 class 尝试接收响应消息,但响应消息为空,因此导致空指针异常。产生的 5 个线程继续执行,我可以看到请求正在成功发出,最后,我可以看到我想要返回的最终响应,但原始线程已经发回了空指针错误。
有没有办法阻止原来调用splitter的主线程执行?
@Artem 在此先感谢您的帮助:P
看起来您的 MessagingGatewaySupport
配置的 replyTimeout
太短了。由于您将拆分的消息发送到不同的线程,调用者不再有阻塞,所以它只是转到 receive()
部分进行回复。
见
/**
* Set the timeout value for receiving reply messages. If not
* explicitly configured, the default is one second.
* @param replyTimeout the timeout value in milliseconds
*/
public void setReplyTimeout(long replyTimeout) {
和private static final long DEFAULT_TIMEOUT = 1000L;
如果你真的return最后有一些回复,这一切都会很好。
如果你根本没有它并且仍然想阻止直到某些事件,请考虑使用 Barrier
组件:https://docs.spring.io/spring-integration/reference/html/message-routing.html#barrier
目前,我的程序以 single-threaded 方式执行,它通过上下文文件并发出各种 HTTP 请求,来自 HTTP 请求的响应用于使用 XSLT 创建最终响应。我现在正在尝试 multi-thread 其中一些请求,因为在后续请求中只需要使用第一个 HTTP 请求中的数据。
到目前为止,我已经创建了一个生成 5 条不同消息的拆分器。输出通道是一个线程池大小为 5 的 ExecutorChannel。输出通道是一个 header 值路由器,它使用在拆分器中添加的 header 来决定上下文文件的哪一部分将消息发送到。
<splitter input-channel="spliter-input" output-channel="splitter-output" ref="MultiThreadedSplitter" method="split" />
<channel id="splitterRouter">
<dispatcher task-executor="splitterExecutor" />
<channel/>
<task:executor id="splitterExecutor" pool-size="5" />
<header-value-router input-channel="splitter-output" header-name="splitHeader">
<mapping value="httpRequest1" channel="httpRequest1" />
...
<mapping value="httpRequest5" channel="httpRequest5" />
<header-value-router/>
正如我在生成的日志中看到的那样,以上所有内容都在工作,请求是以 multi-threaded 方式发出的,所有请求都具有不同的线程上下文。除了我遇到的问题是,原始线程似乎正在尝试继续执行上下文文件并完成消息的发送和接收,因此我创建的扩展 MessagingGatewaySupport 的 class 尝试接收响应消息,但响应消息为空,因此导致空指针异常。产生的 5 个线程继续执行,我可以看到请求正在成功发出,最后,我可以看到我想要返回的最终响应,但原始线程已经发回了空指针错误。
有没有办法阻止原来调用splitter的主线程执行?
@Artem 在此先感谢您的帮助:P
看起来您的 MessagingGatewaySupport
配置的 replyTimeout
太短了。由于您将拆分的消息发送到不同的线程,调用者不再有阻塞,所以它只是转到 receive()
部分进行回复。
见
/**
* Set the timeout value for receiving reply messages. If not
* explicitly configured, the default is one second.
* @param replyTimeout the timeout value in milliseconds
*/
public void setReplyTimeout(long replyTimeout) {
和private static final long DEFAULT_TIMEOUT = 1000L;
如果你真的return最后有一些回复,这一切都会很好。
如果你根本没有它并且仍然想阻止直到某些事件,请考虑使用 Barrier
组件:https://docs.spring.io/spring-integration/reference/html/message-routing.html#barrier