Apache Camel 拆分器、线程池和 JMS
Apache Camel Splitter, Threadpool and JMS
我在 spring xml 中定义了以下路由以拆分文本文件中的行并将每一行发送到 JMS 队列
<bean id="myPool" class="java.util.concurrent.Executors" factory-method="newCachedThreadPool"/>
<camelContext id="concurrent-route-context" xmlns="http://camel.apache.org/schema/spring" trace="true">
<route id="inbox-threadpool-split-route">
<from uri="{{inbox.uri}}" />
<log message="Starting to process file: ${header.CamelFileName}" />
<split streaming="true" executorServiceRef="myPool">
<tokenize token="\n" />
<to uri="{{inventory.queue.uri}}" />
</split>
<log message="Done processing file: ${header.CamelFileName}" />
</route>
</camelContext>
inbox.uri 是一个文件组件 uri 监听目录中的文件,而 inventory.queue.uri 是一个 JmsComponent uri 连接到 JMS 服务器中的队列(Tibco EMS 6.X 版本) . JmsComponent uri 很简单,如 "JmsComponent:queue:?username=&password="
上面的路由可以运行没有报错,但是从文件中拆分出来的行并没有作为JMS消息发送到队列中(即程序运行后队列还是空的)
如果我从拆分器定义中删除 executorServiceRef="myPool"(剩下的定义如
如果我用 "direct" 端点替换 "to" uri,那么无论拆分器中是否使用线程池,拆分后的消息都可以传递
JmsComponent 是否需要任何特殊设置才能使其与 Splitter + 线程池一起使用?或我错过的任何其他配置?
======= 编辑于 20150731 =======
我在使用包含 1000 行的大 CSV 文件进行测试时遇到了上述问题。如果我用一个小文件(例如只有 10 行)进行测试,我可以看到消息被传送到 inventory.queue,但是从日志看来似乎需要 10 秒才能完成拆分并将消息传送到队列... 下面捕获了日志:
2015-07-31 11:02:07,210 [main ] INFO SpringCamelContext - Apache Camel 2.15.0 (CamelContext: concurrent-route-context) started in 1.301 seconds
2015-07-31 11:02:07,220 [main ] INFO MainSupport - Apache Camel 2.15.0 starting
2015-07-31 11:02:17,250 [://target/inbox] INFO inbox-threadpool-split-route - Done processing file: smallfile.csv
查看从 11:02:07 开始的路线并在 11:02:17 显示 "Done processing..." 语句,即 10 秒
如果我用 5 行的 CSV 再次测试,将需要 5 秒...似乎每行需要 1 秒来拆分并传送到 JMS 队列...这非常慢
如果我把"to uri"改成"direct"而不是"JMS",分裂可以在一秒内很快完成
此外,从 JMS 侦听器日志中,它能够在同一秒内接收到所有 10 条消息。 Splitter 似乎会读取并拆分整个文件,"prepare" 所有十行的 10 条 JMS 消息,然后将所有消息传送到队列,但不会 "split 1 row and deliver 1 JMS message immediately"...
是否有任何选项或配置可以改变拆分器行为并增强拆分性能?
我在使用分词器处理 14G 文件时遇到了类似的问题。正如 Claus 在 Parsing Large Files with Apache Camel
上的 post 指出的那样,我能够通过使用 Aggregator 克服性能障碍
聚合批处理消息后,我使用生产者模板将这些消息路由到消息传递系统。希望对您有所帮助。
感谢@Aayush Tuladhar 分享的参考link,我更新了我的路线如下:
<camelContext id="concurrent-route-context" xmlns="http://camel.apache.org/schema/spring" trace="false" >
<route id="inbox-threadpool-split-route">
<from uri="{{inbox.uri}}" />
<log message="Starting to process file: ${header.CamelFileName}" />
<split streaming="true" executorServiceRef="myPool">
<tokenize token="\n" />
<log message="split index - $simple{property.CamelSplitIndex}, row content=$simple{body}" />
<aggregate strategyRef="stringBodyAggregator" completionInterval="750" >
<correlationExpression>
<simple>property.CamelSplitIndex</simple>
</correlationExpression>
<to uri="{{inventory.queue.uri}}" />
</aggregate>
</split>
<log message="Done processing file: ${header.CamelFileName}" />
</route>
</camelContext>
这里的技巧是在拆分器中添加了一个聚合器,它使用
property.CamelSplitIndex
作为相关表达式。 CamelSplitIndex 为每个拆分行不断增加,因此聚合器实际上没有 "aggregating" 任何东西,但结束 "aggregation" 并立即将 JMS 消息排入 JMS 队列。 aggregationStrategy只是简单的join了oldExchange和newExchange,但是这里并不重要,因为它只是用来满足aggregate EIP
需要的属性"strategyRef"
需要注意的一点是,在使用此技巧后,性能瓶颈转移到了 JMS 消息生成器,它每秒传递 1 条消息...我通过利用 CachingConnectionFactory 在中定义 JMS 连接解决了这个问题Spring.
我在 spring xml 中定义了以下路由以拆分文本文件中的行并将每一行发送到 JMS 队列
<bean id="myPool" class="java.util.concurrent.Executors" factory-method="newCachedThreadPool"/>
<camelContext id="concurrent-route-context" xmlns="http://camel.apache.org/schema/spring" trace="true">
<route id="inbox-threadpool-split-route">
<from uri="{{inbox.uri}}" />
<log message="Starting to process file: ${header.CamelFileName}" />
<split streaming="true" executorServiceRef="myPool">
<tokenize token="\n" />
<to uri="{{inventory.queue.uri}}" />
</split>
<log message="Done processing file: ${header.CamelFileName}" />
</route>
</camelContext>
inbox.uri 是一个文件组件 uri 监听目录中的文件,而 inventory.queue.uri 是一个 JmsComponent uri 连接到 JMS 服务器中的队列(Tibco EMS 6.X 版本) . JmsComponent uri 很简单,如 "JmsComponent:queue:?username=&password="
上面的路由可以运行没有报错,但是从文件中拆分出来的行并没有作为JMS消息发送到队列中(即程序运行后队列还是空的)
如果我从拆分器定义中删除 executorServiceRef="myPool"(剩下的定义如
如果我用 "direct" 端点替换 "to" uri,那么无论拆分器中是否使用线程池,拆分后的消息都可以传递
JmsComponent 是否需要任何特殊设置才能使其与 Splitter + 线程池一起使用?或我错过的任何其他配置?
======= 编辑于 20150731 =======
我在使用包含 1000 行的大 CSV 文件进行测试时遇到了上述问题。如果我用一个小文件(例如只有 10 行)进行测试,我可以看到消息被传送到 inventory.queue,但是从日志看来似乎需要 10 秒才能完成拆分并将消息传送到队列... 下面捕获了日志:
2015-07-31 11:02:07,210 [main ] INFO SpringCamelContext - Apache Camel 2.15.0 (CamelContext: concurrent-route-context) started in 1.301 seconds
2015-07-31 11:02:07,220 [main ] INFO MainSupport - Apache Camel 2.15.0 starting
2015-07-31 11:02:17,250 [://target/inbox] INFO inbox-threadpool-split-route - Done processing file: smallfile.csv
查看从 11:02:07 开始的路线并在 11:02:17 显示 "Done processing..." 语句,即 10 秒
如果我用 5 行的 CSV 再次测试,将需要 5 秒...似乎每行需要 1 秒来拆分并传送到 JMS 队列...这非常慢
如果我把"to uri"改成"direct"而不是"JMS",分裂可以在一秒内很快完成
此外,从 JMS 侦听器日志中,它能够在同一秒内接收到所有 10 条消息。 Splitter 似乎会读取并拆分整个文件,"prepare" 所有十行的 10 条 JMS 消息,然后将所有消息传送到队列,但不会 "split 1 row and deliver 1 JMS message immediately"...
是否有任何选项或配置可以改变拆分器行为并增强拆分性能?
我在使用分词器处理 14G 文件时遇到了类似的问题。正如 Claus 在 Parsing Large Files with Apache Camel
上的 post 指出的那样,我能够通过使用 Aggregator 克服性能障碍聚合批处理消息后,我使用生产者模板将这些消息路由到消息传递系统。希望对您有所帮助。
感谢@Aayush Tuladhar 分享的参考link,我更新了我的路线如下:
<camelContext id="concurrent-route-context" xmlns="http://camel.apache.org/schema/spring" trace="false" >
<route id="inbox-threadpool-split-route">
<from uri="{{inbox.uri}}" />
<log message="Starting to process file: ${header.CamelFileName}" />
<split streaming="true" executorServiceRef="myPool">
<tokenize token="\n" />
<log message="split index - $simple{property.CamelSplitIndex}, row content=$simple{body}" />
<aggregate strategyRef="stringBodyAggregator" completionInterval="750" >
<correlationExpression>
<simple>property.CamelSplitIndex</simple>
</correlationExpression>
<to uri="{{inventory.queue.uri}}" />
</aggregate>
</split>
<log message="Done processing file: ${header.CamelFileName}" />
</route>
</camelContext>
这里的技巧是在拆分器中添加了一个聚合器,它使用
property.CamelSplitIndex
作为相关表达式。 CamelSplitIndex 为每个拆分行不断增加,因此聚合器实际上没有 "aggregating" 任何东西,但结束 "aggregation" 并立即将 JMS 消息排入 JMS 队列。 aggregationStrategy只是简单的join了oldExchange和newExchange,但是这里并不重要,因为它只是用来满足aggregate EIP
需要的属性"strategyRef"需要注意的一点是,在使用此技巧后,性能瓶颈转移到了 JMS 消息生成器,它每秒传递 1 条消息...我通过利用 CachingConnectionFactory 在中定义 JMS 连接解决了这个问题Spring.