如何强制消费者读取kafka中的特定分区

How to force a consumer to read a specific partition in kafka

我有一个应用程序可以从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个包含 5 个分区的主题,并且有 5 个 kafka 消费者。但是网页下载的超时时间是 60 秒。 当 url 之一正在下载时,服务器假定消息丢失并将数据重新发送给不同的消费者。

我已经尝试了

中提到的所有内容

Kafka consumer configuration / performance issues

https://github.com/spring-projects/spring-kafka/issues/202

但我每次都会收到不同的错误。

是否可以将特定消费者与kafka中的分区绑定? 我正在为我的应用程序使用 kafka-python

我从未使用过 Python 客户端,但 Java 客户端支持 assign 方法,您可以使用该方法代替 subscribe 请求分配特定的主题的分区。当然,您失去了自动重新平衡功能,您必须手动处理这些用例。

也许我猜你的情况到底发生了什么。如果你的consumer从kafka中获取url,然后去下载内容,你说大概需要60s左右。所以你的消费者因为下载而阻止它,并且无法将心跳发送到kafka服务器。所以kafka服务器认为这个消费者宕机了,所以它做了一个组重新平衡,并将未提交的消息重新发送给其他消费者。

因此您可以尝试两种解决方案:

  1. 将配置 session_timeout_ms 设置为 60000 或更大。默认是30s,对你来说不够

  2. 更好的解决方案是使用多线程。当你的consumer从kafka中获取消息,然后启动一个新的线程来下载内容时,它不会阻塞consumer.poll,所以它可以很好地工作。

我错过了 Kafka 的文档-python。我们可以使用 TopicPartition class 为特定消费者分配一个分区。

http://kafka-python.readthedocs.io/en/master/

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)