Kafka 复制器:ConsumerTimestampsInterceptor 与 kafka Streams?

Kafka replicator: ConsumerTimestampsInterceptor with kafka Streams?

我们正在尝试在 2 个数据中心之间复制我们的偏移量。对于单个消费者来说真的很容易,只需添加:

consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor

现在我们有一个使用 kafka-streams 的应用程序。在捆绑多个东西之后,我们不能像以前那样复制偏移量。 比如我们也试过:

kafka.streams.properties.consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor

但运气不好 谢谢!

尝试用代码而不是 属性 文件

例如制作人

config.put(
    StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
    ConsumerTimestampsInterceptor.class.getName()
);

或者,确保 kafka.streams.properties 是创建所有 StreamsConfig 属性的正确 属性 前缀

最后 Kafka Streams 删除了 group.id 参数。但是复制器仍然需要它。

    override fun configure(configs: Map<String, *>?) {
        val newConfigs = configs!! + mapOf("group.id" to configs?.get("customgroupid"))
        super.configure(newConfigs)
    }
}

然后将其添加到您的 .properties 文件中:

kafka.streams.consumer.customgroupid=${kafka.streams.group.id} # customgroupid is not removed
kafka.streams.consumer.interceptor.classes=com.myjob.mymodule.ConsumerTimestampsInterceptorConfigurator