Apache Camel:通往同一路线的多条路线
Apache Camel: Several Routes to the Same Route
我想将消息从更多路由路由到同一路由,但它并没有像我假设的那样工作。我设置如下(我只是放下精华):
from("direct:a") [...]
.to("direct:c");
from("direct:b") [...]
.to("direct:c");
from(direct:c) <my aggregator functionality comes here>
.to("direct:someOtherRoute");
但是,这仅在 "a" 或 "b" 恰好有一条路线到达 "c" 而不是两者都到达时有效。 我应该如何将 "a" 和 "b" 路由到 "c"? 谢谢。
编辑 1:
我尝试了 Alexey 的解决方案,但使用 "seda" 或 "vm" 没有解决问题。实际上,无论使用 seda 还是 vm 调用路由 "c",聚合器只会从路由 "a" 或路由 "b".
调用一次
但是,如果我创建另一条具有相同内容和路线的路线 "c2",例如"b" 到 "c2",然后就可以了。尽管如此,这并不是解决它的好方法。
你有任何进一步的想法吗?我在同一个 CamelContext 中使用路由,所以在同一个 JVM 中。
我还在 link http://camel.apache.org/seda.html 上发现了一个有趣的评论
它指出 Alexey 和 Sunar 还告诉 seda 和 vm 是异步的和直接同步的,但您也可以直接实现异步功能,如下所示:
from("direct:stageName").thread(5).process(...)
"[...] Instead, you might wish to configure a Direct endpoint with a thread pool, which can process messages both synchronously and asynchronously. [...]
我也测试了它,但在我的情况下它没有产生任何结果。
EDIT2:
我在这里添加我如何使用聚合器,即本例中的路由 "c":
from("vm:AGGREGATOR").routeId("AGGREGATOR")
.aggregate( constant("AGG"), new RecordAggregator())
.completionTimeout(AGGREGATOR_TIMEOUT)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
LOGGER.info("### Process AGGREGATOR");
[...]
}
})
.marshal().csv()//.tracing()
.to("file:extract?fileName=${in.header.AGG}.csv")
.end();
在日志中,字符串“### Process Aggregator”仅出现一次。我只是想知道它是否不能依赖于我正在使用的 .completionTimeout(AGGREGATOR_TIMEOUT) 。在我的理解中,应该在这段时间内header中为每个不同的AGG值创建一个文件。这个理解对吗?
我觉得使用异步组件,比如seda、vm、activemq可能会解决您的问题。
这样的行为direct组件因为direct是同步组件,这也可能与在第三条路由中使用聚合器有关。
示例:
from("direct:a") [...]
.to("seda:c");
from("direct:b") [...]
.to("seda:c");
from(seda:c) <your aggregator functionality comes here>
.to("direct:someOtherRoute");
编辑 1:
现在,当我看到聚合器时,我认为这就是完成标准中的问题。
在你的情况下,你必须使用表达式 correlationExpression
:
from("vm:AGGREGATOR").routeId("AGGREGATOR")
.aggregate().simple("${header.AGG}",String.class) // ${property.AGG}
.aggregationStrategy(new RecordAggregator())
.completionInterval(AGGREGATOR_TIMEOUT) //.completionTimeout(AGGREGATOR_TIMEOUT)
.forceCompletionOnStop()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
LOGGER.info("### Process AGGREGATOR");
[...]
}
})
.marshal().csv()//.tracing()
.to("file:extract?fileName=${in.header.AGG}.csv&fileExist=Override")
.end();
而且,也许 completionTimeout
太低了...
试试下面的方法,这只是一个示例
来自("timer:foo?repeatCount=1&delay=1000").routeId("firstroute")
.setBody(简单("sundar")).to("direct:a");
from("timer:foo1?repeatCount=1&delay=1000").routeId("secondRoute")
.setBody(simple("sundar1")).to("direct:a");
from("direct:a")
.aggregate(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange arg0, Exchange arg1) {
Exchange argReturn = null;
if (arg0 == null) {
argReturn= arg1;
}
if (arg1 == null) {
argReturn= arg0;
}
if (arg1 != null && arg0 != null) {
try {
String arg1Str = arg1.getIn()
.getMandatoryBody().toString();
String arg2Str = arg0.getIn()
.getMandatoryBody().toString();
arg1.getIn().setBody(arg1Str + arg2Str);
} catch (Exception e) {
e.printStackTrace();
}
argReturn= arg1;
}
return argReturn;
}
}).constant(true).completionSize(2)
.to("direct:b").end();
from("direct:b").to("log:sundarLog?showAll=true&multiline=true");
您可以使用 seda 或其他异步路由,正如 Yakunin.Using 聚合器所指出的,这里的主要争论点是 completionSize ,我在这里使用 2 ,因为有两条路由正在发送消息。
我想将消息从更多路由路由到同一路由,但它并没有像我假设的那样工作。我设置如下(我只是放下精华):
from("direct:a") [...]
.to("direct:c");
from("direct:b") [...]
.to("direct:c");
from(direct:c) <my aggregator functionality comes here>
.to("direct:someOtherRoute");
但是,这仅在 "a" 或 "b" 恰好有一条路线到达 "c" 而不是两者都到达时有效。 我应该如何将 "a" 和 "b" 路由到 "c"? 谢谢。
编辑 1:
我尝试了 Alexey 的解决方案,但使用 "seda" 或 "vm" 没有解决问题。实际上,无论使用 seda 还是 vm 调用路由 "c",聚合器只会从路由 "a" 或路由 "b".
调用一次但是,如果我创建另一条具有相同内容和路线的路线 "c2",例如"b" 到 "c2",然后就可以了。尽管如此,这并不是解决它的好方法。
你有任何进一步的想法吗?我在同一个 CamelContext 中使用路由,所以在同一个 JVM 中。
我还在 link http://camel.apache.org/seda.html 上发现了一个有趣的评论 它指出 Alexey 和 Sunar 还告诉 seda 和 vm 是异步的和直接同步的,但您也可以直接实现异步功能,如下所示:
from("direct:stageName").thread(5).process(...)
"[...] Instead, you might wish to configure a Direct endpoint with a thread pool, which can process messages both synchronously and asynchronously. [...]
我也测试了它,但在我的情况下它没有产生任何结果。
EDIT2:
我在这里添加我如何使用聚合器,即本例中的路由 "c":
from("vm:AGGREGATOR").routeId("AGGREGATOR")
.aggregate( constant("AGG"), new RecordAggregator())
.completionTimeout(AGGREGATOR_TIMEOUT)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
LOGGER.info("### Process AGGREGATOR");
[...]
}
})
.marshal().csv()//.tracing()
.to("file:extract?fileName=${in.header.AGG}.csv")
.end();
在日志中,字符串“### Process Aggregator”仅出现一次。我只是想知道它是否不能依赖于我正在使用的 .completionTimeout(AGGREGATOR_TIMEOUT) 。在我的理解中,应该在这段时间内header中为每个不同的AGG值创建一个文件。这个理解对吗?
我觉得使用异步组件,比如seda、vm、activemq可能会解决您的问题。
这样的行为direct组件因为direct是同步组件,这也可能与在第三条路由中使用聚合器有关。
示例:
from("direct:a") [...]
.to("seda:c");
from("direct:b") [...]
.to("seda:c");
from(seda:c) <your aggregator functionality comes here>
.to("direct:someOtherRoute");
编辑 1:
现在,当我看到聚合器时,我认为这就是完成标准中的问题。
在你的情况下,你必须使用表达式 correlationExpression
:
from("vm:AGGREGATOR").routeId("AGGREGATOR")
.aggregate().simple("${header.AGG}",String.class) // ${property.AGG}
.aggregationStrategy(new RecordAggregator())
.completionInterval(AGGREGATOR_TIMEOUT) //.completionTimeout(AGGREGATOR_TIMEOUT)
.forceCompletionOnStop()
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
LOGGER.info("### Process AGGREGATOR");
[...]
}
})
.marshal().csv()//.tracing()
.to("file:extract?fileName=${in.header.AGG}.csv&fileExist=Override")
.end();
而且,也许 completionTimeout
太低了...
试试下面的方法,这只是一个示例 来自("timer:foo?repeatCount=1&delay=1000").routeId("firstroute") .setBody(简单("sundar")).to("direct:a");
from("timer:foo1?repeatCount=1&delay=1000").routeId("secondRoute")
.setBody(simple("sundar1")).to("direct:a");
from("direct:a")
.aggregate(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange arg0, Exchange arg1) {
Exchange argReturn = null;
if (arg0 == null) {
argReturn= arg1;
}
if (arg1 == null) {
argReturn= arg0;
}
if (arg1 != null && arg0 != null) {
try {
String arg1Str = arg1.getIn()
.getMandatoryBody().toString();
String arg2Str = arg0.getIn()
.getMandatoryBody().toString();
arg1.getIn().setBody(arg1Str + arg2Str);
} catch (Exception e) {
e.printStackTrace();
}
argReturn= arg1;
}
return argReturn;
}
}).constant(true).completionSize(2)
.to("direct:b").end();
from("direct:b").to("log:sundarLog?showAll=true&multiline=true");
您可以使用 seda 或其他异步路由,正如 Yakunin.Using 聚合器所指出的,这里的主要争论点是 completionSize ,我在这里使用 2 ,因为有两条路由正在发送消息。