使用 Kafka 提交问题

Commit issue with Kafka

我正在开发一个模块,要求有一个生产者,我们使用 kafka 作为数据生产队列并将其提供给消费者。

现在在消费者中,我正在尝试实现至少一次消息传递方案。

为此,我必须汇集来自 kafka 的消息,然后消费者 those.After 消费我正在调用 consumer.commitAsync(offset,Callback).

我想知道会发生什么

案例一)。当从未调用 commitAsync() api 时(假设在调用此 api 之前出现异常)。在我的例子中,我假设消息将再次发送给消费者;但它不是 happening.Consumer 永远不会再获取该数据。

案例2)。如果消费者重新启动。

下面是消费者设置属性的代码片段

private Properties getConsumerProperties() {
        final Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "server");
        props.put(GROUP_ID_CONFIG, "groupName");
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(HEARTBEAT_INTERVAL_MS_CONFIG, heartBeatinterval);
        props.put(METADATA_MAX_AGE_CONFIG, metaDataMaxAge);
        props.put(SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

现在在consumer的基础上,有一些属性设置;我有 3 个主题并为每个主题创建 3 个消费者(因为有 3 个分区和 3 个卡夫卡经纪人)。

为了消耗数据...我根据一些 属性 从 kafka 收到的数据包识别数据包..并将其传递给相关主题(我为不同的主题采用了不同的线程池并根据数据包中的 属性 创建任务并提交给线程池。在任务中,处理后我调用 consumer.commitAsync(offset,callback).

如果某些数据包没有调用 commitAsync,我期待从 kafka 再次提取相同的消息......但令我惊讶的是它没有回来......我错过了吗something.Is我们还需要在 apache-kafka 中为 At-Least-One 做一些设置。

求推荐。

您的问题中有几件事需要解决。

在我获得有关如何实现至少一次行为的建议之前,我将尝试解决这两种情况:

Case 1). when commitAsync() api is never called(suppose there was an exception just before calling this api).In my case,I was supposing the message will be pumped again to consumer; but it is not happening.Consumer never get that data again.

你的消费者没有得到旧数据的原因可能是因为 enable.auto.commit 属性,默认设置为 true 并且会定期提交偏移量的背景。因此,后续运行的消费者将找到一个可用的偏移量,并等待新的 data/messages 到达。

Case 2). if the consumer reboots.

这也是类似的,即如果消费者在重启后找到一个提交的偏移量,它将从该偏移量开始消费,无论该偏移量是否由于 enable.auto.commit 属性 而自动提交设置为 true 或通过显式调用 commitAsync()/commitSync()

现在,转到关于如何实现至少一次行为的部分 - 我可以想到以下两种方法

  1. 如果您想控制提交偏移量,请将 "enable.auto.commit" 属性 设置为 false,然后重试调用 commitSync()commitAsync()在回调函数中处理。

注意:选择同步还是异步提交将取决于您的延迟预算和任何其他要求。所以,这里就不过多介绍这些细节了。

  1. 另一种选择是利用自动偏移提交功能,即将 enable.auto.commit 设置为 true 并将 auto.commit.interval.ms 设置为可接受的数字(同样,根据您对提交频率的要求你喜欢提交偏移量)。

我认为 Kafka 的默认行为以至少一次语义为中心,因此它应该相当简单。

希望对您有所帮助!