Attaching a KafkaConsumer to a specific partition
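How do I attach a Python consumer script to a specific Kafka partition?
When I run two instances of the consumer script (shown below), each instance picks one partition at random and then consumes and prints all messages from that partition, as expected.
However, I need to write these messages to local files on disk that are named after the partition, so attaching each instance of the script to a pre-declared partition ID would make things easier.
File names, for example: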
Date/Hour/PARTITION_ID-0.CSV
Date/Hour/PARTITION_ID-1.CSV
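A minimal sketch of how such paths could be built in Python. The helper name `partition_csv_path`, the `base_dir` parameter, and the `%Y-%m-%d` date format are assumptions for illustration; the only thing taken from the post is the Date/Hour/PARTITION_ID-N.CSV layout.

```python
import os
from datetime import datetime


def partition_csv_path(partition_id, when, base_dir="."):
    """Return <base_dir>/<date>/<hour>/PARTITION_ID-<n>.CSV for one partition."""
    date_part = when.strftime("%Y-%m-%d")  # assumed date format
    hour_part = when.strftime("%H")        # zero-padded 24-hour clock
    file_name = "PARTITION_ID-%d.CSV" % partition_id
    return os.path.join(base_dir, date_part, hour_part, file_name)


# Example: path for partition 1 at the current local time
example_path = partition_csv_path(1, datetime.now())
```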
Any ideas on how to achieve this?
Feel free to suggest alternative approaches.
Kafka setup:
Topic:my-topic3 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: my-topic3 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: my-topic3 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Kafka consumer script (in Python) [WITH FIX]
from kafka import KafkaConsumer
from kafka import TopicPartition
# To consume latest messages and auto-commit offsets
#consumer = KafkaConsumer('my-topic3',
# group_id='my-group',
# bootstrap_servers=['192.168.150.80:9092'])
# To consume messages from a specific PARTITION [ FIX ]
consumer = KafkaConsumer(bootstrap_servers='192.168.150.80:9092')
consumer.assign([TopicPartition('my-topic3', 1)])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("Topic= %s : Partition= %d : Offset= %d: key= %s value= %s" % (
        message.topic, message.partition, message.offset,
        message.key, message.value))
Update: as suggested below, I used the assign function, but I keep getting an IllegalStateError.
The assign call:
consumer.assign([TopicPartition('my-topic3',1)])
The error:
Traceback (most recent call last):
File "consumerExample.py", line 13, in <module>
consumer.assign([TopicPartition('my-topic3',1)])
File "/usr/lib/python2.7/site-packages/kafka/consumer/group.py", line 278, in assign
self._subscription.assign_from_user(partitions)
File "/usr/lib/python2.7/site-packages/kafka/consumer/subscription_state.py", line 189, in assign_from_user
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
kafka.errors.IllegalStateError: You must choose only one way to configure
You can use the assign() method to manually assign one or more partitions to the consumer.
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
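As far as I can tell, the IllegalStateError in the question's update is what you get when assign() is called on a consumer that was created with a topic argument, as in the commented-out constructor: in kafka-python, passing a topic to the constructor subscribes the consumer, and a subscription cannot be combined with manual assignment. A sketch of a script that takes the partition ID on the command line and never subscribes might look like this. The function names and the `--topic`, `--partition`, and `--bootstrap-servers` flags are assumptions for illustration; the defaults are the values from the question.

```python
import argparse


def parse_args(argv=None):
    """Parse the topic, partition, and broker for one consumer instance."""
    parser = argparse.ArgumentParser(description="Consume one Kafka partition")
    parser.add_argument("--topic", default="my-topic3")
    parser.add_argument("--partition", type=int, required=True)
    parser.add_argument("--bootstrap-servers", default="192.168.150.80:9092")
    return parser.parse_args(argv)


def build_consumer(args):
    """Create a consumer that is manually assigned to exactly one partition."""
    # Imported here so argument parsing works without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    # No topic is passed to the constructor, so the consumer is not subscribed
    # and assign() is the only way it gets its partition.
    consumer = KafkaConsumer(bootstrap_servers=args.bootstrap_servers)
    consumer.assign([TopicPartition(args.topic, args.partition)])
    return consumer


def main(argv=None):
    """Print every message from the single assigned partition."""
    args = parse_args(argv)
    for message in build_consumer(args):
        print("Partition= %d : Offset= %d : value= %s"
              % (message.partition, message.offset, message.value))
```

Each instance would then be started with its own partition, for example `--partition 0` for one and `--partition 1` for the other, with `main()` called at the bottom of the script.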