在 spring 云流中使用 PollableMessageSource 输入时如何使用 avro 本机解码器?

How to use avro native decoder when using a PollableMessageSource input in spring cloud stream?

我正在使用 PollableMessageSource 输入来读取 Kafka 主题。有关该主题的消息在 Avro 中。 use-native-decoding 在发布这些消息时设置为 true。

这就是我的投票方式:

pollableChannels.inputChannel().poll(this::processorMethodName,
        new ParameterizedTypeReference<TypeClassName>() {
        });

pollableChannels只是这个接口的注入实例:

public interface PollableChannels {
  @Input("input-channel")
  PollableMessageSource inputChannel();
}

看到TypeClassName格式不对(嵌套的objects被误置为null)后,开始调试poll方法,发现是依赖于 contentType header 到 select 的转换器,并且由于尚未设置(因为消息已被本地编码),它会退回到使用 ApplicationJsonMessageMarshallingConverter这显然不是正确的选择。

如果我使用常规的 streamListener,use-native-decoding 配置 属性 很正常,所以消息似乎可以正常发布。

因此,我的主要问题是如何在使用可轮询消费者时强制进行本机解码? 我的 borader 问题可能是询问 spring.cloud.stream.bindings.channel-name.consumer 下的属性在使用可轮询消费者时是否得到尊重?

Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1

更新:

相关配置:

spring:
  cloud.stream:
    bindings:  
      input-channel:
        content-type: application/*+avro
        destination: "topic-name"
        group: "group-name"
        consumer:
          partitioned: true
          concurrency: 3
          max-attempts: 1
          use-native-decoding: true
    kafka:
      binder:
        configuration:
          key.serializer: org.apache.kafka.common.serialization.StringSerializer
          value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

ParameterzedTypeReference。参数旨在帮助消息转换器将有效负载转换为所需的类型。使用本机解码时,“转换”由反序列化器完成,不需要转换。

因此,只需删除 poll() 方法的第二个参数即可跳过转换。

也就是说,从版本 3.0.8(和 Spring Framework 5.2.9)开始,转换是 no-op,如下例所示。

但是,省略参数以避免任何转换尝试仍然更有效。

else if (targetClass.isInstance(payload)) {
    return payload;
}

我刚刚测试了它没有任何问题(在 3.0.8 上测试过,但我不认为这方面有任何变化)。事实上,对于这种情况,您甚至不需要 useNativeDecoding

public class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}


@SpringBootApplication
@EnableBinding(Polled.class)
public class So64554618Application {

    public static void main(String[] args) {
        SpringApplication.run(So64554618Application.class, args);
    }

    @Autowired
    PollableMessageSource source;


    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "{\"bar\":\"baz\"}".getBytes());
            Thread.sleep(5_000);
            source.poll(msg -> {
                System.out.println(msg);
            }, new ParameterizedTypeReference<Foo>() { });
        };
    }

}

interface Polled {

    @Input
    PollableMessageSource input();

}
#spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.bindings.input.group=foo

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
GenericMessage [payload=Foo [bar=baz], headers={kafka_offset=2, ...