Python Kafka 消费者重复读取已读过的消息
Python Kafka consumer reading already read messages
Kafka 消费者代码 -
def test():
    """Send one message, then print everything in partition 0 of the topic.

    This is the snippet from the question. It seeks to the beginning of the
    partition before iterating, so previously written messages are printed
    again, not only the one just sent.
    """
    TOPIC = "file_data"

    producer = KafkaProducer()
    producer.send(TOPIC, "data")

    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        consumer_timeout_ms=1000,
        group_id="Group2",
        enable_auto_commit=False,
        auto_commit_interval_ms=1000,
    )

    # Pin the consumer to partition 0 of the topic by hand.
    topic_partition = TopicPartition(TOPIC, 0)
    consumer.assign([topic_partition])
    # Rewind to the earliest offset; this is what makes old messages reappear.
    consumer.seek_to_beginning(topic_partition)

    for message in consumer:
        fields = (message.topic, message.key, message.value)
        print("%s key=%s value=%s" % fields)
    # NOTE(review): the original indentation was lost; commit() is assumed to
    # run once after the loop rather than per message -- confirm.
    consumer.commit()
预期行为
它应该只读取生产者写入的最后一条消息。它应该只打印:
file_data key=None value=b'data'
当前行为
运行代码后打印:
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
您需要使用 consumer.seek_to_end(topic_partition),而不是 consumer.seek_to_beginning(topic_partition)。
来自文档:
Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available ( seekToBeginning(Collection) and seekToEnd(Collection) respectively).
简单形式:
# `server` and `group_id` are placeholders that must be defined beforehand.
consumer = KafkaConsumer(
    'topicName',
    bootstrap_servers=[server],
    group_id=group_id,
    enable_auto_commit=True,
)
try:
    # NOTE(review): poll() is presumably here so that partitions are assigned
    # before seeking -- confirm against the kafka-python documentation.
    consumer.poll()
    # Jump past everything already in the log so only new messages are read.
    consumer.seek_to_end()
    for message in consumer:
        print(message)
finally:
    # No consumer_timeout_ms is set, so the loop normally ends only through
    # an exception or interrupt; close the consumer on that path as well.
    consumer.close()
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import KafkaProducer
def test():
    """Send one message to ``file_data`` and print what the consumer receives.

    The consumer is assigned to partition 0 and is *not* rewound, so it
    continues from the position Kafka resolves for the group instead of
    replaying the whole partition. Offsets are committed manually once the
    read loop has ended, and both clients are always closed.
    """
    TOPIC = "file_data"

    producer = KafkaProducer()
    consumer = None
    try:
        producer.send(TOPIC, b'data')
        # send() only queues the record; flush() waits until it is delivered.
        producer.flush()

        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='latest',
            consumer_timeout_ms=1000,
            group_id="Group2",
            enable_auto_commit=False,
            auto_commit_interval_ms=1000,
        )
        topic_partition = TopicPartition(TOPIC, 0)
        consumer.assign([topic_partition])
        # Deliberately no seek here. Call
        # consumer.seek_to_beginning(topic_partition) only when the whole
        # partition should be replayed from its first message.

        for message in consumer:
            print("%s key=%s value=%s" % (message.topic, message.key, message.value))
        # NOTE(review): the original indentation was lost; commit() is assumed
        # to run once after the loop rather than per message -- confirm.
        consumer.commit()
    finally:
        # Release network resources even if producing or consuming failed.
        if consumer is not None:
            consumer.close()
        producer.close()


test()
这样就符合您的预期了。只有当您希望从头开始读取时,才需要调用 seek_to_beginning(即 Java 客户端中的 seekToBeginning)。
Kafka 消费者代码 -
def test():
    """Send one message, then print everything in partition 0 of the topic.

    This is the snippet from the question. It seeks to the beginning of the
    partition before iterating, so previously written messages are printed
    again, not only the one just sent.
    """
    TOPIC = "file_data"

    producer = KafkaProducer()
    producer.send(TOPIC, "data")

    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        consumer_timeout_ms=1000,
        group_id="Group2",
        enable_auto_commit=False,
        auto_commit_interval_ms=1000,
    )

    # Pin the consumer to partition 0 of the topic by hand.
    topic_partition = TopicPartition(TOPIC, 0)
    consumer.assign([topic_partition])
    # Rewind to the earliest offset; this is what makes old messages reappear.
    consumer.seek_to_beginning(topic_partition)

    for message in consumer:
        fields = (message.topic, message.key, message.value)
        print("%s key=%s value=%s" % fields)
    # NOTE(review): the original indentation was lost; commit() is assumed to
    # run once after the loop rather than per message -- confirm.
    consumer.commit()
预期行为 它应该只读取生产者写入的最后一条消息。它应该只打印:
file_data key=None value=b'data'
当前行为 运行代码后打印:
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
您需要使用 consumer.seek_to_end(topic_partition),而不是 consumer.seek_to_beginning(topic_partition)。
来自文档:
Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available ( seekToBeginning(Collection) and seekToEnd(Collection) respectively).
简单形式:
# `server` and `group_id` are placeholders that must be defined beforehand.
consumer = KafkaConsumer(
    'topicName',
    bootstrap_servers=[server],
    group_id=group_id,
    enable_auto_commit=True,
)
try:
    # NOTE(review): poll() is presumably here so that partitions are assigned
    # before seeking -- confirm against the kafka-python documentation.
    consumer.poll()
    # Jump past everything already in the log so only new messages are read.
    consumer.seek_to_end()
    for message in consumer:
        print(message)
finally:
    # No consumer_timeout_ms is set, so the loop normally ends only through
    # an exception or interrupt; close the consumer on that path as well.
    consumer.close()
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import KafkaProducer
def test():
    """Send one message to ``file_data`` and print what the consumer receives.

    The consumer is assigned to partition 0 and is *not* rewound, so it
    continues from the position Kafka resolves for the group instead of
    replaying the whole partition. Offsets are committed manually once the
    read loop has ended, and both clients are always closed.
    """
    TOPIC = "file_data"

    producer = KafkaProducer()
    consumer = None
    try:
        producer.send(TOPIC, b'data')
        # send() only queues the record; flush() waits until it is delivered.
        producer.flush()

        consumer = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='latest',
            consumer_timeout_ms=1000,
            group_id="Group2",
            enable_auto_commit=False,
            auto_commit_interval_ms=1000,
        )
        topic_partition = TopicPartition(TOPIC, 0)
        consumer.assign([topic_partition])
        # Deliberately no seek here. Call
        # consumer.seek_to_beginning(topic_partition) only when the whole
        # partition should be replayed from its first message.

        for message in consumer:
            print("%s key=%s value=%s" % (message.topic, message.key, message.value))
        # NOTE(review): the original indentation was lost; commit() is assumed
        # to run once after the loop rather than per message -- confirm.
        consumer.commit()
    finally:
        # Release network resources even if producing or consuming failed.
        if consumer is not None:
            consumer.close()
        producer.close()


test()
这样就符合您的预期了。只有当您希望从头开始读取时,才需要调用 seek_to_beginning(即 Java 客户端中的 seekToBeginning)。