Kafka AvroConsumer 使用 offsets_for_times 从时间戳消费
Kafka AvroConsumer consume from timestamp using offsets_for_times
正在尝试使用 confluent_kafka.AvroConsumer 使用给定时间戳中的消息。
if flag:
# creating a list
topic_partitons_to_search = list(
map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))
print("Searching for offsets with %s" % topic_partitons_to_search)
offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0)
print("offsets_for_times results: %s" % offsets)
for x in offsets:
c.seek(x)
flag=False
控制台returns这个
Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]
offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]
{'name': 'Hello'}
{'name': 'Hello'}
{'name': 'Hello1'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Offset 8'}
{'name': 'Offset 9'}
{'name': 'Offset 10'}
{'name': 'Offset 11'}
{'name': 'New'}
这些是我在 my_topic2 的分区 0 中的所有消息(分区 1 中没有任何消息),我们应该不会返回任何消息,因为我们没有从当前时间生成的消息 (time.time()) .然后我希望能够使用 time.time() - 60000
之类的东西来获取最后 60000 毫秒内的所有消息
Python time.time() returns 自纪元以来的秒数,offsets_for_times 使用纪元以来的毫秒数,所以当我发送秒数时它计算的日期比今天早得多,这意味着我们应该包括我所有的偏移量。
正在尝试使用 confluent_kafka.AvroConsumer 使用给定时间戳中的消息。
if flag:
# creating a list
topic_partitons_to_search = list(
map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))
print("Searching for offsets with %s" % topic_partitons_to_search)
offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0)
print("offsets_for_times results: %s" % offsets)
for x in offsets:
c.seek(x)
flag=False
控制台returns这个
Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]
offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]
{'name': 'Hello'}
{'name': 'Hello'}
{'name': 'Hello1'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Offset 8'}
{'name': 'Offset 9'}
{'name': 'Offset 10'}
{'name': 'Offset 11'}
{'name': 'New'}
这些是我在 my_topic2 的分区 0 中的所有消息(分区 1 中没有任何消息),我们应该不会返回任何消息,因为我们没有从当前时间生成的消息 (time.time()) .然后我希望能够使用 time.time() - 60000
之类的东西来获取最后 60000 毫秒内的所有消息
Python time.time() returns 自纪元以来的秒数,offsets_for_times 使用纪元以来的毫秒数,所以当我发送秒数时它计算的日期比今天早得多,这意味着我们应该包括我所有的偏移量。