kafka-python - 如何提交分区?
kafka-python - How do I commit a partition?
使用kafka-python-1.0.2.
如果我有一个包含 10 个分区的主题,我该如何提交一个特定的分区,同时遍历各个分区和消息。我似乎无法在文档或其他任何地方找到这方面的示例
从文档中,我想使用:
consumer.commit(offset=offsets)
具体来说,如何创建偏移所需的分区和 OffsetAndMetadata 字典(dict,可选)– {TopicPartition: OffsetAndMetadata}。
我希望函数调用就像这样:
consumer.commit(partition, offset)
但好像不是这样。
提前致谢。
所以看起来我可能已经想通了,有趣的是当你写下你的问题时会发生这种情况。这似乎有效:
meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)
需要更多测试,但如果有任何变化,将会更新。
没有必要使用元数据。
看这个例子:
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = createKafkaConsumer()
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
tp: OffsetAndMetadata(offset, None)
})
希望对您有所帮助。
from kafka import KafkaConsumer
from kafka import TopicPartition
TOPIC = "test_topic"
PARTITION = 0
consumer = KafkaConsumer(
group_id=TOPIC,
auto_offset_reset="earliest",
bootstrap_servers="localhost:9092",
request_timeout_ms=100000,
session_timeout_ms=99000,
max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)
# format: topic, partition
consumer.assign([topic_partition])
consumer.seek(topic_partition, 1660000)
# format: TopicPartition, offset. 1660000 is the offset been set.
for message in consumer:
# do something
- 这里只分配一个分区并为该分区设置偏移量,如果有多个分区,则需要为每个分区分配一个然后设置偏移量。
- aalmeida88的回答有时对我有用,在某些情况下,它确实有用,aalmeida88给了我寻求的思路,看来也是一个有用的方法。
- 还有一点你可能需要注意的是,当你自己分配分区时,kafka manager似乎无法获取消费者信息,这可能是因为你分配分区时,你在kafka而不是zookeeper中设置了它,因此卡夫卡经理可能无法获得该信息。
希望对你有帮助ps!
---编辑-----
寻找更好的方法。
topic_partition = TopicPartition(TOPIC,
message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()
这将从kafka获取的消息中提取分区信息并保存手动分配分区的子句,从而在程序中需要设置多个分区的偏移量(并不少见)时带来方便。
ps:为了保证一个分区只设置一次,需要根据自己的应用设置flag。
只需要调用consumer.commit()
from kafka import KafkaConsumer
KAFKA_TOPIC_NAME='KAFKA_TOPIC_NAME'
KAFKA_CONSUMER_GROUP='KAFKA_CONSUMER_GROUP'
consumer = KafkaConsumer(
KAFKA_TOPIC_NAME,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=KAFKA_CONSUMER_GROUP
)
for message in consumer:
print(message.value)
consumer.commit() # <--- This is what we need
# Optionally, To check if everything went good
from kafka import TopicPartition
print('New Kafka offset: %s' % consumer.committed(TopicPartition(KAFKA_TOPIC_NAME, message.partition)))
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
consumer = KafkaConsumer("topic_name", enable_auto_commit= False, bootstrap_servers=["128.0.0.1:9092"],group_id= "group_name")
msg = next(consumer)
consumer.commit({TopicPartition("topic_name", msg.partition): OffsetAndMetadata(msg.offset+1, '')})
使用kafka-python-1.0.2.
如果我有一个包含 10 个分区的主题,我该如何提交一个特定的分区,同时遍历各个分区和消息。我似乎无法在文档或其他任何地方找到这方面的示例
从文档中,我想使用:
consumer.commit(offset=offsets)
具体来说,如何创建偏移所需的分区和 OffsetAndMetadata 字典(dict,可选)– {TopicPartition: OffsetAndMetadata}。
我希望函数调用就像这样:
consumer.commit(partition, offset)
但好像不是这样。
提前致谢。
所以看起来我可能已经想通了,有趣的是当你写下你的问题时会发生这种情况。这似乎有效:
meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)
需要更多测试,但如果有任何变化,将会更新。
没有必要使用元数据。 看这个例子:
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
...
topic = 'your_topic'
partition = 0
tp = TopicPartition(topic,partition)
kafkaConsumer = createKafkaConsumer()
kafkaConsumer.assign([tp])
offset = 15394125
kafkaConsumer.commit({
tp: OffsetAndMetadata(offset, None)
})
希望对您有所帮助。
from kafka import KafkaConsumer
from kafka import TopicPartition
TOPIC = "test_topic"
PARTITION = 0
consumer = KafkaConsumer(
group_id=TOPIC,
auto_offset_reset="earliest",
bootstrap_servers="localhost:9092",
request_timeout_ms=100000,
session_timeout_ms=99000,
max_poll_records=100,
)
topic_partition = TopicPartition(TOPIC, PARTITION)
# format: topic, partition
consumer.assign([topic_partition])
consumer.seek(topic_partition, 1660000)
# format: TopicPartition, offset. 1660000 is the offset been set.
for message in consumer:
# do something
- 这里只分配一个分区并为该分区设置偏移量,如果有多个分区,则需要为每个分区分配一个然后设置偏移量。
- aalmeida88的回答有时对我有用,在某些情况下,它确实有用,aalmeida88给了我寻求的思路,看来也是一个有用的方法。
- 还有一点你可能需要注意的是,当你自己分配分区时,kafka manager似乎无法获取消费者信息,这可能是因为你分配分区时,你在kafka而不是zookeeper中设置了它,因此卡夫卡经理可能无法获得该信息。 希望对你有帮助ps!
---编辑-----
寻找更好的方法。
topic_partition = TopicPartition(TOPIC,
message.partition)
consumer.seek(topic_partition, offset_value)
consumer.commit()
这将从kafka获取的消息中提取分区信息并保存手动分配分区的子句,从而在程序中需要设置多个分区的偏移量(并不少见)时带来方便。
ps:为了保证一个分区只设置一次,需要根据自己的应用设置flag。
只需要调用consumer.commit()
from kafka import KafkaConsumer
KAFKA_TOPIC_NAME='KAFKA_TOPIC_NAME'
KAFKA_CONSUMER_GROUP='KAFKA_CONSUMER_GROUP'
consumer = KafkaConsumer(
KAFKA_TOPIC_NAME,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=KAFKA_CONSUMER_GROUP
)
for message in consumer:
print(message.value)
consumer.commit() # <--- This is what we need
# Optionally, To check if everything went good
from kafka import TopicPartition
print('New Kafka offset: %s' % consumer.committed(TopicPartition(KAFKA_TOPIC_NAME, message.partition)))
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
consumer = KafkaConsumer("topic_name", enable_auto_commit= False, bootstrap_servers=["128.0.0.1:9092"],group_id= "group_name")
msg = next(consumer)
consumer.commit({TopicPartition("topic_name", msg.partition): OffsetAndMetadata(msg.offset+1, '')})