使用 Kafka 提交问题
Commit issue with Kafka
我正在开发一个模块,要求有一个生产者,我们使用 kafka 作为数据生产队列并将其提供给消费者。
现在在消费者中,我正在尝试实现至少一次消息传递方案。
为此,我必须汇集来自 kafka 的消息,然后消费者 those.After 消费我正在调用 consumer.commitAsync(offset,Callback).
我想知道会发生什么
案例一)。当从未调用 commitAsync() api 时(假设在调用此 api 之前出现异常)。在我的例子中,我假设消息将再次发送给消费者;但它不是 happening.Consumer 永远不会再获取该数据。
案例2)。如果消费者重新启动。
下面是消费者设置属性的代码片段
private Properties getConsumerProperties() {
final Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "server");
props.put(GROUP_ID_CONFIG, "groupName");
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(HEARTBEAT_INTERVAL_MS_CONFIG, heartBeatinterval);
props.put(METADATA_MAX_AGE_CONFIG, metaDataMaxAge);
props.put(SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return props;
}
现在在consumer的基础上,有一些属性设置;我有 3 个主题并为每个主题创建 3 个消费者(因为有 3 个分区和 3 个卡夫卡经纪人)。
为了消耗数据...我根据一些 属性 从 kafka 收到的数据包识别数据包..并将其传递给相关主题(我为不同的主题采用了不同的线程池并根据数据包中的 属性 创建任务并提交给线程池。在任务中,处理后我调用 consumer.commitAsync(offset,callback).
如果某些数据包没有调用 commitAsync,我期待从 kafka 再次提取相同的消息......但令我惊讶的是它没有回来......我错过了吗something.Is我们还需要在 apache-kafka 中为 At-Least-One 做一些设置。
求推荐。
您的问题中有几件事需要解决。
在我获得有关如何实现至少一次行为的建议之前,我将尝试解决这两种情况:
Case 1). when commitAsync() api is never called(suppose there was an
exception just before calling this api).In my case,I was supposing the
message will be pumped again to consumer; but it is not
happening.Consumer never get that data again.
你的消费者没有得到旧数据的原因可能是因为 enable.auto.commit
属性,默认设置为 true
并且会定期提交偏移量的背景。因此,后续运行的消费者将找到一个可用的偏移量,并等待新的 data/messages 到达。
Case 2). if the consumer reboots.
这也是类似的,即如果消费者在重启后找到一个提交的偏移量,它将从该偏移量开始消费,无论该偏移量是否由于 enable.auto.commit
属性 而自动提交设置为 true
或通过显式调用 commitAsync()/commitSync()
。
现在,转到关于如何实现至少一次行为的部分 - 我可以想到以下两种方法:
- 如果您想控制提交偏移量,请将 "enable.auto.commit" 属性 设置为
false
,然后重试调用 commitSync()
或 commitAsync()
在回调函数中处理。
注意:选择同步还是异步提交将取决于您的延迟预算和任何其他要求。所以,这里就不过多介绍这些细节了。
- 另一种选择是利用自动偏移提交功能,即将
enable.auto.commit
设置为 true
并将 auto.commit.interval.ms
设置为可接受的数字(同样,根据您对提交频率的要求你喜欢提交偏移量)。
我认为 Kafka 的默认行为以至少一次语义为中心,因此它应该相当简单。
希望对您有所帮助!
我正在开发一个模块,要求有一个生产者,我们使用 kafka 作为数据生产队列并将其提供给消费者。
现在在消费者中,我正在尝试实现至少一次消息传递方案。
为此,我必须汇集来自 kafka 的消息,然后消费者 those.After 消费我正在调用 consumer.commitAsync(offset,Callback).
我想知道会发生什么
案例一)。当从未调用 commitAsync() api 时(假设在调用此 api 之前出现异常)。在我的例子中,我假设消息将再次发送给消费者;但它不是 happening.Consumer 永远不会再获取该数据。
案例2)。如果消费者重新启动。
下面是消费者设置属性的代码片段
private Properties getConsumerProperties() {
final Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, "server");
props.put(GROUP_ID_CONFIG, "groupName");
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(HEARTBEAT_INTERVAL_MS_CONFIG, heartBeatinterval);
props.put(METADATA_MAX_AGE_CONFIG, metaDataMaxAge);
props.put(SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return props;
}
现在在consumer的基础上,有一些属性设置;我有 3 个主题并为每个主题创建 3 个消费者(因为有 3 个分区和 3 个卡夫卡经纪人)。
为了消耗数据...我根据一些 属性 从 kafka 收到的数据包识别数据包..并将其传递给相关主题(我为不同的主题采用了不同的线程池并根据数据包中的 属性 创建任务并提交给线程池。在任务中,处理后我调用 consumer.commitAsync(offset,callback).
如果某些数据包没有调用 commitAsync,我期待从 kafka 再次提取相同的消息......但令我惊讶的是它没有回来......我错过了吗something.Is我们还需要在 apache-kafka 中为 At-Least-One 做一些设置。
求推荐。
您的问题中有几件事需要解决。
在我获得有关如何实现至少一次行为的建议之前,我将尝试解决这两种情况:
Case 1). when commitAsync() api is never called(suppose there was an exception just before calling this api).In my case,I was supposing the message will be pumped again to consumer; but it is not happening.Consumer never get that data again.
你的消费者没有得到旧数据的原因可能是因为 enable.auto.commit
属性,默认设置为 true
并且会定期提交偏移量的背景。因此,后续运行的消费者将找到一个可用的偏移量,并等待新的 data/messages 到达。
Case 2). if the consumer reboots.
这也是类似的,即如果消费者在重启后找到一个提交的偏移量,它将从该偏移量开始消费,无论该偏移量是否由于 enable.auto.commit
属性 而自动提交设置为 true
或通过显式调用 commitAsync()/commitSync()
。
现在,转到关于如何实现至少一次行为的部分 - 我可以想到以下两种方法:
- 如果您想控制提交偏移量,请将 "enable.auto.commit" 属性 设置为
false
,然后重试调用commitSync()
或commitAsync()
在回调函数中处理。
注意:选择同步还是异步提交将取决于您的延迟预算和任何其他要求。所以,这里就不过多介绍这些细节了。
- 另一种选择是利用自动偏移提交功能,即将
enable.auto.commit
设置为true
并将auto.commit.interval.ms
设置为可接受的数字(同样,根据您对提交频率的要求你喜欢提交偏移量)。
我认为 Kafka 的默认行为以至少一次语义为中心,因此它应该相当简单。
希望对您有所帮助!