Kafka Streams - 处理超时
Kafka KStreams - processing timeouts
我正在尝试使用 <KStream>.process()
和 TimeWindows.of("name", 30000)
来批量处理一些 KTable 值并发送它们。似乎 30 秒超过了消费者超时间隔,在此之后 Kafka 认为该消费者已失效并释放分区。
我已经尝试提高 poll 和 commit interval 的频率来避免这种情况:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
不幸的是,这些错误仍在发生:
(很多)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
其次是这些:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
显然我需要更频繁地将心跳发送回服务器。怎么样?
我的拓扑是:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
new DBAggregateInit(),
new DBAggregate(),
TimeWindows.of("write_aggregate2", 30000));
DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
KTable 每 30 秒按键对值进行分组。在 Processor.init() 我调用 context.schedule(30000)
.
DBProcessorSupplier 提供 DBProcessor 的实例。这是 AbstractProcessor 的一个实现,其中提供了所有覆盖。他们所做的只是记录,所以我知道每个人何时被击中。
这是一个非常简单的拓扑,但很明显我在某处遗漏了一步。
编辑:
我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案。我喜欢当客户端退出/死亡时分区很快可用的概念。
编辑:
为了简化问题,我从图中删除了聚合步骤。现在只是消费者->处理器()。 (如果我将消费者直接发送到 .print()
,它会很快工作,所以我知道没问题)。 (同样,如果我通过 .print()
输出聚合(KTable),它似乎也可以)。
我发现 .process()
- 应该 每 30 秒调用 .punctuate()
实际上阻塞了可变的时间长度并输出了一些随机(如果有的话)。
进一步:
我将调试级别设置为 'debug' 并重新运行。我看到了很多消息:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
但 .punctuate()
函数中的断点未命中。所以它做了很多工作,但没有给我机会使用它。
几点说明:
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
是提交间隔的下限,即在提交之后,下一次提交不会在此时间之前发生。基本上,Kafka Stream 会在这段时间过后尝试尽快提交,但无法保证下一次提交实际需要多长时间。
StreamsConfig.POLL_MS_CONFIG
用于内部KafkaConsumer#poll()
调用,指定poll()
调用的最大阻塞时间。
因此,这两个值都无助于心跳更频繁。
Kafka Streams 在处理记录时遵循 "depth-first" 策略。这意味着,在每个记录的 poll()
之后,将执行拓扑的所有运算符。假设您有三个连续的地图,那么在处理 next/second 记录之前,将调用所有三个地图作为第一条记录。
因此,在第一个 poll()
的所有记录得到完全处理后,将进行下一个 poll()
调用。如果你想更频繁地检测信号,你需要确保单个 poll()
调用获取更少的记录,这样处理所有记录花费的时间更少,下一个 poll()
将被更早地触发。
您可以使用通过 StreamsConfig
指定的 KafkaConsumer
的配置参数来完成此操作(请参阅 https://kafka.apache.org/documentation.html#consumerconfigs):
streamConfig.put(ConsumerConfig.XXX, VALUE);
max.poll.records
:如果您减小此值,将轮询较少的记录
session.timeout.ms
:如果你增加这个值,有更多的时间来处理数据(添加这个是为了完整性,因为它实际上是一个客户端设置而不是 server/broker 端配置 - 即使您知道此解决方案但不喜欢它:))
EDIT
As of Kafka 0.10.1
it is possible (and recommended) to prefix consumer and procuder configs within streams config. This avoids parameter conflicts as some parameter names are used for consumer and producer and cannot be distinguiesh otherwise (and would be applied to consumer and producer at the same time).
To prefix a parameter you can use StreamsConfig#consumerPrefix()
or StreamsConfig#producerPrefix()
, respectively. For example:
streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
还要补充一点:这个问题中描述的场景是一个已知问题,并且已经有 KIP-62 为 KafkaConsumer
引入了一个发送心跳的后台线程,从而将心跳与 poll()
来电。 Kafka Streams 将在即将发布的版本中利用这一新功能。
我正在尝试使用 <KStream>.process()
和 TimeWindows.of("name", 30000)
来批量处理一些 KTable 值并发送它们。似乎 30 秒超过了消费者超时间隔,在此之后 Kafka 认为该消费者已失效并释放分区。
我已经尝试提高 poll 和 commit interval 的频率来避免这种情况:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
不幸的是,这些错误仍在发生:
(很多)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
其次是这些:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
显然我需要更频繁地将心跳发送回服务器。怎么样?
我的拓扑是:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
new DBAggregateInit(),
new DBAggregate(),
TimeWindows.of("write_aggregate2", 30000));
DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
KTable 每 30 秒按键对值进行分组。在 Processor.init() 我调用 context.schedule(30000)
.
DBProcessorSupplier 提供 DBProcessor 的实例。这是 AbstractProcessor 的一个实现,其中提供了所有覆盖。他们所做的只是记录,所以我知道每个人何时被击中。
这是一个非常简单的拓扑,但很明显我在某处遗漏了一步。
编辑:
我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案。我喜欢当客户端退出/死亡时分区很快可用的概念。
编辑:
为了简化问题,我从图中删除了聚合步骤。现在只是消费者->处理器()。 (如果我将消费者直接发送到 .print()
,它会很快工作,所以我知道没问题)。 (同样,如果我通过 .print()
输出聚合(KTable),它似乎也可以)。
我发现 .process()
- 应该 每 30 秒调用 .punctuate()
实际上阻塞了可变的时间长度并输出了一些随机(如果有的话)。
进一步:
我将调试级别设置为 'debug' 并重新运行。我看到了很多消息:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
但 .punctuate()
函数中的断点未命中。所以它做了很多工作,但没有给我机会使用它。
几点说明:
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
是提交间隔的下限,即在提交之后,下一次提交不会在此时间之前发生。基本上,Kafka Stream 会在这段时间过后尝试尽快提交,但无法保证下一次提交实际需要多长时间。StreamsConfig.POLL_MS_CONFIG
用于内部KafkaConsumer#poll()
调用,指定poll()
调用的最大阻塞时间。
因此,这两个值都无助于心跳更频繁。
Kafka Streams 在处理记录时遵循 "depth-first" 策略。这意味着,在每个记录的 poll()
之后,将执行拓扑的所有运算符。假设您有三个连续的地图,那么在处理 next/second 记录之前,将调用所有三个地图作为第一条记录。
因此,在第一个 poll()
的所有记录得到完全处理后,将进行下一个 poll()
调用。如果你想更频繁地检测信号,你需要确保单个 poll()
调用获取更少的记录,这样处理所有记录花费的时间更少,下一个 poll()
将被更早地触发。
您可以使用通过 StreamsConfig
指定的 KafkaConsumer
的配置参数来完成此操作(请参阅 https://kafka.apache.org/documentation.html#consumerconfigs):
streamConfig.put(ConsumerConfig.XXX, VALUE);
max.poll.records
:如果您减小此值,将轮询较少的记录session.timeout.ms
:如果你增加这个值,有更多的时间来处理数据(添加这个是为了完整性,因为它实际上是一个客户端设置而不是 server/broker 端配置 - 即使您知道此解决方案但不喜欢它:))
EDIT
As of Kafka
0.10.1
it is possible (and recommended) to prefix consumer and procuder configs within streams config. This avoids parameter conflicts as some parameter names are used for consumer and producer and cannot be distinguiesh otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can useStreamsConfig#consumerPrefix()
orStreamsConfig#producerPrefix()
, respectively. For example:streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
还要补充一点:这个问题中描述的场景是一个已知问题,并且已经有 KIP-62 为 KafkaConsumer
引入了一个发送心跳的后台线程,从而将心跳与 poll()
来电。 Kafka Streams 将在即将发布的版本中利用这一新功能。