增加 topic:channel 支持队列的并发性
Increase concurrency on supported queue of topic:channel
我在 SpringXD 1.3 上进行测试。0.RELEASE 将消息复制到不同的接收器。我的配置是 RabbitMQ 支持的三节点集群作为消息总线。
我的测试是这样的:
第一个案例
stream create sourceToDuplicate --definition "trigger --fixedDelay=1
--timeUnit=MILLISECONDS --payload='test' > topic:test" --deploy
stream create processMessages1 --definition "topic:test > cassandra --initScript=file:<absolut-path-to>/int-db.cql --ingestQuery='insert into book (isbn, title, author) values (uuid(), ?, ?)'"
stream create processMessages2 --definition "topic:test > aggregator --count=1000 --timeout=1000 | file" --deploy
现在为了增加 cassandra-sink 上的消费者,我想用 "module.cassandra.consumer.concurrency=10" 部署第一个流。这 属性 让部署失败。
我的解决方法现在是第四个流,这样我就可以增加消费者:
第二种情况
stream create topicToQueue1 --definition "topic:test > queue:test1" --deploy
stream create processMessage1 --definition "queue:test1 > cassandra..."
stream deploy processMessage1 --properties "module.cassandra.consumer.concurrency=10"
最后我的问题是:如果在 rabbitmq 上已经为允许更多消费者的 topic:channel 添加了一个队列,为什么第一个用例会失败?
祝大家圣诞快乐
---更新---
版本:SpringXD 1.3.0.RELEASE
错误:
2015-12-18T13:58:28+0100 1.3.0.RELEASE INFO DeploymentSupervisor-0
zk.ZKStreamDeploymentHandler - Deployment status for stream 'processMessage1':
DeploymentStatus{state=failed,error(s)=java.lang.IllegalArgumentException:
RabbitMessageBus does not support consumer property: concurrency for processMessage1.topic:test.
at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateProperties(MessageBusSupport.java:786)
at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateConsumerProperties(MessageBusSupport.java:757)
at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus.bindPubSubConsumer(RabbitMessageBus.java:472)
at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageConsumer(AbstractMessageBusBinderPlugin.java:275)
at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:155)
at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73)
at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:509)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:503)
at org.apache.curator.framework.listen.ListenerContainer.run(ListenerContainer.java:92)
at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.run(PathChildrenCache.java:762)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
topic:
命名通道上的并发数不能大于 1 - 否则每个线程都会获得消息的副本。
如果您想在命名通道上使用并发,它必须是 queue:
这样每个线程都会竞争消息。
我在 SpringXD 1.3 上进行测试。0.RELEASE 将消息复制到不同的接收器。我的配置是 RabbitMQ 支持的三节点集群作为消息总线。 我的测试是这样的:
第一个案例
stream create sourceToDuplicate --definition "trigger --fixedDelay=1 --timeUnit=MILLISECONDS --payload='test' > topic:test" --deploy stream create processMessages1 --definition "topic:test > cassandra --initScript=file:<absolut-path-to>/int-db.cql --ingestQuery='insert into book (isbn, title, author) values (uuid(), ?, ?)'" stream create processMessages2 --definition "topic:test > aggregator --count=1000 --timeout=1000 | file" --deploy
现在为了增加 cassandra-sink 上的消费者,我想用 "module.cassandra.consumer.concurrency=10" 部署第一个流。这 属性 让部署失败。
我的解决方法现在是第四个流,这样我就可以增加消费者:
第二种情况
stream create topicToQueue1 --definition "topic:test > queue:test1" --deploy stream create processMessage1 --definition "queue:test1 > cassandra..." stream deploy processMessage1 --properties "module.cassandra.consumer.concurrency=10"
最后我的问题是:如果在 rabbitmq 上已经为允许更多消费者的 topic:channel 添加了一个队列,为什么第一个用例会失败?
祝大家圣诞快乐
---更新---
版本:SpringXD 1.3.0.RELEASE
错误:
2015-12-18T13:58:28+0100 1.3.0.RELEASE INFO DeploymentSupervisor-0
zk.ZKStreamDeploymentHandler - Deployment status for stream 'processMessage1':
DeploymentStatus{state=failed,error(s)=java.lang.IllegalArgumentException:
RabbitMessageBus does not support consumer property: concurrency for processMessage1.topic:test.
at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateProperties(MessageBusSupport.java:786)
at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateConsumerProperties(MessageBusSupport.java:757)
at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus.bindPubSubConsumer(RabbitMessageBus.java:472)
at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageConsumer(AbstractMessageBusBinderPlugin.java:275)
at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:155)
at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73)
at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:509)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:503)
at org.apache.curator.framework.listen.ListenerContainer.run(ListenerContainer.java:92)
at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.run(PathChildrenCache.java:762)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
topic:
命名通道上的并发数不能大于 1 - 否则每个线程都会获得消息的副本。
如果您想在命名通道上使用并发,它必须是 queue:
这样每个线程都会竞争消息。