如何强制消费者读取kafka中的特定分区
How to force a consumer to read a specific partition in kafka
我有一个应用程序可以从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个包含 5 个分区的主题,并且有 5 个 kafka 消费者。但是网页下载的超时时间是 60 秒。
当 url 之一正在下载时,服务器假定消息丢失并将数据重新发送给不同的消费者。
我已经尝试了
中提到的所有内容
Kafka consumer configuration / performance issues
和
https://github.com/spring-projects/spring-kafka/issues/202
但我每次都会收到不同的错误。
是否可以将特定消费者与kafka中的分区绑定?
我正在为我的应用程序使用 kafka-python
我从未使用过 Python 客户端,但 Java 客户端支持 assign
方法,您可以使用该方法代替 subscribe
请求分配特定的主题的分区。当然,您失去了自动重新平衡功能,您必须手动处理这些用例。
也许我猜你的情况到底发生了什么。如果你的consumer从kafka中获取url,然后去下载内容,你说大概需要60s左右。所以你的消费者因为下载而阻止它,并且无法将心跳发送到kafka服务器。所以kafka服务器认为这个消费者宕机了,所以它做了一个组重新平衡,并将未提交的消息重新发送给其他消费者。
因此您可以尝试两种解决方案:
将配置 session_timeout_ms
设置为 60000 或更大。默认是30s,对你来说不够
更好的解决方案是使用多线程。当你的consumer从kafka中获取消息,然后启动一个新的线程来下载内容时,它不会阻塞consumer.poll,所以它可以很好地工作。
我错过了 Kafka 的文档-python。我们可以使用 TopicPartition class 为特定消费者分配一个分区。
http://kafka-python.readthedocs.io/en/master/
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
我有一个应用程序可以从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容。我创建了一个包含 5 个分区的主题,并且有 5 个 kafka 消费者。但是网页下载的超时时间是 60 秒。 当 url 之一正在下载时,服务器假定消息丢失并将数据重新发送给不同的消费者。
我已经尝试了
中提到的所有内容Kafka consumer configuration / performance issues
和
https://github.com/spring-projects/spring-kafka/issues/202
但我每次都会收到不同的错误。
是否可以将特定消费者与kafka中的分区绑定? 我正在为我的应用程序使用 kafka-python
我从未使用过 Python 客户端,但 Java 客户端支持 assign
方法,您可以使用该方法代替 subscribe
请求分配特定的主题的分区。当然,您失去了自动重新平衡功能,您必须手动处理这些用例。
也许我猜你的情况到底发生了什么。如果你的consumer从kafka中获取url,然后去下载内容,你说大概需要60s左右。所以你的消费者因为下载而阻止它,并且无法将心跳发送到kafka服务器。所以kafka服务器认为这个消费者宕机了,所以它做了一个组重新平衡,并将未提交的消息重新发送给其他消费者。
因此您可以尝试两种解决方案:
将配置
session_timeout_ms
设置为 60000 或更大。默认是30s,对你来说不够更好的解决方案是使用多线程。当你的consumer从kafka中获取消息,然后启动一个新的线程来下载内容时,它不会阻塞consumer.poll,所以它可以很好地工作。
我错过了 Kafka 的文档-python。我们可以使用 TopicPartition class 为特定消费者分配一个分区。
http://kafka-python.readthedocs.io/en/master/
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)