Spring 整合 - 屏障
Spring Integration - Barrier
我有一个使用网关 (void) 向两者 (pub/sub) 发送消息的线程:
- barrier,在真正执行时保持线程(requires-reply="true" timeout="XXXX",output-channel="nullChannel"
和
- 拆分器接下来将拆分作为消息发送到带有轮询器的服务激活器(直接通道)和实际 processing/execution
的线程执行器
如何正确配置处理可能由执行程序线程抛出的异常并在下面的 catch 块中捕获它们:
try {
gateway.trigger()
} catch (ReplyRequiredException e) {
//fine here
} catch (Throwable t) {
// catch every exception here... or somehow configure these exceptions to discard the thread that waits on the barrier and throw below business exception
throw new SomeExecutionFailedException()
}
编辑
<!--gateway.trigger()—>
<int:gateway id=“gateway"
service-interface="com.Gateway"
default-request-channel=“channel1"
default-reply-timeout="0"/>
<int:publish-subscribe-channel id=“channel1"/>
<int:splitter input-channel=“channel1" output-channel=“channel2"
order="1">
<bean class=“com.Splitter"/>
</int:splitter>
<int:barrier id=“barrier" input-channel=“channel1"
output-channel="nullChannel"
correlation-strategy-expression=“'XXX’” <!--hack here-->
requires-reply="true"
timeout=“40000"
order="2">
</int:barrier>
<int:channel id=“channel2">
<int:queue capacity="30"/>
</int:channel>
<!— actual processing/execution —>
<int:service-activator id=“executionAct" input-channel=“channel2"
output-channel=“channel3" ref=“executionService">
<int:poller fixed-rate="111" time-unit="MILLISECONDS" max-messages-per-poll="22"
task-executor=“exec"/>
</int:service-activator>
<bean id=“executionService" class=“com.SomeExecService"/>
<bean id=“exec" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadFactory" ref=“execFactory"/>
...
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy"/>
</property>
</bean>
<bean id=“execFactory"
class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
...
</bean>
<int:channel id=“channel3"/>
<int:chain input-channel=“channel3" output-channel=“channel4">
...
<int:aggregator
group-timeout=“30000"
discard-channel=“discardChannel" release-strategy=“com.ReleaseStrategy"
send-partial-result-on-expiry="false">
<bean class="com.Aggregator"/>
</int:aggregator>
</int:chain>
<int:channel id=“discardChannel”/>
<int:channel id=“channel4"/>
<!— processing done - wake up barrier —>
<int:service-activator id=“barrierReleaseAct" input-channel=“channel4" output-channel="nullChannel">
<bean class="com.ServiceThatSendsXXXMessageToChannel5ToReleaseBarrier"/>
</int:service-activator>
<int:channel id=“channel5"/>
<int:outbound-channel-adapter channel=“channel5"
ref=“barrier" method="trigger"/>
您需要提供更多信息、配置等。
什么释放了屏障,什么时候?
是否要将异常传播到主线程?
多次拆分失败怎么办等等
一般的答案是向屏障的 trigger
方法发送一个带有 Throwable
有效负载的消息,将通过抛出一个 MessagingException
作为它的 Throwable
来释放线程原因。网关解包 MessagingException
并抛出原因(这是发送到屏障的触发方法的原始有效负载)。
编辑
我添加了一个 pull request to the barrier sample app 来展示一种在异步线程上收集异常并导致屏障将合并的异常抛回给网关调用者的技术。
我有一个使用网关 (void) 向两者 (pub/sub) 发送消息的线程:
- barrier,在真正执行时保持线程(requires-reply="true" timeout="XXXX",output-channel="nullChannel"
和
- 拆分器接下来将拆分作为消息发送到带有轮询器的服务激活器(直接通道)和实际 processing/execution 的线程执行器
如何正确配置处理可能由执行程序线程抛出的异常并在下面的 catch 块中捕获它们:
try {
gateway.trigger()
} catch (ReplyRequiredException e) {
//fine here
} catch (Throwable t) {
// catch every exception here... or somehow configure these exceptions to discard the thread that waits on the barrier and throw below business exception
throw new SomeExecutionFailedException()
}
编辑
<!--gateway.trigger()—>
<int:gateway id=“gateway"
service-interface="com.Gateway"
default-request-channel=“channel1"
default-reply-timeout="0"/>
<int:publish-subscribe-channel id=“channel1"/>
<int:splitter input-channel=“channel1" output-channel=“channel2"
order="1">
<bean class=“com.Splitter"/>
</int:splitter>
<int:barrier id=“barrier" input-channel=“channel1"
output-channel="nullChannel"
correlation-strategy-expression=“'XXX’” <!--hack here-->
requires-reply="true"
timeout=“40000"
order="2">
</int:barrier>
<int:channel id=“channel2">
<int:queue capacity="30"/>
</int:channel>
<!— actual processing/execution —>
<int:service-activator id=“executionAct" input-channel=“channel2"
output-channel=“channel3" ref=“executionService">
<int:poller fixed-rate="111" time-unit="MILLISECONDS" max-messages-per-poll="22"
task-executor=“exec"/>
</int:service-activator>
<bean id=“executionService" class=“com.SomeExecService"/>
<bean id=“exec" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadFactory" ref=“execFactory"/>
...
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy"/>
</property>
</bean>
<bean id=“execFactory"
class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
...
</bean>
<int:channel id=“channel3"/>
<int:chain input-channel=“channel3" output-channel=“channel4">
...
<int:aggregator
group-timeout=“30000"
discard-channel=“discardChannel" release-strategy=“com.ReleaseStrategy"
send-partial-result-on-expiry="false">
<bean class="com.Aggregator"/>
</int:aggregator>
</int:chain>
<int:channel id=“discardChannel”/>
<int:channel id=“channel4"/>
<!— processing done - wake up barrier —>
<int:service-activator id=“barrierReleaseAct" input-channel=“channel4" output-channel="nullChannel">
<bean class="com.ServiceThatSendsXXXMessageToChannel5ToReleaseBarrier"/>
</int:service-activator>
<int:channel id=“channel5"/>
<int:outbound-channel-adapter channel=“channel5"
ref=“barrier" method="trigger"/>
您需要提供更多信息、配置等。
什么释放了屏障,什么时候?
是否要将异常传播到主线程?
多次拆分失败怎么办等等
一般的答案是向屏障的 trigger
方法发送一个带有 Throwable
有效负载的消息,将通过抛出一个 MessagingException
作为它的 Throwable
来释放线程原因。网关解包 MessagingException
并抛出原因(这是发送到屏障的触发方法的原始有效负载)。
编辑
我添加了一个 pull request to the barrier sample app 来展示一种在异步线程上收集异常并导致屏障将合并的异常抛回给网关调用者的技术。