Apache Camel:通往同一路线的多条路线

Apache Camel: Several Routes to the Same Route

我想将消息从更多路由路由到同一路由,但它并没有像我假设的那样工作。我设置如下(我只是放下精华):

from("direct:a") [...]
    .to("direct:c");

from("direct:b") [...]
    .to("direct:c");

from(direct:c) <my aggregator functionality comes here>
    .to("direct:someOtherRoute");

但是,这仅在 "a" 或 "b" 恰好有一条路线到达 "c" 而不是两者都到达时有效。 我应该如何将 "a" 和 "b" 路由到 "c"? 谢谢。

编辑 1:

我尝试了 Alexey 的解决方案,但使用 "seda" 或 "vm" 没有解决问题。实际上,无论使用 seda 还是 vm 调用路由 "c",聚合器只会从路由 "a" 或路由 "b".

调用一次

但是,如果我创建另一条具有相同内容和路线的路线 "c2",例如"b" 到 "c2",然后就可以了。尽管如此,这并不是解决它的好方法。

你有任何进一步的想法吗?我在同一个 CamelContext 中使用路由,所以在同一个 JVM 中。

我还在 link http://camel.apache.org/seda.html 上发现了一个有趣的评论 它指出 Alexey 和 Sunar 还告诉 seda 和 vm 是异步的和直接同步的,但您也可以直接实现异步功能,如下所示:

from("direct:stageName").thread(5).process(...)

"[...] Instead, you might wish to configure a Direct endpoint with a thread pool, which can process messages both synchronously and asynchronously. [...]

我也测试了它,但在我的情况下它没有产生任何结果。

EDIT2:

我在这里添加我如何使用聚合器,即本例中的路由 "c":

from("vm:AGGREGATOR").routeId("AGGREGATOR")
        .aggregate( constant("AGG"), new RecordAggregator())
        .completionTimeout(AGGREGATOR_TIMEOUT)
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                LOGGER.info("### Process AGGREGATOR");
                [...]
             }
        })
        .marshal().csv()//.tracing()
        .to("file:extract?fileName=${in.header.AGG}.csv")
        .end();

在日志中,字符串“### Process Aggregator”仅出现一次。我只是想知道它是否不能依赖于我正在使用的 .completionTimeout(AGGREGATOR_TIMEOUT) 。在我的理解中,应该在这段时间内header中为每个不同的AGG值创建一个文件。这个理解对吗?

我觉得使用异步组件,比如sedavmactivemq可能会解决您的问题。

这样的行为direct组件因为direct是同步组件,这也可能与在第三条路由中使用聚合器有关。

示例:

from("direct:a") [...]
    .to("seda:c");

from("direct:b") [...]
    .to("seda:c");

from(seda:c) <your aggregator functionality comes here>
    .to("direct:someOtherRoute");

编辑 1:

现在,当我看到聚合器时,我认为这就是完成标准中的问题。

在你的情况下,你必须使用表达式 correlationExpression:

from("vm:AGGREGATOR").routeId("AGGREGATOR")
    .aggregate().simple("${header.AGG}",String.class) // ${property.AGG}
        .aggregationStrategy(new RecordAggregator())
        .completionInterval(AGGREGATOR_TIMEOUT) //.completionTimeout(AGGREGATOR_TIMEOUT)
        .forceCompletionOnStop()
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                LOGGER.info("### Process AGGREGATOR");
                [...]
             }
        })
        .marshal().csv()//.tracing()
        .to("file:extract?fileName=${in.header.AGG}.csv&fileExist=Override")
    .end();

而且,也许 completionTimeout 太低了...

试试下面的方法,这只是一个示例 来自("timer:foo?repeatCount=1&delay=1000").routeId("firstroute") .setBody(简单("sundar")).to("direct:a");

    from("timer:foo1?repeatCount=1&delay=1000").routeId("secondRoute")
            .setBody(simple("sundar1")).to("direct:a");

    from("direct:a")
            .aggregate(new AggregationStrategy() {

                @Override
                public Exchange aggregate(Exchange arg0, Exchange arg1) {
                    Exchange argReturn = null;
                    if (arg0 == null) {
                        argReturn= arg1;
                    }
                    if (arg1 == null) {
                        argReturn= arg0;
                    }
                    if (arg1 != null && arg0 != null) {
                        try {
                            String arg1Str = arg1.getIn()
                                    .getMandatoryBody().toString();
                            String arg2Str = arg0.getIn()
                                    .getMandatoryBody().toString();
                            arg1.getIn().setBody(arg1Str + arg2Str);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        argReturn= arg1;
                    }
                    return argReturn;
                }
            }).constant(true).completionSize(2)
            .to("direct:b").end();

    from("direct:b").to("log:sundarLog?showAll=true&multiline=true");

您可以使用 seda 或其他异步路由,正如 Yakunin.Using 聚合器所指出的,这里的主要争论点是 completionSize ,我在这里使用 2 ,因为有两条路由正在发送消息。