在触发下一个路由之前等待所有文件被消耗

Wait for all files to be consumed before triggering next route

我有一个路由使用一个文件位置(递归)和一个在另一个文件位置处理文件的路由。有时第一条路线可能会找到多个文件。

from(fileLocation)
 .autoStartup(true)
 .routeId("file-scanner")
 .to(newFileLocation)
 .to("direct:processFile")

from("direct:processFile")
 .routeId("file-processor")
 .bean(*doing some stuff with new file location*)

所以有时最终的情况是 file-scanner 复制一个文件,file-processor 处理该文件,然后 file-scanner 再复制一个文件,然后 运行又来了。

我本质上想要的是 file-scannerfile-processor 开始处理文件之前复制所有文件。这样,我就可以运行只处理一次。

我使用的 fileLocation 是使用如下配置定义的:

recursive=true&noop=true&idempotent=false&delay=3600000&include=.*.json&autoCreate=false&startingDirectoryMustExist=true&readLock=none

所有决策都围绕批处理消费者交换属性。 我认为,您可以实施两种截然不同的解决方案:

  • 解决方案,基于聚合器集成模式。您需要将批处理中的所有文件聚合到字符串列表中,因为您的文件包含 JSON。聚合器选项“completionFromBatchConsumer”可以帮助您聚合从给定轮询中的文件端点消耗的所有文件。聚合后,您可以一起处理聚合文件。也许您可以开发自定义聚合策略来实现您的 bean 的逻辑,标记为 "doing some stuff with new file location".

  • 触发器,基于控制总线集成模式:


    from(fileLocation) 
    .autoStartup(true)  
    .routeId("file-scanner")  
    .to(newFileLocation)
    .choice().when(exchangeProperty("CamelBatchComplete"))
    .process(new Processor() {
       @Override
       public void process(Exchange exchange) throws Exception {
           exchange.getContext().startRoute("file-processor");
       }
    })
    .end();

    from(newFileLocation).  
    .routeId("file-processor") 
    .autoStartup(false)   
    .bean(*doing some stuff with new file location*)
    .choice().when(exchangeProperty("CamelBatchComplete"))
    .process(new Processor() {
       @Override
       public void process(Exchange exchange) throws Exception {
           exchange.getContext().stopRoute("file-processor");
       }
    })
    .end();