Spring Cloud Stream DLQ native encoding issue: Payload is not byte[]

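Since we enabled native decoding on the consumer (Spring Cloud Stream 2.2) to make it easier to integrate our application with other tools, we have been getting the following error: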

Caused by: java.lang.IllegalArgumentException: Native decoding is used on the consumer. Payload is not byte[] and no serializer is set on the DLQ producer.
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.ensureDlqMessageCanBeProperlySerialized(KafkaMessageChannelBinder.java:1037)
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler(KafkaMessageChannelBinder.java:905)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:461)

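I have tried setting native encoding for the DLQ, and even setting the Avro serializer explicitly, but without success. I suspect I am not using the right property name or location for it: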

spring:
  cloud.stream:
    kafka:
      bindings:
        input:
          consumer:
            enableDlq: true
            dlqName: dlq # Twitter crawler dead letter queue kafka topic
            dlqProducerProperties:
              useNativeEncoding: true
              value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
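
1. You need a key serializer, not a key deserializer (the DLQ side is a producer).
2. The serializer properties need to go under the generic configuration property of dlqProducerProperties:
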
spring.cloud.stream.kafka.bindings.input.consumer.dlq-producer-properties.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.input.consumer.dlq-producer-properties.configuration.value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq: true
spring.cloud.stream.bindings.input.consumer.use-native-decoding: true
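
For reference, the same settings in the YAML layout used in the question might look roughly like the following. This is only a sketch: it assumes the binding is still named input and the DLQ topic is still dlq, as in the question. The two serializer entries sit under configuration, and use-native-decoding belongs to the core binding (spring.cloud.stream.bindings) rather than the Kafka-specific one.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            # Core (binder-agnostic) consumer property
            useNativeDecoding: true
      kafka:
        bindings:
          input:
            consumer:
              enableDlq: true
              dlqName: dlq  # topic name taken from the question
              dlqProducerProperties:
                # Raw Kafka producer properties go under "configuration"
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
```

Compared with the configuration in the question, the serializers are nested one level deeper (under configuration), and key.deserializer is replaced by key.serializer.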