在触发下一个路由之前等待所有文件被消耗
Wait for all files to be consumed before triggering next route
我有一个路由使用一个文件位置(递归)和一个在另一个文件位置处理文件的路由。有时第一条路线可能会找到多个文件。
from(fileLocation)
.autoStartup(true)
.routeId("file-scanner")
.to(newFileLocation)
.to("direct:processFile")
from("direct:processFile")
.routeId("file-processor")
.bean(*doing some stuff with new file location*)
所以有时最终的情况是 file-scanner
复制一个文件,file-processor
处理该文件,然后 file-scanner
再复制一个文件,然后 运行又来了。
我本质上想要的是 file-scanner
在 file-processor
开始处理文件之前复制所有文件。这样,我就可以运行只处理一次。
我使用的 fileLocation
是使用如下配置定义的:
recursive=true&noop=true&idempotent=false&delay=3600000&include=.*.json&autoCreate=false&startingDirectoryMustExist=true&readLock=none
所有决策都围绕批处理消费者交换属性。
我认为,您可以实施两种截然不同的解决方案:
解决方案,基于聚合器集成模式。您需要将批处理中的所有文件聚合到字符串列表中,因为您的文件包含 JSON。聚合器选项“completionFromBatchConsumer”可以帮助您聚合从给定轮询中的文件端点消耗的所有文件。聚合后,您可以一起处理聚合文件。也许您可以开发自定义聚合策略来实现您的 bean 的逻辑,标记为 "doing some stuff with new file location".
触发器,基于控制总线集成模式:
from(fileLocation)
.autoStartup(true)
.routeId("file-scanner")
.to(newFileLocation)
.choice().when(exchangeProperty("CamelBatchComplete"))
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getContext().startRoute("file-processor");
}
})
.end();
from(newFileLocation).
.routeId("file-processor")
.autoStartup(false)
.bean(*doing some stuff with new file location*)
.choice().when(exchangeProperty("CamelBatchComplete"))
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getContext().stopRoute("file-processor");
}
})
.end();
我有一个路由使用一个文件位置(递归)和一个在另一个文件位置处理文件的路由。有时第一条路线可能会找到多个文件。
from(fileLocation)
.autoStartup(true)
.routeId("file-scanner")
.to(newFileLocation)
.to("direct:processFile")
from("direct:processFile")
.routeId("file-processor")
.bean(*doing some stuff with new file location*)
所以有时最终的情况是 file-scanner
复制一个文件,file-processor
处理该文件,然后 file-scanner
再复制一个文件,然后 运行又来了。
我本质上想要的是 file-scanner
在 file-processor
开始处理文件之前复制所有文件。这样,我就可以运行只处理一次。
我使用的 fileLocation
是使用如下配置定义的:
recursive=true&noop=true&idempotent=false&delay=3600000&include=.*.json&autoCreate=false&startingDirectoryMustExist=true&readLock=none
所有决策都围绕批处理消费者交换属性。 我认为,您可以实施两种截然不同的解决方案:
解决方案,基于聚合器集成模式。您需要将批处理中的所有文件聚合到字符串列表中,因为您的文件包含 JSON。聚合器选项“completionFromBatchConsumer”可以帮助您聚合从给定轮询中的文件端点消耗的所有文件。聚合后,您可以一起处理聚合文件。也许您可以开发自定义聚合策略来实现您的 bean 的逻辑,标记为 "doing some stuff with new file location".
触发器,基于控制总线集成模式:
from(fileLocation) .autoStartup(true) .routeId("file-scanner") .to(newFileLocation) .choice().when(exchangeProperty("CamelBatchComplete")) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getContext().startRoute("file-processor"); } }) .end(); from(newFileLocation). .routeId("file-processor") .autoStartup(false) .bean(*doing some stuff with new file location*) .choice().when(exchangeProperty("CamelBatchComplete")) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getContext().stopRoute("file-processor"); } }) .end();