如何在 mule 中并行处理列表?
How to process a list in parallel in mule?
我有一个对象列表,现在我正在 foreach 中处理它。该列表只不过是一串在内部启动其他内容的 ID。
<flow name="flow1" processingStrategy="synchronous">
<quartz:inbound-endpoint jobName="integration" repeatInterval="86400000" responseTimeout="10000" doc:name="Quartz" >
<quartz:event-generator-job/>
</quartz:inbound-endpoint>
<component class="RequestFeeder" doc:name="RequestFeeder"/>
<foreach collection="#[payload]" doc:name="For Each">
<flow-ref name="createFlow" doc:name="createFlow"/>
<flow-ref name="queueFlow" doc:name="queueFlow"/>
<flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
<flow-ref name="resultsFlow" doc:name="resultsFlow"/>
<flow-ref name="sftpFlow" doc:name="sftpFlow"/>
<logger message="RequestType #[flowVars['rqstType']] complete" level="INFO" doc:name="Done"/>
</foreach>
<logger message="ALL 15 REQUESTS HAVE BEEN PROCESSED" level="INFO" doc:name="Logger"/>
</flow>
我想并行处理它们。即对列表中的所有 15 个请求并行执行相同的 4 个流引用。这看起来很简单,但我还没有弄明白。任何帮助表示赞赏。
我想您仍然希望这四个流按顺序流向 运行,对吧?
如果不是这种情况,您可以随时更改线程配置文件。
您可以做的另一件事是将四个流包装在一个异步范围内,尽管您可能需要更改处理器。
无论如何,我认为你最好使用分散收集组件:
- https://developer.mulesoft.com/docs/display/current/Scatter-Gather
- https://www.mulesoft.com/exchange#!/scatter-gather-flow-control
这不需要 for each 范围将拆分列表并在不同的线程中执行每个项目。您可以定义要并行 运行 的线程数(因此您不只是使用池来旋转新线程)。
最后要注意的是,要汇总所有已处理项目的结果。我认为您可以使用自定义聚合策略来更改它,但不确定是否真的如此,请查看相关文档。
HTH
Mule组件的Scatter-gather是便于并行处理的组件之一,下面是一个简单的例子:-
<scatter-gather >
<flow-ref name="flow1" />
<flow-ref name="flow2" />
<flow-ref name="flow3" />
</scatter-gather>
因此,您想要并行执行的流程可以保存在
中
分散-聚集方法的一种替代方法是简单地拆分集合并对列表中的项目使用 VM 队列。如果您不需要等待并收集所有 15 个结果,此方法会更简单,如果您这样做仍然有效。
尝试这样的事情。 Mule 自动使用线程池 (more info) 来 运行 您的流程,因此下面的 requestProcessor 流程将并行处理您的请求。
<flow name="scheduleRequests">
<quartz:inbound-endpoint jobName="integration" repeatInterval="86400000" responseTimeout="10000" doc:name="Quartz" >
<quartz:event-generator-job/>
</quartz:inbound-endpoint>
<component class="RequestFeeder" doc:name="RequestFeeder"/>
<collection-splitter />
<vm:outbound-endpoint path="requests" />
</flow>
<flow name="requestProcessor">
<vm:inbound-endpoint path="requests" />
<flow-ref name="createFlow" doc:name="createFlow"/>
<flow-ref name="queueFlow" doc:name="queueFlow"/>
<flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
<flow-ref name="resultsFlow" doc:name="resultsFlow"/>
<flow-ref name="sftpFlow" doc:name="sftpFlow"/>
</flow>
你说4个流,但列表包含5个流。如果您希望所有流按顺序执行,但集合中的每个项目并行执行,您将需要一个拆分器,后跟一个包含所有 (4/5) 流的单独 vm 流,如此处解释:https://support.mulesoft.com/s/article/Concurrently-processing-Collection-and-getting-the-results.
如果您希望循环内的流程并行执行,那么您可以选择 Scatter-Gather 组件。
重要的是要弄清楚您想要实现的两件事中的哪一件,因为解决方案会非常不同。所以基本的区别是,在 Scatter-Gather 中,单个消息被发送给多个接收者以并行处理,但在 Splitter-Aggregator 中,单个消息被拆分为多个子消息并单独处理,然后聚合。参见:http://muthurajud.blogspot.com/2016/07/eai-patterns-scattergather-versus.html
我有一个对象列表,现在我正在 foreach 中处理它。该列表只不过是一串在内部启动其他内容的 ID。
<flow name="flow1" processingStrategy="synchronous">
<quartz:inbound-endpoint jobName="integration" repeatInterval="86400000" responseTimeout="10000" doc:name="Quartz" >
<quartz:event-generator-job/>
</quartz:inbound-endpoint>
<component class="RequestFeeder" doc:name="RequestFeeder"/>
<foreach collection="#[payload]" doc:name="For Each">
<flow-ref name="createFlow" doc:name="createFlow"/>
<flow-ref name="queueFlow" doc:name="queueFlow"/>
<flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
<flow-ref name="resultsFlow" doc:name="resultsFlow"/>
<flow-ref name="sftpFlow" doc:name="sftpFlow"/>
<logger message="RequestType #[flowVars['rqstType']] complete" level="INFO" doc:name="Done"/>
</foreach>
<logger message="ALL 15 REQUESTS HAVE BEEN PROCESSED" level="INFO" doc:name="Logger"/>
</flow>
我想并行处理它们。即对列表中的所有 15 个请求并行执行相同的 4 个流引用。这看起来很简单,但我还没有弄明白。任何帮助表示赞赏。
我想您仍然希望这四个流按顺序流向 运行,对吧? 如果不是这种情况,您可以随时更改线程配置文件。
您可以做的另一件事是将四个流包装在一个异步范围内,尽管您可能需要更改处理器。
无论如何,我认为你最好使用分散收集组件:
- https://developer.mulesoft.com/docs/display/current/Scatter-Gather
- https://www.mulesoft.com/exchange#!/scatter-gather-flow-control
这不需要 for each 范围将拆分列表并在不同的线程中执行每个项目。您可以定义要并行 运行 的线程数(因此您不只是使用池来旋转新线程)。
最后要注意的是,要汇总所有已处理项目的结果。我认为您可以使用自定义聚合策略来更改它,但不确定是否真的如此,请查看相关文档。
HTH
Mule组件的Scatter-gather是便于并行处理的组件之一,下面是一个简单的例子:-
<scatter-gather >
<flow-ref name="flow1" />
<flow-ref name="flow2" />
<flow-ref name="flow3" />
</scatter-gather>
因此,您想要并行执行的流程可以保存在
中分散-聚集方法的一种替代方法是简单地拆分集合并对列表中的项目使用 VM 队列。如果您不需要等待并收集所有 15 个结果,此方法会更简单,如果您这样做仍然有效。
尝试这样的事情。 Mule 自动使用线程池 (more info) 来 运行 您的流程,因此下面的 requestProcessor 流程将并行处理您的请求。
<flow name="scheduleRequests">
<quartz:inbound-endpoint jobName="integration" repeatInterval="86400000" responseTimeout="10000" doc:name="Quartz" >
<quartz:event-generator-job/>
</quartz:inbound-endpoint>
<component class="RequestFeeder" doc:name="RequestFeeder"/>
<collection-splitter />
<vm:outbound-endpoint path="requests" />
</flow>
<flow name="requestProcessor">
<vm:inbound-endpoint path="requests" />
<flow-ref name="createFlow" doc:name="createFlow"/>
<flow-ref name="queueFlow" doc:name="queueFlow"/>
<flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
<flow-ref name="resultsFlow" doc:name="resultsFlow"/>
<flow-ref name="sftpFlow" doc:name="sftpFlow"/>
</flow>
你说4个流,但列表包含5个流。如果您希望所有流按顺序执行,但集合中的每个项目并行执行,您将需要一个拆分器,后跟一个包含所有 (4/5) 流的单独 vm 流,如此处解释:https://support.mulesoft.com/s/article/Concurrently-processing-Collection-and-getting-the-results.
如果您希望循环内的流程并行执行,那么您可以选择 Scatter-Gather 组件。
重要的是要弄清楚您想要实现的两件事中的哪一件,因为解决方案会非常不同。所以基本的区别是,在 Scatter-Gather 中,单个消息被发送给多个接收者以并行处理,但在 Splitter-Aggregator 中,单个消息被拆分为多个子消息并单独处理,然后聚合。参见:http://muthurajud.blogspot.com/2016/07/eai-patterns-scattergather-versus.html