The same offset value is used by different topics
Our topology uses KafkaSpout to fetch messages from Kafka topics. We have about 150 topics with about 12 partitions each, and 8 Storm executors/tasks running on 2 Storm nodes.
Storm version 1.0.5, Kafka broker version 10.0.2, Kafka client version 0.9.0.1. We never delete Kafka topics.
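Our exact spout wiring is not shown here; for context, a minimal per-topic setup with the old storm-kafka spout might look roughly like the sketch below (the ZooKeeper hosts, topic name, zkRoot, spout id and parallelism are placeholders, not our real values):

    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;

    public class TopologySketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();

            // ZooKeeper ensemble used by the Kafka brokers (hypothetical hosts).
            BrokerHosts hosts = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");

            // One spout per topic; "topic_1", the zkRoot and the id are placeholders.
            SpoutConfig spoutConfig = new SpoutConfig(hosts, "topic_1", "/kafka_spout", "topology1-topic_1");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            // Jump to the start offset time if the stored offset turns out to be out of range.
            spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;

            // Parallelism hint matching the ~8 executors mentioned above.
            builder.setSpout("kafka-spout-topic_1", new KafkaSpout(spoutConfig), 8);

            // ... bolts and topology submission omitted ...
        }
    }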
At some point I observed a huge number of repeated WARN messages in worker.log:
2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host1:9092, topic=topic_1, partition=10} Got fetch request with offset out of range: [9248]
2018-05-29 14:36:57.929 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host2:9092, topic=topic_2, partition=0} Got fetch request with offset out of range: [22650006]
2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host3:9092, topic=topic_3, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.932 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host1:9092, topic=topic4, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host=host2:9092, topic=topic5, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host1:9092, topic=topic6, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host2:9092, topic=topic6, partition=10} Got fetch request with offset out of range: [9248]
For some reason, the same constant offset value is used for the same partition of different topics.
I enabled DEBUG mode and looked at the log files more closely:
2018-05-29 14:37:03.573 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host3:9092, topic=topic1, partition=8} for topology: topology1
2018-05-29 14:37:03.577 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host1:9092, topic=topic2, partition=8} for topology: topology1
2018-05-29 14:37:03.578 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host2:9092, topic=topic3, partition=8} for topology: topology1
2018-05-29 14:38:07.581 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host1:9092, topic=topic4, partition=8} for topology: topology1
2018-05-29 14:38:07.582 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host2:9092, topic=topic5, partition=8} for topology: topology1
2018-05-29 14:38:07.584 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host3:9092, topic=topic6, partition=8} for topology: topology1
I noticed that a subset of the topics had split into two separate groups of 31 topics each. All topics within a group use the same offset value for each partition. That value is not constant, however: it varies among 8 different values, and each of those 8 values is the correct one for exactly one topic in the group. Moreover, each of those values grows over time, and all topics in the group are updated synchronously.
Most of the topics in each group (55 out of 62) have the corresponding 'offset out of range' WARN messages with constant offset values. The other 7 topics keep working normally without warnings, but their offset values also keep changing.
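To verify this observation against the spout's committed state directly, one can read the offsets back from ZooKeeper. A minimal sketch follows, assuming the PartitionManager stores its committed offset as JSON under a node like <zkRoot>/<spout id>/partition_N; the exact node layout, hosts and ids below are assumptions, not taken from our cluster:

    import java.nio.charset.StandardCharsets;
    import org.apache.zookeeper.ZooKeeper;

    public class OffsetCheck {
        public static void main(String[] args) throws Exception {
            // Storm's ZooKeeper ensemble (hypothetical host); the spout writes committed offsets there.
            ZooKeeper zk = new ZooKeeper("zk1:2181", 10_000, event -> { });

            // Hypothetical paths: adjust zkRoot ("/kafka_spout") and spout ids to the real SpoutConfig values.
            String[] paths = {
                "/kafka_spout/topology1-topic_1/partition_8",
                "/kafka_spout/topology1-topic_2/partition_8"
            };

            for (String path : paths) {
                byte[] data = zk.getData(path, false, null);
                // The JSON payload includes the committed "offset"; comparing the two nodes
                // shows whether different topics really share one offset value.
                System.out.println(path + " -> " + new String(data, StandardCharsets.UTF_8));
            }
            zk.close();
        }
    }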
I looked into the storm-kafka source code and noticed that the useStartOffsetTimeIfOffsetOutOfRange flag does not take effect in our case, because we have no failed tuples and the Kafka offset is less than _emittedToOffset. So the same WARN message gets logged again and again:
} catch (TopicOffsetOutOfRangeException e) {
    offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
    // fetch failed, so don't update the fetch metrics
    //fix bug [STORM-643] : remove outdated failed offsets
    if (!processingNewTuples) {
        // For the case of EarliestTime it would be better to discard
        // all the failed offsets, that are earlier than actual EarliestTime
        // offset, since they are anyway not there.
        // These calls to broker API will be then saved.
        Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);

        // Omitted messages have not been acked and may be lost
        if (null != omitted) {
            _lostMessageCount.incrBy(omitted.size());
        }

        _pending.headMap(offset).clear();

        LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
    }

    if (offset > _emittedToOffset) {
        _lostMessageCount.incrBy(offset - _emittedToOffset);
        _emittedToOffset = offset;
        LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
    }

    return;
}
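To make the failure mode concrete, here is a toy walk-through (not storm-kafka code; the numbers are illustrative) of the guard at the end of that catch block. Once _emittedToOffset holds a value larger than anything valid for this partition, the broker's earliest offset is smaller than it, the condition never fires, the offset is never repaired, and every subsequent fetch hits the same 'offset out of range' warning:

    public class OffsetGuardIllustration {
        public static void main(String[] args) {
            // Hypothetical values: the spout believes it has emitted up to 9248, but that
            // offset does not exist in this partition, and the broker's earliest offset is 0.
            long emittedToOffset = 9248L;   // stale/foreign value held by the PartitionManager
            long earliestOffset = 0L;       // what KafkaUtils.getOffset(..., EarliestTime()) returns

            // Mirrors "if (offset > _emittedToOffset)": the guard can only move the spout
            // forward, never back into the valid range, so a too-large value is never fixed.
            if (earliestOffset > emittedToOffset) {
                emittedToOffset = earliestOffset;
                System.out.println("Using new offset: " + emittedToOffset);
            } else {
                System.out.println("Guard not taken; the next fetch at offset " + emittedToOffset
                        + " is out of range again, so the same WARN is logged forever");
            }
        }
    }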
But I don't understand how _emittedToOffset could end up with the same value for different topics. Do you have any idea why this might happen?
There is a bug in the storm-kafka source code that shows up when a Kafka broker fails. Here are the corresponding JIRA ticket and pull request with the fix.