Spring Cloud Stream Kafka 错误通道
Spring Cloud Stream Kafka Error channel
我正在尝试设置绑定以将 Kafka 消息从 Spring Integration errorChannel 转发到自定义通道(用于集中错误处理)。
错误消息被发送到配置的通道,但它们作为 GenericMessage
到达,带有 byte[] 有效负载,其中包含异常详细信息和失败的消息。
我的配置:
spring:
cloud:
stream:
kafka:
bindings:
accountOut.producer:
sync: true
binder:
autoCreateTopics: false
headers:
- spanId
- spanTraceId
- spanSampled
- spanParentSpanId
- spanName
- spanFlags
- eventType
- Authorization
bindings:
error:
destination: test-error
accountOut:
producer.partitionKeyExpression: payload.key
content-type: application/json
destination: account
kafka:
producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
我正在用@StreamListener(target = "kieran-error")
收听
消费者配置 @Input("kieran-error") SubscribableChannel
正在阅读 the docs,我期待消息到达 ErrorMessage
。有什么办法可以实现吗?或者将有效负载配置为作为对象到达?
我使用的版本:
- Spring 引导 1.5.8
- Spring 云 Edgware
- Kafka 11 客户端
- Spring 集成核心 4.3.12
--- 问题更新 ---
我现在意识到我可以配置 Spring 集成以通过监听 errorChannel
转发到 Kafka 主题,例如
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
return handler;
}
但是是否可以在属性 yaml 而不是代码中配置此流程?这就是所有其他 Kafka 配置所在的地方,因此在代码中配置 kafka 模板并不理想。
另一种选择是显式侦听 ErrorMessage 并发送到代码中的 kafka 输出通道:
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
outputChannel.kieranError().send(...)
}
你到底在哪里消费这样的消息?
当 enableDlq
对消费者而言为真时,您描述的消息听起来像是发送到 DLQ 主题的消息;你没有显示消费者配置所以我很难猜。
发送到 destination-specific 错误通道的 ErrorMessage
(并桥接到全局 errorChannel
可以与
一起使用
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
...
}
error:
destination:
是遗留的,旨在让用户代码将消息发送到 errorChannel
将转到该主题。
我正在尝试设置绑定以将 Kafka 消息从 Spring Integration errorChannel 转发到自定义通道(用于集中错误处理)。
错误消息被发送到配置的通道,但它们作为 GenericMessage
到达,带有 byte[] 有效负载,其中包含异常详细信息和失败的消息。
我的配置:
spring:
cloud:
stream:
kafka:
bindings:
accountOut.producer:
sync: true
binder:
autoCreateTopics: false
headers:
- spanId
- spanTraceId
- spanSampled
- spanParentSpanId
- spanName
- spanFlags
- eventType
- Authorization
bindings:
error:
destination: test-error
accountOut:
producer.partitionKeyExpression: payload.key
content-type: application/json
destination: account
kafka:
producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
我正在用@StreamListener(target = "kieran-error")
收听
消费者配置 @Input("kieran-error") SubscribableChannel
正在阅读 the docs,我期待消息到达 ErrorMessage
。有什么办法可以实现吗?或者将有效负载配置为作为对象到达?
我使用的版本:
- Spring 引导 1.5.8
- Spring 云 Edgware
- Kafka 11 客户端
- Spring 集成核心 4.3.12
--- 问题更新 ---
我现在意识到我可以配置 Spring 集成以通过监听 errorChannel
转发到 Kafka 主题,例如
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
return handler;
}
但是是否可以在属性 yaml 而不是代码中配置此流程?这就是所有其他 Kafka 配置所在的地方,因此在代码中配置 kafka 模板并不理想。
另一种选择是显式侦听 ErrorMessage 并发送到代码中的 kafka 输出通道:
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
outputChannel.kieranError().send(...)
}
你到底在哪里消费这样的消息?
当 enableDlq
对消费者而言为真时,您描述的消息听起来像是发送到 DLQ 主题的消息;你没有显示消费者配置所以我很难猜。
发送到 destination-specific 错误通道的 ErrorMessage
(并桥接到全局 errorChannel
可以与
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
...
}
error:
destination:
是遗留的,旨在让用户代码将消息发送到 errorChannel
将转到该主题。