在 spring 云流中使用 PollableMessageSource 输入时如何使用 avro 本机解码器?
How to use avro native decoder when using a PollableMessageSource input in spring cloud stream?
我正在使用 PollableMessageSource
输入来读取 Kafka 主题。有关该主题的消息在 Avro 中。 use-native-decoding
在发布这些消息时设置为 true。
这就是我的投票方式:
pollableChannels.inputChannel().poll(this::processorMethodName,
new ParameterizedTypeReference<TypeClassName>() {
});
pollableChannels
只是这个接口的注入实例:
public interface PollableChannels {
@Input("input-channel")
PollableMessageSource inputChannel();
}
看到TypeClassName
格式不对(嵌套的objects被误置为null)后,开始调试poll
方法,发现是依赖于 contentType
header 到 select 的转换器,并且由于尚未设置(因为消息已被本地编码),它会退回到使用 ApplicationJsonMessageMarshallingConverter
这显然不是正确的选择。
如果我使用常规的 streamListener,use-native-decoding
配置 属性 很正常,所以消息似乎可以正常发布。
因此,我的主要问题是如何在使用可轮询消费者时强制进行本机解码?
我的 borader 问题可能是询问 spring.cloud.stream.bindings.channel-name.consumer
下的属性在使用可轮询消费者时是否得到尊重?
Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1
更新:
相关配置:
spring:
cloud.stream:
bindings:
input-channel:
content-type: application/*+avro
destination: "topic-name"
group: "group-name"
consumer:
partitioned: true
concurrency: 3
max-attempts: 1
use-native-decoding: true
kafka:
binder:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
ParameterzedTypeReference
。参数旨在帮助消息转换器将有效负载转换为所需的类型。使用本机解码时,“转换”由反序列化器完成,不需要转换。
因此,只需删除 poll()
方法的第二个参数即可跳过转换。
也就是说,从版本 3.0.8(和 Spring Framework 5.2.9)开始,转换是 no-op,如下例所示。
但是,省略参数以避免任何转换尝试仍然更有效。
else if (targetClass.isInstance(payload)) {
return payload;
}
我刚刚测试了它没有任何问题(在 3.0.8 上测试过,但我不认为这方面有任何变化)。事实上,对于这种情况,您甚至不需要 useNativeDecoding
。
public class Foo {
private String bar;
public Foo() {
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
@SpringBootApplication
@EnableBinding(Polled.class)
public class So64554618Application {
public static void main(String[] args) {
SpringApplication.run(So64554618Application.class, args);
}
@Autowired
PollableMessageSource source;
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("input", "{\"bar\":\"baz\"}".getBytes());
Thread.sleep(5_000);
source.poll(msg -> {
System.out.println(msg);
}, new ParameterizedTypeReference<Foo>() { });
};
}
}
interface Polled {
@Input
PollableMessageSource input();
}
#spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.bindings.input.group=foo
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
GenericMessage [payload=Foo [bar=baz], headers={kafka_offset=2, ...
我正在使用 PollableMessageSource
输入来读取 Kafka 主题。有关该主题的消息在 Avro 中。 use-native-decoding
在发布这些消息时设置为 true。
这就是我的投票方式:
pollableChannels.inputChannel().poll(this::processorMethodName,
new ParameterizedTypeReference<TypeClassName>() {
});
pollableChannels
只是这个接口的注入实例:
public interface PollableChannels {
@Input("input-channel")
PollableMessageSource inputChannel();
}
看到TypeClassName
格式不对(嵌套的objects被误置为null)后,开始调试poll
方法,发现是依赖于 contentType
header 到 select 的转换器,并且由于尚未设置(因为消息已被本地编码),它会退回到使用 ApplicationJsonMessageMarshallingConverter
这显然不是正确的选择。
如果我使用常规的 streamListener,use-native-decoding
配置 属性 很正常,所以消息似乎可以正常发布。
因此,我的主要问题是如何在使用可轮询消费者时强制进行本机解码?
我的 borader 问题可能是询问 spring.cloud.stream.bindings.channel-name.consumer
下的属性在使用可轮询消费者时是否得到尊重?
Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1
更新:
相关配置:
spring:
cloud.stream:
bindings:
input-channel:
content-type: application/*+avro
destination: "topic-name"
group: "group-name"
consumer:
partitioned: true
concurrency: 3
max-attempts: 1
use-native-decoding: true
kafka:
binder:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
ParameterzedTypeReference
。参数旨在帮助消息转换器将有效负载转换为所需的类型。使用本机解码时,“转换”由反序列化器完成,不需要转换。
因此,只需删除 poll()
方法的第二个参数即可跳过转换。
也就是说,从版本 3.0.8(和 Spring Framework 5.2.9)开始,转换是 no-op,如下例所示。
但是,省略参数以避免任何转换尝试仍然更有效。
else if (targetClass.isInstance(payload)) {
return payload;
}
我刚刚测试了它没有任何问题(在 3.0.8 上测试过,但我不认为这方面有任何变化)。事实上,对于这种情况,您甚至不需要 useNativeDecoding
。
public class Foo {
private String bar;
public Foo() {
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
@SpringBootApplication
@EnableBinding(Polled.class)
public class So64554618Application {
public static void main(String[] args) {
SpringApplication.run(So64554618Application.class, args);
}
@Autowired
PollableMessageSource source;
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("input", "{\"bar\":\"baz\"}".getBytes());
Thread.sleep(5_000);
source.poll(msg -> {
System.out.println(msg);
}, new ParameterizedTypeReference<Foo>() { });
};
}
}
interface Polled {
@Input
PollableMessageSource input();
}
#spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.bindings.input.group=foo
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
GenericMessage [payload=Foo [bar=baz], headers={kafka_offset=2, ...