w=10=sh cload stream kafka streams FOR
spring cloud stream kafka streams DLQ
我正在使用 Apache Kafka 2.7.0 和 Spring Cloud Stream Kafka Streams。
在我的 Spring Cloud Stream (Kafka Streams) 应用程序中,我已将我的 application.yml 配置为在输入主题中的消息出现反序列化错误时使用 sendToDlq 机制:
spring:
cloud:
stream:
function:
definition: processor
bindings:
processor-in-0:
destination: input-topic
consumer:
dlqName: input-topic-dlq
processor-out-0:
destination: output-topic
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
configuration:
metrics.recording.level: DEBUG
brokers:
- localhost:9092
我启动了我的应用程序,但我没有看到这个主题存在。文档指出,如果 DLQ 主题不存在,将创建该主题。
如果我尝试使用 DLQ 主题,我会收到如下错误:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
[2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
此刻,当我查询 Zookeeper ls /brokers/topics 时,我看到创建了主题。
现在,我尝试 POST 向输入主题发送非 JSON 消息(我的默认反序列化器是 JSON)。
但是我在创建的 input-topic-dlq 主题中看不到任何消息。
奇怪的是我可以在默认的“error.input-topic-dlq.appId”主题中看到消息。
我是不是做错了什么?
我设法弄明白了。 Spring Cloud Stream Kafka Streams Binder 的当前文档中似乎有错字。
绑定的目标应该在 spring.cloud.streams.bindings
级别,就像您已经拥有的那样,但是特定于实现的消费者属性应该在 spring.cloud.streams.kafka.streams.bindings
级别。
因此您的配置应如下所示:
spring:
cloud:
stream:
function:
definition: processor
bindings:
processor-in-0:
destination: input-topic
processor-out-0:
destination: output-topic
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
bindings:
processor-in-0:
consumer:
dlqName: input-topic-dlq
configuration:
metrics.recording.level: DEBUG
brokers:
- localhost:9092
我正在使用 Apache Kafka 2.7.0 和 Spring Cloud Stream Kafka Streams。
在我的 Spring Cloud Stream (Kafka Streams) 应用程序中,我已将我的 application.yml 配置为在输入主题中的消息出现反序列化错误时使用 sendToDlq 机制:
spring:
cloud:
stream:
function:
definition: processor
bindings:
processor-in-0:
destination: input-topic
consumer:
dlqName: input-topic-dlq
processor-out-0:
destination: output-topic
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
configuration:
metrics.recording.level: DEBUG
brokers:
- localhost:9092
我启动了我的应用程序,但我没有看到这个主题存在。文档指出,如果 DLQ 主题不存在,将创建该主题。
如果我尝试使用 DLQ 主题,我会收到如下错误:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
[2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
此刻,当我查询 Zookeeper ls /brokers/topics 时,我看到创建了主题。
现在,我尝试 POST 向输入主题发送非 JSON 消息(我的默认反序列化器是 JSON)。
但是我在创建的 input-topic-dlq 主题中看不到任何消息。
奇怪的是我可以在默认的“error.input-topic-dlq.appId”主题中看到消息。
我是不是做错了什么?
我设法弄明白了。 Spring Cloud Stream Kafka Streams Binder 的当前文档中似乎有错字。
绑定的目标应该在 spring.cloud.streams.bindings
级别,就像您已经拥有的那样,但是特定于实现的消费者属性应该在 spring.cloud.streams.kafka.streams.bindings
级别。
因此您的配置应如下所示:
spring:
cloud:
stream:
function:
definition: processor
bindings:
processor-in-0:
destination: input-topic
processor-out-0:
destination: output-topic
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
bindings:
processor-in-0:
consumer:
dlqName: input-topic-dlq
configuration:
metrics.recording.level: DEBUG
brokers:
- localhost:9092