Apache Camel:聚合 pollEnrich 结果而不是来自以及如何保存 headers
Apache Camel: aggregate on pollEnrich results rather than from and how to preserve headers
在我的骆驼路线中,我使用来自 queue 的消息;每条消息包含 headers "pad"(路径)和文件前缀。例如:
message1: pad="/some/dir", file="AAA"
message2: pad="/another/dir", 文件="BRD"
每条消息我想创建一个文件:
消息 1:/some/dir/AAA.tar(包含所有文件 /some/dir/AAA*)
message2:/another/dir/BRD.tar(包含/another/dir/BRD.tar中的所有文件)
目录和文件名在另一个路径中收集。
到目前为止我有这条骆驼路线:
from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
.simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregate(new TarAggregationStrategy(false,true))
.constant(true)
.completionFromBatchConsumer()
.eagerCheckCompletion()
.parallelProcessing(false)
.setHeader("file", simple("${header.file}"))
.setHeader("pad", simple("${header.pad}"))
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");
我这里有几个问题:
- 当 运行 这条路线时,我在看到日志行 "tarring to" 之前看到多个 "starting with message" 行
- 日志行 "tarring to" 实际上是“.tar”,header 是空的...
- 创建的“.tar”文件存储在“./ignored”中,并包含来自每个 jms 消息文件 header 的一个文件。
这让我相信聚合发生在我意想不到的水平上;我想汇总 pollEnrich 的结果,而不是 queue 上其他消息的结果。为什么,以及如何让它按我想要的方式运行?
另一个是丢失的headers;这可能是由于对错误项目的聚合...无论如何,我认为聚合中的 setHeader()s 应该设置它们,但无论如何它们都丢失了;我怎样才能保存它们?
我对骆驼编程比较陌生;所以请原谅我的缺点;代码中的缩进是我 认为 范围应该的样子;这可能是完全关闭。我正在使用 camel-2.20.1,但可以切换到任何其他版本。
编辑
阅读使我稍微改变了路线;如评论中所写;现在看起来像这样:(TarAggregationStrategy() 在我的 CamelContext 中创建并添加到注册表中)
from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
.simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregationStrategyRef("tarAggregationStrategy")
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");
现在好像好多了;由于无法根据堆栈跟踪创建临时文件,因此实际 tar 不会发生:
org.apache.camel.component.file.GenericFileOperationFailedException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:174)
at org.apache.camel.processor.PollEnricher.process(PollEnricher.java:280)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.addFileToTar(TarAggregationStrategy.java:199)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:167)
... 19 more
我注意到的是 Could not create temp file 之后 ( 和 ) 之间的部分实际上是 body 的内容(我本可以将其留空,但没有明显的原因我填充文件 ID)
如果您想保留消息中的 header,以便它们在聚合后仍然存在,您的聚合策略 必须这样做。我认为 TarAggregationStrategy
不会这样做。
将聚合器视为边界。它收集 Camel Exchanges(Camel-wrapped 消息)并根据 AggregationStrategy
创建一个 new Exchange。我想大多数 out-of-the-box 聚合器都专注于合并或附加消息正文,而不是 header。
因此,如果您希望 header 的 header.file
和 header.pad
在聚合中存活下来,则必须在您自己的聚合策略中实现它。
由于您使用 TarAggregationStrategy
,您可能可以扩展或装饰这个,只需实现 header 内容并将 body 内容委托给 TarAggregationStrategy
。
在我的骆驼路线中,我使用来自 queue 的消息;每条消息包含 headers "pad"(路径)和文件前缀。例如:
message1: pad="/some/dir", file="AAA" message2: pad="/another/dir", 文件="BRD"
每条消息我想创建一个文件: 消息 1:/some/dir/AAA.tar(包含所有文件 /some/dir/AAA*) message2:/another/dir/BRD.tar(包含/another/dir/BRD.tar中的所有文件)
目录和文件名在另一个路径中收集。
到目前为止我有这条骆驼路线:
from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
.simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregate(new TarAggregationStrategy(false,true))
.constant(true)
.completionFromBatchConsumer()
.eagerCheckCompletion()
.parallelProcessing(false)
.setHeader("file", simple("${header.file}"))
.setHeader("pad", simple("${header.pad}"))
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");
我这里有几个问题: - 当 运行 这条路线时,我在看到日志行 "tarring to" 之前看到多个 "starting with message" 行 - 日志行 "tarring to" 实际上是“.tar”,header 是空的... - 创建的“.tar”文件存储在“./ignored”中,并包含来自每个 jms 消息文件 header 的一个文件。
这让我相信聚合发生在我意想不到的水平上;我想汇总 pollEnrich 的结果,而不是 queue 上其他消息的结果。为什么,以及如何让它按我想要的方式运行?
另一个是丢失的headers;这可能是由于对错误项目的聚合...无论如何,我认为聚合中的 setHeader()s 应该设置它们,但无论如何它们都丢失了;我怎样才能保存它们?
我对骆驼编程比较陌生;所以请原谅我的缺点;代码中的缩进是我 认为 范围应该的样子;这可能是完全关闭。我正在使用 camel-2.20.1,但可以切换到任何其他版本。
编辑 阅读使我稍微改变了路线;如评论中所写;现在看起来像这样:(TarAggregationStrategy() 在我的 CamelContext 中创建并添加到注册表中)
from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
.simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregationStrategyRef("tarAggregationStrategy")
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");
现在好像好多了;由于无法根据堆栈跟踪创建临时文件,因此实际 tar 不会发生:
org.apache.camel.component.file.GenericFileOperationFailedException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:174)
at org.apache.camel.processor.PollEnricher.process(PollEnricher.java:280)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.addFileToTar(TarAggregationStrategy.java:199)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:167)
... 19 more
我注意到的是 Could not create temp file 之后 ( 和 ) 之间的部分实际上是 body 的内容(我本可以将其留空,但没有明显的原因我填充文件 ID)
如果您想保留消息中的 header,以便它们在聚合后仍然存在,您的聚合策略 必须这样做。我认为 TarAggregationStrategy
不会这样做。
将聚合器视为边界。它收集 Camel Exchanges(Camel-wrapped 消息)并根据 AggregationStrategy
创建一个 new Exchange。我想大多数 out-of-the-box 聚合器都专注于合并或附加消息正文,而不是 header。
因此,如果您希望 header 的 header.file
和 header.pad
在聚合中存活下来,则必须在您自己的聚合策略中实现它。
由于您使用 TarAggregationStrategy
,您可能可以扩展或装饰这个,只需实现 header 内容并将 body 内容委托给 TarAggregationStrategy
。