如何在 Kafka 流消费者上减慢或设置给定速度?
How to slow down or set given speed on the Kafka stream consumer?
我正在尝试控制 KStream 消耗的消息数量,但我不太成功。
我正在使用:
max.poll.interval.ms=100
和
max.poll.records=20
每秒获得 200 条消息。
不过好像不太好,我统计的也是每秒500条消息。
流消费端还要设置什么?
I am using: max.poll.interval.ms=100 and max.poll.records=20 to get
like 200 messages per second.
max.poll.interval.ms 和 max.poll.records 属性不能以这种方式工作。
max.poll.interval.ms 指示消费者在每次消费者轮询主题之间必须等待的最大时间间隔(以毫秒为单位)。
max.poll.records表示consumer每次轮询topic时最多可以消费的记录数。
每次轮询之间的间隔不受上述两个属性控制,而是由您的消费者确认获取的记录所花费的时间控制。
例如,假设存在一个主题 X,其中有 1000 条记录,消费者确认获取的记录所花费的时间为 20 毫秒。在 max.poll.interval.ms = 100 和 max.poll.records = 20 的情况下,消费者将每 20 毫秒轮询一次 Kafka 主题,并且在每次轮询中,最多将获取 20 条记录。如果确认获取的记录所花费的时间大于 max.poll.interval.ms,轮询将被视为失败,并且将从 Kafka 主题再次轮询该特定批次。
您可以在消费者端使用类似 akka-stream-kafka(又名 reactive-kafka)的东西。 akka-streams 具有很好的节流功能,在这里会派上用场:
http://doc.akka.io/docs/akka/snapshot/java/stream/stream-quickstart.html#time-based-processing
一个 KafkaConsumer(也是 KafkaStreams
内部使用的 KafkaConsumer,尽可能快地读取记录。
您提到的参数会对性能产生影响,但您无法控制实际数据速率。另外需要注意的是,max.poll.records
只配置了poll()
return的记录数,对client-broker通信没有影响。 KafkaConsumer
可以在与代理对话时获取更多记录,然后 return 在 poll()
上缓冲消息,只要记录在缓冲区中(即,对于这种情况,poll()
是一个 client-side 运算符,仅确保您不会通过 max.poll.interval.ms
超时)。因此,您可能对 fetch.max.bytes
更感兴趣,它决定了从代理获取的字节大小。如果减少此参数,消费者的效率会降低,因此吞吐量会降低。 (虽然不推荐)。
配置吞吐量的另一种方法是配额 (https://kafka.apache.org/documentation/#design_quotas) 这是代理端配置,允许您限制客户端可以读取 and/or 写入的数据量。
在 Kafka Streams 中(以及使用普通 KafkaConsumer 时)最好的做法是手动限制对 poll()
的调用。对于 Kafka Streams,您可以将 Thread.sleep()
添加到任何 UDF 中。如果您不想将其搭载到现有的运算符中,您只需添加一个具有短暂状态的 foreach()
(即 class 成员变量)来跟踪吞吐量并计算您需要多少休眠以相应地限制吞吐量。
在 Kafka 中有 Kafka Quota 的新概念。
所有细节都在这里Kafka -> 4.9 Quotas
我正在尝试控制 KStream 消耗的消息数量,但我不太成功。
我正在使用:
max.poll.interval.ms=100
和
max.poll.records=20
每秒获得 200 条消息。
不过好像不太好,我统计的也是每秒500条消息。
流消费端还要设置什么?
I am using: max.poll.interval.ms=100 and max.poll.records=20 to get like 200 messages per second.
max.poll.interval.ms 和 max.poll.records 属性不能以这种方式工作。
max.poll.interval.ms 指示消费者在每次消费者轮询主题之间必须等待的最大时间间隔(以毫秒为单位)。
max.poll.records表示consumer每次轮询topic时最多可以消费的记录数。
每次轮询之间的间隔不受上述两个属性控制,而是由您的消费者确认获取的记录所花费的时间控制。
例如,假设存在一个主题 X,其中有 1000 条记录,消费者确认获取的记录所花费的时间为 20 毫秒。在 max.poll.interval.ms = 100 和 max.poll.records = 20 的情况下,消费者将每 20 毫秒轮询一次 Kafka 主题,并且在每次轮询中,最多将获取 20 条记录。如果确认获取的记录所花费的时间大于 max.poll.interval.ms,轮询将被视为失败,并且将从 Kafka 主题再次轮询该特定批次。
您可以在消费者端使用类似 akka-stream-kafka(又名 reactive-kafka)的东西。 akka-streams 具有很好的节流功能,在这里会派上用场:
http://doc.akka.io/docs/akka/snapshot/java/stream/stream-quickstart.html#time-based-processing
一个 KafkaConsumer(也是 KafkaStreams
内部使用的 KafkaConsumer,尽可能快地读取记录。
您提到的参数会对性能产生影响,但您无法控制实际数据速率。另外需要注意的是,max.poll.records
只配置了poll()
return的记录数,对client-broker通信没有影响。 KafkaConsumer
可以在与代理对话时获取更多记录,然后 return 在 poll()
上缓冲消息,只要记录在缓冲区中(即,对于这种情况,poll()
是一个 client-side 运算符,仅确保您不会通过 max.poll.interval.ms
超时)。因此,您可能对 fetch.max.bytes
更感兴趣,它决定了从代理获取的字节大小。如果减少此参数,消费者的效率会降低,因此吞吐量会降低。 (虽然不推荐)。
配置吞吐量的另一种方法是配额 (https://kafka.apache.org/documentation/#design_quotas) 这是代理端配置,允许您限制客户端可以读取 and/or 写入的数据量。
在 Kafka Streams 中(以及使用普通 KafkaConsumer 时)最好的做法是手动限制对 poll()
的调用。对于 Kafka Streams,您可以将 Thread.sleep()
添加到任何 UDF 中。如果您不想将其搭载到现有的运算符中,您只需添加一个具有短暂状态的 foreach()
(即 class 成员变量)来跟踪吞吐量并计算您需要多少休眠以相应地限制吞吐量。
在 Kafka 中有 Kafka Quota 的新概念。
所有细节都在这里Kafka -> 4.9 Quotas