Apache Camel:聚合 pollEnrich 结果而不是来自以及如何保存 headers

Apache Camel: aggregate on pollEnrich results rather than from and how to preserve headers

在我的骆驼路线中,我使用来自 queue 的消息;每条消息包含 headers "pad"(路径)和文件前缀。例如:

message1: pad="/some/dir", file="AAA" message2: pad="/another/dir", 文件="BRD"

每条消息我想创建一个文件: 消息 1:/some/dir/AAA.tar(包含所有文件 /some/dir/AAA*) message2:/another/dir/BRD.tar(包含/another/dir/BRD.tar中的所有文件)

目录和文件名在另一个路径中收集。

到目前为止我有这条骆驼路线:

from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
    .simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregate(new TarAggregationStrategy(false,true))
     .constant(true)
     .completionFromBatchConsumer()
     .eagerCheckCompletion()
     .parallelProcessing(false)
     .setHeader("file", simple("${header.file}"))
     .setHeader("pad", simple("${header.pad}"))
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");

我这里有几个问题: - 当 运行 这条路线时,我在看到日志行 "tarring to" 之前看到多个 "starting with message" 行 - 日志行 "tarring to" 实际上是“.tar”,header 是空的... - 创建的“.tar”文件存储在“./ignored”中,并包含来自每个 jms 消息文件 header 的一个文件。

这让我相信聚合发生在我意想不到的水平上;我想汇总 pollEnrich 的结果,而不是 queue 上其他消息的结果。为什么,以及如何让它按我想要的方式运行?

另一个是丢失的headers;这可能是由于对错误项目的聚合...无论如何,我认为聚合中的 setHeader()s 应该设置它们,但无论如何它们都丢失了;我怎样才能保存它们?

我对骆驼编程比较陌生;所以请原谅我的缺点;代码中的缩进是我 认为 范围应该的样子;这可能是完全关闭。我正在使用 camel-2.20.1,但可以切换到任何其他版本。

编辑 阅读使我稍微改变了路线;如评论中所写;现在看起来像这样:(TarAggregationStrategy() 在我的 CamelContext 中创建并添加到注册表中)

from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
    .simple("file:${header.pad}?antInclude=${header.file}.*")
    .aggregationStrategyRef("tarAggregationStrategy")
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");

现在好像好多了;由于无法根据堆栈跟踪创建临时文件,因此实际 tar 不会发生:

org.apache.camel.component.file.GenericFileOperationFailedException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
        at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:174)
        at org.apache.camel.processor.PollEnricher.process(PollEnricher.java:280)
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
        at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.addFileToTar(TarAggregationStrategy.java:199)
        at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:167)
        ... 19 more

我注意到的是 Could not create temp file 之后 ( 和 ) 之间的部分实际上是 body 的内容(我本可以将其留空,但没有明显的原因我填充文件 ID)

如果您想保留消息中的 header,以便它们在聚合后仍然存在,您的聚合策略 必须这样做。我认为 TarAggregationStrategy 不会这样做。

将聚合器视为边界。它收集 Camel Exchanges(Camel-wrapped 消息)并根据 AggregationStrategy 创建一个 new Exchange。我想大多数 out-of-the-box 聚合器都专注于合并或附加消息正文,而不是 header。

因此,如果您希望 header 的 header.fileheader.pad 在聚合中存活下来,则必须在您自己的聚合策略中实现它。

由于您使用 TarAggregationStrategy,您可能可以扩展或装饰这个,只需实现 header 内容并将 body 内容委托给 TarAggregationStrategy