Spring 集成渠道链接怪异
Spring Integration Channel Chaining Weirdness
我可能只是遗漏了一些非常简单的东西(或滥用了一些东西),但我试图建立两个直接通道,这样一个流将一些数据顺序地传递给每个通道。所以使用 Spring Integration JAVA DSL 我有这样的东西(这个例子大大简化):
public static final String TEST_CHANNEL = "testGateway";
public static final String TEST_UPPER_CHANNEL = "testChannelUpper";
public static final String TEST_LOWER_CHANNEL = "testChannelLower";
@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
return MessageChannels.direct(TEST_CHANNEL).get();
}
@Bean(name = TEST_UPPER_CHANNEL)
public MessageChannel testChannelUpperChannel() {
return MessageChannels.direct(TEST_UPPER_CHANNEL).get();
}
@Bean(name = TEST_LOWER_CHANNEL)
public MessageChannel testChannelLowerChannel() {
return MessageChannels.direct(TEST_LOWER_CHANNEL).get();
}
@Bean
public IntegrationFlow testFlow() {
return IntegrationFlows
.from(TEST_CHANNEL)
.channel(TEST_UPPER_CHANNEL)
.channel(TEST_LOWER_CHANNEL)
.get();
}
@Bean
public IntegrationFlow testUpperFlow() {
return IntegrationFlows
.from(TEST_UPPER_CHANNEL)
.<String, String>transform(String::toUpperCase)
.handle(System.out::println)
.get();
}
@Bean
public IntegrationFlow testLowerFlow() {
return IntegrationFlows
.from(TEST_LOWER_CHANNEL)
.<String, String>transform(String::toLowerCase)
.handle(System.out::println)
.get();
}
我正在使用 REST 端点通过网关调用流,但是当我这样做时,似乎只调用了一个通道。该通道似乎在调用之间也是随机的(有时转到 testChannelUpper,有时转到 testChannelLower)。
我在整个执行过程中基本上都是这样结束的:
(每次我只是达到这个终点http://localhost:9090/test?test=HellOoi)
执行 1:
GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=4aa7b075-23cc-6ab3-10a1-c7cb73bae49b , 时间戳=1447686848477}]
执行 2:
GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=a18dcd01-da18-b00d-30c0-e1a03ce19104 , 时间戳=1447686853549}]
执行 3:
GenericMessage [payload=helloi, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testLowerFlow.channel#0, id=5f0abcb9-378e-7a3c-9c93-a04ff6352927 , 时间戳=1447686857545}]
我相信我在这里所做的尝试也显示在 DSL wiki 的 channelFlow 示例中:
https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference
所以我使用的规格是:
Spring 启动 v1.2.2.RELEASE
Spring v4.1.5.RELEASE
spring-集成-java-dsl 1.0.2.RELEASE
JDK1.8.0_40-b25
所以...还有其他人见过这种行为吗?我只是在滥用渠道实施吗?还有其他想法吗?提前致谢!
正如 Gary 指出的那样,最好的方法是建立一个 pub-sub 并在上面订购消费者:
@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
return MessageChannels.publishSubscribe(TEST_CHANNEL).get();
}
@Bean
public IntegrationFlow testUpperFlow() {
return IntegrationFlows
.from(TEST_CHANNEL)
.<String, String>transform(String::toUpperCase, e -> e.order(1))
.handle(System.out::println)
.get();
}
@Bean
public IntegrationFlow testLowerFlow() {
return IntegrationFlows
.from(TEST_CHANNEL)
.<String, String>transform(String::toLowerCase, e -> e.order(2))
.handle(System.out::println)
.get();
}
这样做的目的是什么...
@Bean
public IntegrationFlow testFlow() {
return IntegrationFlows
.from(TEST_CHANNEL).fixedSubscriberChannel()
.channel(TEST_UPPER_CHANNEL)
.channel(TEST_LOWER_CHANNEL)
.get();
}
?
所做的只是将三个通道连接在一起。
事实上,您最终在 TEST_UPPER_CHANNEL- 这个流程中的桥和另一个流程中的变压器上有 2 个消费者。
默认情况下,直接渠道中的调度使用循环分配。所以第一个消息将发送到桥接器,下一个发送到变压器等。
我可能只是遗漏了一些非常简单的东西(或滥用了一些东西),但我试图建立两个直接通道,这样一个流将一些数据顺序地传递给每个通道。所以使用 Spring Integration JAVA DSL 我有这样的东西(这个例子大大简化):
public static final String TEST_CHANNEL = "testGateway";
public static final String TEST_UPPER_CHANNEL = "testChannelUpper";
public static final String TEST_LOWER_CHANNEL = "testChannelLower";
@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
return MessageChannels.direct(TEST_CHANNEL).get();
}
@Bean(name = TEST_UPPER_CHANNEL)
public MessageChannel testChannelUpperChannel() {
return MessageChannels.direct(TEST_UPPER_CHANNEL).get();
}
@Bean(name = TEST_LOWER_CHANNEL)
public MessageChannel testChannelLowerChannel() {
return MessageChannels.direct(TEST_LOWER_CHANNEL).get();
}
@Bean
public IntegrationFlow testFlow() {
return IntegrationFlows
.from(TEST_CHANNEL)
.channel(TEST_UPPER_CHANNEL)
.channel(TEST_LOWER_CHANNEL)
.get();
}
@Bean
public IntegrationFlow testUpperFlow() {
return IntegrationFlows
.from(TEST_UPPER_CHANNEL)
.<String, String>transform(String::toUpperCase)
.handle(System.out::println)
.get();
}
@Bean
public IntegrationFlow testLowerFlow() {
return IntegrationFlows
.from(TEST_LOWER_CHANNEL)
.<String, String>transform(String::toLowerCase)
.handle(System.out::println)
.get();
}
我正在使用 REST 端点通过网关调用流,但是当我这样做时,似乎只调用了一个通道。该通道似乎在调用之间也是随机的(有时转到 testChannelUpper,有时转到 testChannelLower)。
我在整个执行过程中基本上都是这样结束的: (每次我只是达到这个终点http://localhost:9090/test?test=HellOoi)
执行 1: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=4aa7b075-23cc-6ab3-10a1-c7cb73bae49b , 时间戳=1447686848477}]
执行 2: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=a18dcd01-da18-b00d-30c0-e1a03ce19104 , 时间戳=1447686853549}]
执行 3: GenericMessage [payload=helloi, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testLowerFlow.channel#0, id=5f0abcb9-378e-7a3c-9c93-a04ff6352927 , 时间戳=1447686857545}]
我相信我在这里所做的尝试也显示在 DSL wiki 的 channelFlow 示例中: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference
所以我使用的规格是:
Spring 启动 v1.2.2.RELEASE
Spring v4.1.5.RELEASE
spring-集成-java-dsl 1.0.2.RELEASE
JDK1.8.0_40-b25
所以...还有其他人见过这种行为吗?我只是在滥用渠道实施吗?还有其他想法吗?提前致谢!
正如 Gary 指出的那样,最好的方法是建立一个 pub-sub 并在上面订购消费者:
@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
return MessageChannels.publishSubscribe(TEST_CHANNEL).get();
}
@Bean
public IntegrationFlow testUpperFlow() {
return IntegrationFlows
.from(TEST_CHANNEL)
.<String, String>transform(String::toUpperCase, e -> e.order(1))
.handle(System.out::println)
.get();
}
@Bean
public IntegrationFlow testLowerFlow() {
return IntegrationFlows
.from(TEST_CHANNEL)
.<String, String>transform(String::toLowerCase, e -> e.order(2))
.handle(System.out::println)
.get();
}
这样做的目的是什么...
@Bean
public IntegrationFlow testFlow() {
return IntegrationFlows
.from(TEST_CHANNEL).fixedSubscriberChannel()
.channel(TEST_UPPER_CHANNEL)
.channel(TEST_LOWER_CHANNEL)
.get();
}
?
所做的只是将三个通道连接在一起。
事实上,您最终在 TEST_UPPER_CHANNEL- 这个流程中的桥和另一个流程中的变压器上有 2 个消费者。
默认情况下,直接渠道中的调度使用循环分配。所以第一个消息将发送到桥接器,下一个发送到变压器等。