动态创建有序的 IntegrationFlows 链

Dynamically create a chain of ordered IntegrationFlows

我正在创建一系列使用 Spring 集成的流程,这些流程明确地使用 Java DSL。这些进程中的每一个都做不同的事情,但它们有一些相同的处理逻辑

示例:

get
process
deduplicate
emit

我想实质上创建一个 post 处理集成流程链,可以 enabled/disabled 通过 configuration/profiles。

示例:

get
preprocess flow 1 (if enabled)
...
preprocess flow n (if enabled)
process
postprocess flow 1 (if enabled)
...
postprocess flow n (if enabled)
emit

我很确定这在 SI 中还不存在,但我想我会问。我唯一能想到的就是创建一个动态创建直接消息通道的 bean,并且在配置过程中,我可以给每个集成流使用来获取它们的 "from" 和 "channel" 留言频道.

示例:

@Configuration
public class BaseIntegrationConfiguration {
    @Bean
    public MessageChannel preProcessMessageChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public MessageChannel processMessageChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public MessageChannel postProcessMessageChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public MessageChannel emitMessageChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow getDataFlow(MessageChannel preProcessMessageChannel) {
        return IntegrationFlows
                .from(/* some inbound channel adapter */)
                // do other flow stuff
                .channel(preProcessMessageChannel)
                .get();
    }

    @Bean
    public IntegrationFlowChainMessageChannelGenerator preProcessFlowGenerator(
            MessageChannel preProcessMessageChannel, 
            MessageChannel processMessageChannel) {
        IntegrationFlowChainMessageChannelGenerator generator = new IntegrationFlowChainMessageChannelGenerator ();
        generator.startWith(preProcessMessageChannel);
        generator.endWith(processMessageChannel);

        return generator;
    }

    @Bean
    public IntegrationFlow processFlow(
            MessageChannel processMessageChannel, 
            MessageChannel postProcessMessageChannel) {
        return IntegrationFlows
                .from(processMessageChannel)
                // do other flow stuff
                .channel(postProcessMessageChannel)
                .get();
    }

    @Bean
    public IntegrationFlowChainMessageChannelGenerator postProcessFlowGenerator(
            MessageChannel postProcessMessageChannel, 
            MessageChannel emitMessageChannel) {
        IntegrationFlowChainMessageChannelGenerator generator = new IntegrationFlowChainMessageChannelGenerator ();
        generator.startWith(postProcessMessageChannel);
        generator.endWith(emitMessageChannel);

        return generator;
    }
}

@Configuration
@Order(1)
@Profile("PreProcessFlowOne")
public class PreProcessOneIntegrationConfiguration {
    @Bean
    public IntegrationFlow preProcessFlowOne(IntegrationFlowChainMessageChannelGenerator preProcessFlowGenerator) {
        return IntegrationFlows
                .from(preProcessFlowGenerator.getSourceChannel())
                // flow specific behavior here
                .channel(preProcessFlowGenerator.getDestinationChannel())
                .get();
    }
}

@Configuration
@Order(2)
@Profile("PreProcessFlowTwo")
public class PreProcessTwoIntegrationConfiguration {
    @Bean
    public IntegrationFlow preProcessFlowTwo(IntegrationFlowChainMessageChannelGenerator preProcessFlowGenerator) {
        return IntegrationFlows
                .from(preProcessFlowGenerator.getSourceChannel())
                // flow specific behavior here
                .channel(preProcessFlowGenerator.getDestinationChannel())
                .get();
    }
}

@Configuration
@Order(1)
@Profile("PostProcessFlowOne")
public class PostProcessOneIntegrationConfiguration {
    @Bean
    public IntegrationFlow postProcessFlowOne(IntegrationFlowChainMessageChannelGenerator postProcessFlowGenerator) {
        return IntegrationFlows
                .from(postProcessFlowGenerator.getSourceChannel())
                // flow specific behavior here
                .channel(postProcessFlowGenerator.getDestinationChannel())
                .get();
    }
}

@Configuration
@Order(2)
@Profile("PostProcessFlowTwo")
public class PostProcessTwoIntegrationConfiguration {
    @Bean
    public IntegrationFlow postProcessFlowTwo(IntegrationFlowChainMessageChannelGenerator postProcessFlowGenerator) {
        return IntegrationFlows
                .from(postProcessFlowGenerator.getSourceChannel())
                // flow specific behavior here
                .channel(postProcessFlowGenerator.getDestinationChannel())
                .get();
    }
}

这里的想法是 "getDestinationChannel" 的调用每次都会创建一个新通道,并将最后生成的通道的输出连接到配置的 "endWith",每次调用都会连接到 "getSourceChannel" returns 最后创建的目标频道,或者,如果有 none,"startWith" 个频道。

在我写作和思考这个问题时,我开始认为可能有更好的方法,但我认为我会把它放在那里以征求一些意见。

谢谢。

目前在 DSL 中不直接支持它,但 routing slip 可能会满足您的需求。

如果您的 getdedup 等是单独的流程,您可以在初始流程开始时初始化路由清单,以包含或不包含预处理步骤的输入通道( s) 在主流量通道之间的列表中。

虽然 DSL 中还没有第一个 class 支持,但您可以使用 header enricher 来设置路由列表。 header 名字是 IntegrationMessageHeaderAccessor.ROUTING_SLIP.

编辑

其实不用自己维护header;向下滚动参考手册中有关路由单的章节,了解如何使用 Java.

配置 HeaderEnricher