Spring集成完成异步改造前下
Spring Integration complete asynchronous transformation before the next
我有一个集成流程,它定期轮询数据库以检索任何 MachineLine
尚未处理的实体并对其进行处理。流程检索 MachineLine
对象的集合,然后我想将其拆分为单独的对象,将这些对象转换为 ReportDetails
对象并将转换后的对象保存到数据库中的另一个 table .流程定义如下:
@Bean
public IntegrationFlow processMachineLine() {
return IntegrationFlows
.from(Jpa.inboundAdapter(this.entityManager)
.entityClass(MachineLine.class)
.jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
e -> e.poller(Pollers.fixedDelay(5000)))
.split()
.transform(MachineLine.class, this::transformMachineLineToReportDetails)
.handle(Jpa.outboundAdapter(this.entityManager)
.entityClass(ReportDetails.class),
ConsumerEndpointSpec::transactional)
.get();
}
上面的定义工作正常,但速度很慢。 transformMachineLineToReportDetails
方法向另一个服务发送 HTTP 请求,该请求需要数秒才能响应。使用当前的流定义,每个 MachineLine
对象在对它们执行相同操作之前等待前一个对象被转换和持久化。
因此,理想的解决方案是异步执行此转换和持久化。一种可能的解决方案是在 .split()
和 .transform(...)
之间插入以下行:
.channel(new ExecutorChannel(Executors.newCachedThreadPool()))
但是,这允许 JPA 入站适配器在处理和保存上次轮询的结果之前再次轮询数据库。这意味着在下一次轮询之前未转换和持久化的先前数据库轮询返回的任何 MachineLine
实体将被第二次检索并尝试第二次转换和持久化。这显然会导致不必要的资源成本,并且在尝试将具有相同 ID 的多个 ReportDetails
对象持久保存到数据库时也会产生错误。
有没有一种方法可以异步转换 MachineLine
对象,但要确保在上一次轮询的结果完成整个流程之前不会再次轮询数据库(即所有 MachineLine
对象被转换和持久化)?
我通过自定义 AbstractMessageSourceAdvice
查看它的唯一方法是针对某些 AtomicBoolean
标志(也可能是一个 bean)检查 beforeReceive()
。由于您使用 Pollers.fixedDelay(5000)
,您的轮询策略仍然是单线程的。因此,当 AbstractMessageSourceAdvice
不允许时,我们最好不要让同一个线程对 JPA 执行轮询。布尔标志在开始时应为 true
,您将其更改为提到的 split()
之前的 false
。您可以使用 publishSubscribeChannel()
作为两个订阅者来做到这一点。或者甚至在 AbstractMessageSourceAdvice
实现中这样做 - 在 beforeReceive()
实现中有点 compareAndSet(true, false)
。
然后你在使用 ExecutorChannel
提到的转换后分裂并坚持。
在您的流程结束时,您需要放置一个 publishSubscribeChannel()
和两个订阅者 - 1. handle(Jpa.outboundAdapter(this.entityManager)
; 2. aggregate()
等待所有拆分完成。在那之后 aggregate()
你放置一个简单的 handle(m -> pollingFlagBean().set(true))
。
仅此而已:只有当所有项目都已处理并聚合回组时,您的新轮询才会发生。只有在那之后,你才允许你使用那个 AtomicBoolean
.
再次进行轮询。
您也可以考虑将此标志逻辑与 SimpleActiveIdleMessageSourceAdvice
结合使用,以更改主动和被动模式之间的轮询周期,以避免在等待聚合时出现大空闲。
任何其他异步解决方案仍然不适用于您,因为切换到其他线程将立即释放轮询进程以使其再次旋转。
我有一个集成流程,它定期轮询数据库以检索任何 MachineLine
尚未处理的实体并对其进行处理。流程检索 MachineLine
对象的集合,然后我想将其拆分为单独的对象,将这些对象转换为 ReportDetails
对象并将转换后的对象保存到数据库中的另一个 table .流程定义如下:
@Bean
public IntegrationFlow processMachineLine() {
return IntegrationFlows
.from(Jpa.inboundAdapter(this.entityManager)
.entityClass(MachineLine.class)
.jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
e -> e.poller(Pollers.fixedDelay(5000)))
.split()
.transform(MachineLine.class, this::transformMachineLineToReportDetails)
.handle(Jpa.outboundAdapter(this.entityManager)
.entityClass(ReportDetails.class),
ConsumerEndpointSpec::transactional)
.get();
}
上面的定义工作正常,但速度很慢。 transformMachineLineToReportDetails
方法向另一个服务发送 HTTP 请求,该请求需要数秒才能响应。使用当前的流定义,每个 MachineLine
对象在对它们执行相同操作之前等待前一个对象被转换和持久化。
因此,理想的解决方案是异步执行此转换和持久化。一种可能的解决方案是在 .split()
和 .transform(...)
之间插入以下行:
.channel(new ExecutorChannel(Executors.newCachedThreadPool()))
但是,这允许 JPA 入站适配器在处理和保存上次轮询的结果之前再次轮询数据库。这意味着在下一次轮询之前未转换和持久化的先前数据库轮询返回的任何 MachineLine
实体将被第二次检索并尝试第二次转换和持久化。这显然会导致不必要的资源成本,并且在尝试将具有相同 ID 的多个 ReportDetails
对象持久保存到数据库时也会产生错误。
有没有一种方法可以异步转换 MachineLine
对象,但要确保在上一次轮询的结果完成整个流程之前不会再次轮询数据库(即所有 MachineLine
对象被转换和持久化)?
我通过自定义 AbstractMessageSourceAdvice
查看它的唯一方法是针对某些 AtomicBoolean
标志(也可能是一个 bean)检查 beforeReceive()
。由于您使用 Pollers.fixedDelay(5000)
,您的轮询策略仍然是单线程的。因此,当 AbstractMessageSourceAdvice
不允许时,我们最好不要让同一个线程对 JPA 执行轮询。布尔标志在开始时应为 true
,您将其更改为提到的 split()
之前的 false
。您可以使用 publishSubscribeChannel()
作为两个订阅者来做到这一点。或者甚至在 AbstractMessageSourceAdvice
实现中这样做 - 在 beforeReceive()
实现中有点 compareAndSet(true, false)
。
然后你在使用 ExecutorChannel
提到的转换后分裂并坚持。
在您的流程结束时,您需要放置一个 publishSubscribeChannel()
和两个订阅者 - 1. handle(Jpa.outboundAdapter(this.entityManager)
; 2. aggregate()
等待所有拆分完成。在那之后 aggregate()
你放置一个简单的 handle(m -> pollingFlagBean().set(true))
。
仅此而已:只有当所有项目都已处理并聚合回组时,您的新轮询才会发生。只有在那之后,你才允许你使用那个 AtomicBoolean
.
您也可以考虑将此标志逻辑与 SimpleActiveIdleMessageSourceAdvice
结合使用,以更改主动和被动模式之间的轮询周期,以避免在等待聚合时出现大空闲。
任何其他异步解决方案仍然不适用于您,因为切换到其他线程将立即释放轮询进程以使其再次旋转。