需要一些关于 Spring 集成流程的指导

Need some guidance with Spring Integration Flow

我是 Spring Integration 的新手,已经在 Whosebug 上阅读了相当多的文档和其他主题。但是我仍然对如何在 Spring 启动应用程序中应用新获得的知识感到不知所措。 这是应该发生的事情:

  1. 从 Kafka 主题接收消息,例如来自“请求主题”(有效负载是自定义作业 POJO)。 InboundChannelAdapter?
  2. 做一些准备(从 git 回购中结帐)
  3. 使用批处理作业处理文件
  4. 提交并推送到 git,使用提交 ID 更新作业对象
  5. 使用更新的作业对象向 Kafka 发布消息,例如“回复主题”。 OutboundChannelAdapter?

使用 DSL 或普通 Java 配置都没有关系。尝试多种变体后,我的问题是我无法达到预期的结果。例如,处理程序会被调用得太早,或者根本不会被调用,因此不会更新步骤 5 中的回复。 此外,在任何给定时间应该只有一个流 运行,所以我想,在某个时候应该涉及一个队列,可能在步骤 1(?)。

我应该在何时何地使用 QueueChannels、DirectChannel(或任何其他?),我是否需要 GatewayHandlers,例如使用 commit-id 回复? 任何提示表示赞赏。

像这样:

@Bean
IntegrationFlow flow() {
    return IntegrationFlows.from(Kafka.inboundGateway(...))
            .handle(// prep)
            .transform(// to JobLaunchRequest)
            .handle(// JobLaunchingGateway)
            .handle(// cleanUp and return result)
            .get();
}

它一次只会处理一个请求(默认并发)。