spring 整合:多步多渠道订阅者

spring integration: multiple step multiple channel subscribers

我需要实现一个由多个步骤组成的集成流程,其中每个步骤都可以由可变数量的处理器(插件)执行。

我目前拥有的:

<!-- gateway -->
<int:gateway default-request-channel="step/1" service-interface="ServiceGateway">
    <int:method name="send" />
</int:gateway>


<!-- plugin 1 -->
<int:publish-subscribe-channel id="step/1" apply-sequence="true" />

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer" />
</int:service-activator>

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer2" />
</int:service-activator>


<!-- plugin 2 -->
<int:publish-subscribe-channel id="step/2" apply-sequence="true" />

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="Transformer3" />
</int:service-activator>

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="HttpTransformer4" />
</int:service-activator>

<!-- aggregation -->
<int:channel id="end" />
<int:aggregator input-channel="end" />

预期行为如下:

  1. 通过网关发送第一个请求
  2. 输入由 2 个 "step/1" 插件处理
  3. "step/1" 插件的每个输出都由 "step/2" 插件处理
  4. 聚合器应聚合 4 个项目 (1 -> 2 -> 4)

一切正常,但结果不是预期的,我只收到 2 个(随机)项目而不是 4 个。

我想问题是聚合器仅在两个项目后触发发布,因为 "step/2" 频道中的 "apply-sequence" 覆盖了 "step/1" 中的 "apply-sequence"。所以问题是:如何让聚合器等待所有消息?

提前致谢。

自定义发布策略:

@SuppressWarnings("unchecked")
@Override
public boolean canRelease ( MessageGroup group ) {

    MessageHeaders headers = group.getOne ().getHeaders ();
    List<List<Object>> sequenceDetails = (List<List<Object>>) headers.get ( "sequenceDetails" );
    System.out.println ( sequenceDetails );

    int expectedSize = 1;
    //map message id, max group size reached (i.e. sequenceNumber==sequenceSize)
    for ( List<Object> sequenceItem : sequenceDetails ) {
        if ( sequenceItem.get ( 1 ) != sequenceItem.get ( 2 ) ) {
            System.err.println ( "--> AGG: no release check, group max not reached" );
            return false;
        }
        expectedSize *= (int) sequenceItem.get ( 2 );//multiplies the group sizes
    }

    int expectedSize2 = expectedSize * (int) headers.get ( "sequenceSize" );

    int currentSize = group.getMessages ().size () * expectedSize;
    System.err.println ( "--> AGG: " + expectedSize2 + " : " + currentSize );
    boolean canRelease = expectedSize2 == currentSize;
    if ( canRelease ) {
        System.out.println ( "ok" );
    }
    return canRelease;
}

打印出来:

[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]

--> AGG:无发布检查,未达到组最大值

[[7099b583-55d4-87d3-4502-993f05bfb388, 1, 2]]

--> AGG:无发布检查,未达到组最大值

[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]

--> 合计:4 : 2

[[7099b583-55d4-87d3-4502-993f05bfb388, 2, 2]]

--> 合计:4 : 4

聚合码:

@Aggregator
public Object aggregate ( List<Message<?>> objects ) {

    List<Object> res = new ArrayList<> ();
    for ( Message<?> m : objects ) {
        res.add ( m.getPayload () );
        MessageHeaders headers2 = m.getHeaders ();
        System.out.println ( headers2.get ( "history" ) );
    }

    return res;
}

打印出来:

gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)# 57018165,end2

gateway2,core-channel,(inner bean)#57018165,async/step/1,core-channel,(inner bean)#57018165,async/step/2,core-channel,(inner bean)# 57018165,end2

[102, 202] --> 最终结果列表,预计由4项组成

使用自定义发布策略。来自第一个 pubsub 的相关数据被第二个 pubsub 推入 sequenceDetails headers 中的堆栈。

编辑

问题是有两组;您需要关联初始 correlationId。这是一个纯 SpEL 解决方案;使用自定义 correlation/release 策略来确保数据符合预期可能更安全(并使用 getOne() 而不是迭代器)...

<int:aggregator input-channel="end2"
        correlation-strategy-expression=
           "headers['sequenceDetails'][0][0]"
        release-strategy-expression=
           "size() == iterator().next().headers['sequenceSize'] * iterator().next().headers['sequenceDetails'][0][2]" />