Kafka 复制器:ConsumerTimestampsInterceptor 与 kafka Streams?
Kafka replicator: ConsumerTimestampsInterceptor with kafka Streams?
我们正在尝试在 2 个数据中心之间复制我们的偏移量。对于单个消费者来说真的很容易,只需添加:
consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
现在我们有一个使用 kafka-streams 的应用程序。在捆绑多个东西之后,我们不能像以前那样复制偏移量。
比如我们也试过:
kafka.streams.properties.consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
但运气不好
谢谢!
尝试用代码而不是 属性 文件
例如制作人
config.put(
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerTimestampsInterceptor.class.getName()
);
或者,确保 kafka.streams.properties
是创建所有 StreamsConfig 属性的正确 属性 前缀
最后 Kafka Streams 删除了 group.id 参数。但是复制器仍然需要它。
override fun configure(configs: Map<String, *>?) {
val newConfigs = configs!! + mapOf("group.id" to configs?.get("customgroupid"))
super.configure(newConfigs)
}
}
然后将其添加到您的 .properties 文件中:
kafka.streams.consumer.customgroupid=${kafka.streams.group.id} # customgroupid is not removed
kafka.streams.consumer.interceptor.classes=com.myjob.mymodule.ConsumerTimestampsInterceptorConfigurator
我们正在尝试在 2 个数据中心之间复制我们的偏移量。对于单个消费者来说真的很容易,只需添加:
consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
现在我们有一个使用 kafka-streams 的应用程序。在捆绑多个东西之后,我们不能像以前那样复制偏移量。 比如我们也试过:
kafka.streams.properties.consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
但运气不好 谢谢!
尝试用代码而不是 属性 文件
例如制作人
config.put(
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerTimestampsInterceptor.class.getName()
);
或者,确保 kafka.streams.properties
是创建所有 StreamsConfig 属性的正确 属性 前缀
最后 Kafka Streams 删除了 group.id 参数。但是复制器仍然需要它。
override fun configure(configs: Map<String, *>?) {
val newConfigs = configs!! + mapOf("group.id" to configs?.get("customgroupid"))
super.configure(newConfigs)
}
}
然后将其添加到您的 .properties 文件中:
kafka.streams.consumer.customgroupid=${kafka.streams.group.id} # customgroupid is not removed
kafka.streams.consumer.interceptor.classes=com.myjob.mymodule.ConsumerTimestampsInterceptorConfigurator