在 Spring Cloud Stream Kafka 中正确管理 DLQ
Correctly manage DLQ in Spring Cloud Stream Kafka
我想使用 kafka 在 Spring Cloud Stream 中管理一个 DLQ。
application.yaml
server:
port: 8091
eureka:
client:
serviceUrl:
defaultZone: http://IP:8761/eureka
spring:
application:
name: employee-consumer
cloud:
stream:
kafka:
binder:
brokers: IP:9092
bindings:
greetings-in:
destination: greetings
contentType: application/json
greetings-out:
destination: greetings
contentType: application/json
bindings:
greetings-out:
consumer:
enableDlq: true
dlqName: dead-out
kafka:
consumer:
group-id: A
正如您在我的配置中看到的那样,我启用了 dlq 并为 dlq 主题设置了一个名称。
为了测试 DLQ 行为,我对某些消息抛出异常
我的监听器组件
@StreamListener("greetings-out")
public void handleGreetingsInput(@Payload Greetings greetings) throws Exception {
logger.info("Greetings input -> {}", greetings);
if (greetings.getMessage().equals("ciao")) {
throw new Exception("eer");
}
}
这样,等于 "ciao" 的消息抛出异常,在日志中我看到它被处理了三次
2018-07-09 13:19:57.256 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener : Greetings input -> com.mitro.model.Greetings@3da9d701[timestamp=0,message=ciao]
2018-07-09 13:19:58.259 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener : Greetings input -> com.mitro.model.Greetings@5bd62aaf[timestamp=0,message=ciao]
2018-07-09 13:20:00.262 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener : Greetings input -> com.mitro.model.Greetings@c26f92b[timestamp=0,message=ciao]
2018-07-09 13:20:00.266 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.mitro.service.GreetingsListener#handleGreetingsInput[1 args]; nested exception is java.lang.Exception: eer, failedMessage=GenericMessage [payload=byte[32], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@510302cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=greetings-out, kafka_receivedTimestamp=1531142397248}]
这对我来说很好,但我不明白为什么会创建一个名为 dead-out 的主题(请看下图)。
我做错了什么?
编辑 1:(仍然没有为 DLQ 创建主题)
server:
port: 8091
eureka:
client:
serviceUrl:
defaultZone: http://IP:8761/eureka
spring:
application:
name: employee-consumer
cloud:
stream:
kafka:
streams:
binder:
serdeError: sendToDlq
binder:
brokers: IP:9092
auto-create-topics: true
bindings:
greetings-out:
destination: greetings-out
contentType: application/json
consumer:
enableDql: true
dlqName: dead-out
autoCommitOnError: true
autoCommitOffset: true
bindings:
greetings-out:
destination: greetings-out
contentType: application/json
consumer:
enableDlq: true
dlqName: dead-out
autoCommitOnError: true
autoCommitOffset: true
kafka:
consumer:
group-id: A
看起来你的属性被颠倒了;公共属性 - destination、contentType - 必须在 spring.cloud.stream.bindings
下。 kafka 特定的属性(enableDlq、dlqName)必须在 spring.clound.stream.kafka.bindings
.
下
你把它们颠倒了。
编辑
您的(修改后的)配置有两个问题。
- 打字错误
enableDql
而不是 enableDlq
- 没有群组 - 您不能与匿名消费者进行 DLQ:
Caused by: java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
这很好用:
spring:
application:
name: employee-consumer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
auto-create-topics: true
bindings:
input:
consumer:
enableDlq: true
dlqName: dead-out
autoCommitOnError: true
autoCommitOffset: true
bindings:
input:
group: so51247113
destination: greetings-out
contentType: application/json
和
@SpringBootApplication
@EnableBinding(Sink.class)
public class So51247113Application {
public static void main(String[] args) {
SpringApplication.run(So51247113Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in) {
System.out.println(in);
throw new RuntimeException("fail");
}
@KafkaListener(id = "foo", topics = "dead-out")
public void dlq(Message<?> in) {
System.out.println("DLQ:" + in);
}
}
我想使用 kafka 在 Spring Cloud Stream 中管理一个 DLQ。
application.yaml
server:
port: 8091
eureka:
client:
serviceUrl:
defaultZone: http://IP:8761/eureka
spring:
application:
name: employee-consumer
cloud:
stream:
kafka:
binder:
brokers: IP:9092
bindings:
greetings-in:
destination: greetings
contentType: application/json
greetings-out:
destination: greetings
contentType: application/json
bindings:
greetings-out:
consumer:
enableDlq: true
dlqName: dead-out
kafka:
consumer:
group-id: A
正如您在我的配置中看到的那样,我启用了 dlq 并为 dlq 主题设置了一个名称。
为了测试 DLQ 行为,我对某些消息抛出异常
我的监听器组件
@StreamListener("greetings-out")
public void handleGreetingsInput(@Payload Greetings greetings) throws Exception {
logger.info("Greetings input -> {}", greetings);
if (greetings.getMessage().equals("ciao")) {
throw new Exception("eer");
}
}
这样,等于 "ciao" 的消息抛出异常,在日志中我看到它被处理了三次
2018-07-09 13:19:57.256 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener : Greetings input -> com.mitro.model.Greetings@3da9d701[timestamp=0,message=ciao]
2018-07-09 13:19:58.259 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener : Greetings input -> com.mitro.model.Greetings@5bd62aaf[timestamp=0,message=ciao]
2018-07-09 13:20:00.262 INFO 1 --- [container-0-C-1] com.mitro.service.GreetingsListener : Greetings input -> com.mitro.model.Greetings@c26f92b[timestamp=0,message=ciao]
2018-07-09 13:20:00.266 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.mitro.service.GreetingsListener#handleGreetingsInput[1 args]; nested exception is java.lang.Exception: eer, failedMessage=GenericMessage [payload=byte[32], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@510302cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=greetings-out, kafka_receivedTimestamp=1531142397248}]
这对我来说很好,但我不明白为什么会创建一个名为 dead-out 的主题(请看下图)。
我做错了什么?
编辑 1:(仍然没有为 DLQ 创建主题)
server:
port: 8091
eureka:
client:
serviceUrl:
defaultZone: http://IP:8761/eureka
spring:
application:
name: employee-consumer
cloud:
stream:
kafka:
streams:
binder:
serdeError: sendToDlq
binder:
brokers: IP:9092
auto-create-topics: true
bindings:
greetings-out:
destination: greetings-out
contentType: application/json
consumer:
enableDql: true
dlqName: dead-out
autoCommitOnError: true
autoCommitOffset: true
bindings:
greetings-out:
destination: greetings-out
contentType: application/json
consumer:
enableDlq: true
dlqName: dead-out
autoCommitOnError: true
autoCommitOffset: true
kafka:
consumer:
group-id: A
看起来你的属性被颠倒了;公共属性 - destination、contentType - 必须在 spring.cloud.stream.bindings
下。 kafka 特定的属性(enableDlq、dlqName)必须在 spring.clound.stream.kafka.bindings
.
你把它们颠倒了。
编辑
您的(修改后的)配置有两个问题。
- 打字错误
enableDql
而不是enableDlq
- 没有群组 - 您不能与匿名消费者进行 DLQ:
Caused by: java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
这很好用:
spring:
application:
name: employee-consumer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
auto-create-topics: true
bindings:
input:
consumer:
enableDlq: true
dlqName: dead-out
autoCommitOnError: true
autoCommitOffset: true
bindings:
input:
group: so51247113
destination: greetings-out
contentType: application/json
和
@SpringBootApplication
@EnableBinding(Sink.class)
public class So51247113Application {
public static void main(String[] args) {
SpringApplication.run(So51247113Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in) {
System.out.println(in);
throw new RuntimeException("fail");
}
@KafkaListener(id = "foo", topics = "dead-out")
public void dlq(Message<?> in) {
System.out.println("DLQ:" + in);
}
}