Kafka Streams - 覆盖默认的 addSink 实现/自定义生产者
Kafka Streams - override default addSink implementation / custom producer
这是我第一次 post 在这里,我不确定以前是否在这里介绍过,但这里是:我有一个 Kafka Streams 应用程序,使用处理器 API,遵循以下拓扑:
1. Consume data from an input topic (processor.addSource())
2. Inserts data into a DB (processor.addProcessor())
3. Produce its process status to an output topic (processor.addSink())
App 工作时间很长,但是,出于可追溯性目的,我需要在 kstreams 向输出主题及其 RecordMetaData(主题、分区、偏移量)生成消息时在日志中记录。
示例如下:
KEY="MY_KEY" OUTPUT_TOPIC="MY-OUTPUT-TOPIC" PARTITION="1" OFFSET="1000" STATUS="SUCCESS"
我不确定是否有办法覆盖默认的 kafka 流生产者以添加此日志记录,或者创建我自己的生产者以将其插入 addSink 进程。我通过实现自己的 ExceptionHandler (default.producer.exception.handler) 部分实现了它,但它只涵盖了异常。
提前致谢,
吉列尔梅
如果您将流应用程序配置为使用 ProducerInterceptor
,那么您应该能够获得所需的信息。具体来说,实施 onAcknowledgement() 将提供对上面列出的所有内容的访问权限。
在流应用程序中配置拦截器:
Properties props = new Properties();
// add this configuration in addition to your other streams configs
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(MyProducerInterceptor.class));
如果需要,您可以提供多个拦截器,只需添加 class 名称并将列表实现从单例更改为常规 List
。拦截器的执行遵循列表中 classes 的顺序。
编辑:为了清楚起见,您可以通过 KafkaClientSupplier interface, but IMHO using an interceptor is the cleaner approach. But which direction to go is up to you. You pass in your KafkaClientSupplier
in an overloaded Kafka Streams constructor.
覆盖 Kafka Streams 中提供的 Producer
这是我第一次 post 在这里,我不确定以前是否在这里介绍过,但这里是:我有一个 Kafka Streams 应用程序,使用处理器 API,遵循以下拓扑:
1. Consume data from an input topic (processor.addSource())
2. Inserts data into a DB (processor.addProcessor())
3. Produce its process status to an output topic (processor.addSink())
App 工作时间很长,但是,出于可追溯性目的,我需要在 kstreams 向输出主题及其 RecordMetaData(主题、分区、偏移量)生成消息时在日志中记录。
示例如下:
KEY="MY_KEY" OUTPUT_TOPIC="MY-OUTPUT-TOPIC" PARTITION="1" OFFSET="1000" STATUS="SUCCESS"
我不确定是否有办法覆盖默认的 kafka 流生产者以添加此日志记录,或者创建我自己的生产者以将其插入 addSink 进程。我通过实现自己的 ExceptionHandler (default.producer.exception.handler) 部分实现了它,但它只涵盖了异常。
提前致谢,
吉列尔梅
如果您将流应用程序配置为使用 ProducerInterceptor
,那么您应该能够获得所需的信息。具体来说,实施 onAcknowledgement() 将提供对上面列出的所有内容的访问权限。
在流应用程序中配置拦截器:
Properties props = new Properties();
// add this configuration in addition to your other streams configs
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(MyProducerInterceptor.class));
如果需要,您可以提供多个拦截器,只需添加 class 名称并将列表实现从单例更改为常规 List
。拦截器的执行遵循列表中 classes 的顺序。
编辑:为了清楚起见,您可以通过 KafkaClientSupplier interface, but IMHO using an interceptor is the cleaner approach. But which direction to go is up to you. You pass in your KafkaClientSupplier
in an overloaded Kafka Streams constructor.
Producer