Spring 集成:如何在 MessageChain 中间实现 JDBCOutboundGateway?

Spring integration: How to implement a JDBCOutboundGateway in the middle of a MessageChain?

在我看来,这似乎是一个可能到处都在重复的简单问题。 MessageHandlerChain 的一个非常基本的应用程序,可能只使用开箱即用的功能。

概念上,我需要的是:

(1) Polled JDBC reader (sets parameters for integration pass)
    |
    V
(2) JDBC Reader (uses input from (1) to fetch data to feed through channel
    |
    V
(3) JDBC writer (writes data fetched by (2) to target)
    |
    V
(4) JDBC writer (writes additional data from the original parameters fetched in (1))

我觉得我需要的是

Flow:
From: JdbcPollingChannelAdapter (setup adapter)
Handler: messageHandlerChain
    Handlers (
       JdbcPollingChannelAdapter (inbound adapter)
       JdbcOutboundGateway (outbound adapter)
       JdbcOutboundGateway (cleanup gateway)
    )

JdbcPollingChannelAdapter没有实现MessageHandlerAPI,所以我不知道如何根据设置步骤读取实际数据。

由于 JdbcOutboundGateway 没有实现 MessageProducer API,对于出站适配器需要使用什么,我有点不知所措。

我应该使用 OOB 类 吗?或者我是否需要以某种方式将两个适配器包装在 BridgeHandlers 中以使其工作?

提前致谢


编辑 (2) 其他配置问题

设置适配器正在拉回带有两个时间戳列的单行。它们正在被“enrich headers”部分正确处理。

但是,当入站适配器正在执行时,框架将 java.lang.Object 作为参数传入。不是字符串,不是时间戳,而是 new Object ().

中的实际 java.lang.Object

它正在传递正确数量的对象,但内容和数据类型丢失了。我是否正确认为需要配置 ExpressionEvaluatingSqlParameterSourceFactory?

留言:

GenericMessage [payload=[{startTime=2020-11-18 18:01:34.90944, endTime=2020-11-18 18:01:34.90944}], headers={startTime=2020-11-18 18:01:34.90944, id=835edf42-6f69-226a-18f4-ade030c16618, timestamp=1605897225384}]

SQL 在 JdbcOutboundGateway:

Select t.*, w.operation as "ops" from ADDRESS t
Inner join TT_ADDRESS w 
  on (t.ADDRESSID = w.ADDRESSID)
  And (w.LASTUPDATESTAMP >= :payload.from[0].get("startTime") and w.LASTUPDATESTAMP <= :payload.from[0].get("endTime") )

编辑: 添加解决方案 java DSL 配置

private JdbcPollingChannelAdapter setupAdapter; // select only
private JdbcOutboundGateway inboundAdapter; // select only
private JdbcOutboundGateway insertUpdateAdapter; // update only
private JdbcOutboundGateway deleteAdapter; // update only
private JdbcMessageHandler cleanupAdapter; // update only

        setFlow(IntegrationFlows
            .from(setupAdapter, c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("ALC_startTime", "payload.from[0].get(\"ALC_startTime\")")
                    .headerExpression("ALC_endTime", "payload.from[0].get(\"ALC_endTime\")"))
            .handle(inboundAdapter)
            .enrichHeaders(h -> h.headerExpression("ALC_operation", "payload.from[0].get(\"ALC_operation\")"))
            .handle(insertUpdateAdapter)
            .handle(deleteAdapter)
            .handle(cleanupAdapter)
            .get());

flowContext.registration(flow).id(this.getId().toString()).register();

如果您想将原始参数传递到流中的最后一个网关,您需要将这些参数存储在 headers 中,因为在每一步之后,回复消息的有效负载都会不同并且您将不再拥有原始设置数据。这是第一个。

其次:如果你处理 IntegrationFlow 和 Java DSL,你不需要担心 messageHandlerChain 因为概念上 IntegrationFlow 本身就是一个链但更先进。

我不确定为什么您需要使用 JdbcPollingChannelAdapter 根据流程开始时来自源的传入消息按需请求数据。

您肯定仍然需要使用 JdbcOutboundGateway 来实现 SELECT 模式。 updateQuery 是可选的,因此网关将在回复消息的有效负载中为您执行 SELECT 和 return 数据。

如果您接下来的两个步骤只是“写入”并且您不关心结果,您可能只需查看一个 PublishSubscribeChannel 和两个 JdbcMessageHandler 作为它的订阅者.如果没有为 PublishSubscribeChannel 提供 Executor,它们将被执行 one-by-one。