Spring Cloud Stream with Avro 无法正确转换字符串消息
Spring Cloud Stream with Avro cannot correctly convert String message
我遇到一个问题,源发送 GenericMessage [payload=xxxxx, ...],而接收器接收消息为 10,120,120,120,120,120。
这个问题发生在我设置了 Avro 消息转换器之后。如果我删除 Avro 消息转换器并使用 StreamListener 来处理消息转换,它将正常工作。
来源application.properties
spring.cloud.stream.bindings.toGreeting.destination=greeting
spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
接收器应用程序
server.port=8990
spring.cloud.stream.bindings.greeting.destination=greeting
消息转换器
@Configuration
@EnableSchemaRegistryClient
public class MessageConverterConfig {
@Bean
public MessageConverter topic1MessageConverter() throws IOException {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
申请Class
@SpringBootApplication
@EnableSchemaRegistryClient
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}
@EnableSchemaRegistryServer
@EnableSchemaRegistryClient
@SpringBootApplication
public class SinkApplication {
public static void main(String[] args) {
SpringApplication.run(SinkApplication.class, args);
}
}
我缺少配置吗?
谢谢。
如果您正在设置 spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro
,则需要使用 AvroSchemaRegistryClientMessageConverter
(由 SCSt 配置)并且您不必设置 explicit
转换器 topic1MessageConverter
对于 MimeType avro/bytes
。
如果你想使用这个转换器,那么你需要设置spring.cloud.stream.bindings.toGreeting.contentType=avro/bytes
。
这是一个简单的规则:
如果您只想拥有一个可以 serialize/deserialize 来自 avro 的消息转换器,并且您可以在配置 GenericRecords 期间提供模式位置,或者您的 StreamListener 方法具有一种 SpecificRecord 类型的签名。然后选择 AvroSchemaMessageConverter
,像您一样进行设置,但改用 avro/bytes
。我们保留 application/*+avro
用于模式演化支持。
因此,如果您设置 @EnableSchemaRegistryClient
,那么您将委派给外部注册表以获取您的架构。在这种情况下,您不仅需要注册中心,还需要在那里注册的模式。
默认情况下,如果启用 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
,生产者将自动注册 SpecificRecord/GenericRecord 或 Pojos 类型的任何负载。
在那种情况下,制作人实际上会将 header 设置为 application/vnd.user.v1+avro
假设您的主题是 User 并且它是第一个版本。
下游,如果您的消费者也配置了 application/*+avro
contentType,他们将能够读取此 contentType 并推断它是 subject/version 以查询模式服务器并检索适当的模式。
我遇到一个问题,源发送 GenericMessage [payload=xxxxx, ...],而接收器接收消息为 10,120,120,120,120,120。
这个问题发生在我设置了 Avro 消息转换器之后。如果我删除 Avro 消息转换器并使用 StreamListener 来处理消息转换,它将正常工作。
来源application.properties
spring.cloud.stream.bindings.toGreeting.destination=greeting
spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
接收器应用程序
server.port=8990
spring.cloud.stream.bindings.greeting.destination=greeting
消息转换器
@Configuration
@EnableSchemaRegistryClient
public class MessageConverterConfig {
@Bean
public MessageConverter topic1MessageConverter() throws IOException {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
申请Class
@SpringBootApplication
@EnableSchemaRegistryClient
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}
@EnableSchemaRegistryServer
@EnableSchemaRegistryClient
@SpringBootApplication
public class SinkApplication {
public static void main(String[] args) {
SpringApplication.run(SinkApplication.class, args);
}
}
我缺少配置吗? 谢谢。
如果您正在设置 spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro
,则需要使用 AvroSchemaRegistryClientMessageConverter
(由 SCSt 配置)并且您不必设置 explicit
转换器 topic1MessageConverter
对于 MimeType avro/bytes
。
如果你想使用这个转换器,那么你需要设置spring.cloud.stream.bindings.toGreeting.contentType=avro/bytes
。
这是一个简单的规则:
如果您只想拥有一个可以 serialize/deserialize 来自 avro 的消息转换器,并且您可以在配置 GenericRecords 期间提供模式位置,或者您的 StreamListener 方法具有一种 SpecificRecord 类型的签名。然后选择 AvroSchemaMessageConverter
,像您一样进行设置,但改用 avro/bytes
。我们保留 application/*+avro
用于模式演化支持。
因此,如果您设置 @EnableSchemaRegistryClient
,那么您将委派给外部注册表以获取您的架构。在这种情况下,您不仅需要注册中心,还需要在那里注册的模式。
默认情况下,如果启用 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
,生产者将自动注册 SpecificRecord/GenericRecord 或 Pojos 类型的任何负载。
在那种情况下,制作人实际上会将 header 设置为 application/vnd.user.v1+avro
假设您的主题是 User 并且它是第一个版本。
下游,如果您的消费者也配置了 application/*+avro
contentType,他们将能够读取此 contentType 并推断它是 subject/version 以查询模式服务器并检索适当的模式。