Spring 具有循环和多个 HTTP 和异步 SQS 的集成块流
Spring Integration block flow having Loop and multiple HTTP and async SQS
我有一个流程
1. Starts with a config map -> MainGateway.start(configMap) -> void
2. Splits map into multiple messages per entry
3. For every config entry do the following using an orchestrator java class:
BEGIN LOOP (offset and limit)
Data d = HTTPGateway.getData();
PublishGateway.sendMessage(d); -> Send to 2 SQS queues
END LOOP
要求
我必须通过 cron 安排这个流程。一种选择是提供将启动流程的 HTTP 端点。但是第二个 HTTP 请求应该 wait/timeout/error 直到第一个请求完成。
问题
我一直在寻找实现流线程阻塞的障碍,直到它完成并且只有一个线程 http 处理器,所以一次只处理 1 个请求,我可以知道流何时完成。 (所有配置条目对象的循环结束,所有发送到 SQS 的消息都被确认)。我怎样才能做到这一点?我有一个循环,正在使用带有执行器的发布-订阅通道进行并行配置和并行 SQS 调度。
为了清楚起见,我已经删减了下面的 XML config
。
<!-- Bring in list of Configs to process -->
<int:gateway service-interface="Gateway"
default-request-channel="configListChannel" />
<int:chain input-channel="configListChannel" output-channel="configChannel">
<!-- Split the list to one instance of config per message -->
<int:splitter/>
<int:filter expression="payload.enablePolling" />
</int:chain>
<!-- Manually orchestrate a loop to query a system as per config and publish messages to SQS -->
<bean class="Orchestrator" id="orchestrator" />
<int:service-activator ref="orchestrator" method="getData" input-channel="configChannel" />
<!-- The flow from this point onwards is triggered inside a loop controlled by the Orchestrator
The following Gateway calls are inside Orchestrators loop -->
<!-- Create a Http request from the Orchestrator using a Gateway -->
<int:gateway service-interface="HttpGateway">
<int:method name="getData"
request-channel="requestChannel"
payload-expression="#args[0]">
</int:method>
</int:gateway>
<!-- Transform request object to json and invoke Http endpoint -->
<int:chain input-channel="requestChannel" id="httpRequestChain">
<int:object-to-json-transformer />
<int-http:outbound-gateway url-expression="headers['config'].url"
http-method="POST"
expected-response-type="java.lang.String"
/>
</int:chain>
<!-- Publish Messages to Outbound Gateway -->
<task:executor id="executor" pool-size="5" />
<int:publish-subscribe-channel id="publishChannel" task-executor="executor" />
<int:gateway service-interface="PublishGateway" >
<int:method name="publishToOutbound" payload-expression="#args[0]" request-channel="publishChannel" />
</int:gateway>
<!-- Route to System A SQS with transformations (omitted here)-->
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-a-queue" success-channel="successChannel" failure-channel="errorChannel"/>
<!-- Route to System B SQS with transformations (omitted here)-->
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-b-queue" success-channel="successChannel" failure-channel="errorChannel"/>
<int:logging-channel-adapter logger-name="sqsCallbackLogger" log-full-message="true" channel="successChannel" />
与此同时,我正在尝试将 spring-integration-samples
中的 A B C 障碍示例应用到我的用例中。
正如您在评论中指出的,aggregator
方法可以用于您的解决方案。
这样您就可以聚合那些并行 SQS 请求的结果,并等待原始请求者的聚合回复。这样,即使您的流程内部仍然并发,它也会真正被阻止。您调用一个网关,它的回复将来自聚合器。
我有一个流程
1. Starts with a config map -> MainGateway.start(configMap) -> void
2. Splits map into multiple messages per entry
3. For every config entry do the following using an orchestrator java class:
BEGIN LOOP (offset and limit)
Data d = HTTPGateway.getData();
PublishGateway.sendMessage(d); -> Send to 2 SQS queues
END LOOP
要求 我必须通过 cron 安排这个流程。一种选择是提供将启动流程的 HTTP 端点。但是第二个 HTTP 请求应该 wait/timeout/error 直到第一个请求完成。
问题 我一直在寻找实现流线程阻塞的障碍,直到它完成并且只有一个线程 http 处理器,所以一次只处理 1 个请求,我可以知道流何时完成。 (所有配置条目对象的循环结束,所有发送到 SQS 的消息都被确认)。我怎样才能做到这一点?我有一个循环,正在使用带有执行器的发布-订阅通道进行并行配置和并行 SQS 调度。
为了清楚起见,我已经删减了下面的 XML config
。
<!-- Bring in list of Configs to process -->
<int:gateway service-interface="Gateway"
default-request-channel="configListChannel" />
<int:chain input-channel="configListChannel" output-channel="configChannel">
<!-- Split the list to one instance of config per message -->
<int:splitter/>
<int:filter expression="payload.enablePolling" />
</int:chain>
<!-- Manually orchestrate a loop to query a system as per config and publish messages to SQS -->
<bean class="Orchestrator" id="orchestrator" />
<int:service-activator ref="orchestrator" method="getData" input-channel="configChannel" />
<!-- The flow from this point onwards is triggered inside a loop controlled by the Orchestrator
The following Gateway calls are inside Orchestrators loop -->
<!-- Create a Http request from the Orchestrator using a Gateway -->
<int:gateway service-interface="HttpGateway">
<int:method name="getData"
request-channel="requestChannel"
payload-expression="#args[0]">
</int:method>
</int:gateway>
<!-- Transform request object to json and invoke Http endpoint -->
<int:chain input-channel="requestChannel" id="httpRequestChain">
<int:object-to-json-transformer />
<int-http:outbound-gateway url-expression="headers['config'].url"
http-method="POST"
expected-response-type="java.lang.String"
/>
</int:chain>
<!-- Publish Messages to Outbound Gateway -->
<task:executor id="executor" pool-size="5" />
<int:publish-subscribe-channel id="publishChannel" task-executor="executor" />
<int:gateway service-interface="PublishGateway" >
<int:method name="publishToOutbound" payload-expression="#args[0]" request-channel="publishChannel" />
</int:gateway>
<!-- Route to System A SQS with transformations (omitted here)-->
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-a-queue" success-channel="successChannel" failure-channel="errorChannel"/>
<!-- Route to System B SQS with transformations (omitted here)-->
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" channel="publishChannel" queue="system-b-queue" success-channel="successChannel" failure-channel="errorChannel"/>
<int:logging-channel-adapter logger-name="sqsCallbackLogger" log-full-message="true" channel="successChannel" />
与此同时,我正在尝试将 spring-integration-samples
中的 A B C 障碍示例应用到我的用例中。
正如您在评论中指出的,aggregator
方法可以用于您的解决方案。
这样您就可以聚合那些并行 SQS 请求的结果,并等待原始请求者的聚合回复。这样,即使您的流程内部仍然并发,它也会真正被阻止。您调用一个网关,它的回复将来自聚合器。