Spring Kafka、Spring Cloud Stream 和 Avro 兼容性未知魔法字节
Spring Kafka, Spring Cloud Stream, and Avro compatibility Unknown magic byte
我在反序列化来自 Kafka 主题的消息时遇到问题。消息已使用 spring-cloud-stream 和 Apache Avro 序列化。我正在使用 Spring Kafka 阅读它们并尝试反序列化它们。如果我使用 spring-cloud 来生成和使用消息,那么我可以很好地反序列化消息。问题是当我使用 Spring Kafka 使用它们然后尝试反序列化时。
我正在使用架构注册表(用于开发的 spring-boot 架构注册表,以及生产中的 Confluent 架构),但反序列化问题似乎发生在事件调用架构注册表之前。
很难 post 这个问题的所有相关代码,所以我 post 在 git 中心的回购中编辑了它:https://github.com/robjwilkins/avro-example
我在主题中发送的对象只是一个简单的 pojo:
@Data
public class Request {
private String message;
}
在 Kafka 上生成消息的代码如下所示:
@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {
private final MessageChannels messageChannels;
@GetMapping("/produce")
public void produceMessage() {
Request request = new Request();
request.setMessage("hello world");
Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
log.debug("sending message");
messageChannels.testRequest().send(requestMessage);
}
}
和application.yaml:
spring:
application.name: avro-producer
kafka:
bootstrap-servers: localhost:9092
consumer.group-id: avro-producer
cloud:
stream:
schema-registry-client.endpoint: http://localhost:8071
schema.avro.dynamic-schema-generation-enabled: true
kafka:
binder:
brokers: ${spring.kafka.bootstrap-servers}
bindings:
test-request:
destination: test-request
contentType: application/*+avro
那我有一个消费者:
@Slf4j
@Component
public class TopicListener {
@KafkaListener(topics = {"test-request"})
public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
log.info("listenForMessage. got a message: {}", consumerRecord);
consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
}
private String asString(byte[] byteArray) {
return new String(byteArray, Charset.defaultCharset());
}
}
并且消费的项目有application.yaml配置:
spring:
application.name: avro-consumer
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: avro-consumer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url: http://localhost:8071
当消费者收到一条消息时,它会导致异常:
2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我已经完成了反序列化代码到抛出这个异常的地步
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
....
private ByteBuffer getByteBuffer(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != 0) {
throw new SerializationException("Unknown magic byte!");
} else {
return buffer;
}
}
这是因为反序列化器检查序列化对象(字节数组)的字节内容并期望它为 0,但事实并非如此。因此,我质疑序列化对象的 spring-cloud-stream MessageConverter 是否与我用来反序列化对象的 io.confluent 对象兼容。如果它们不兼容,我该怎么办?
感谢您的帮助。
您应该通过在配置中创建 DefaultKafkaConsumerFactory
和您的 TopicListener
bean 来显式定义反序列化器,如下所示:
@Configuration
@EnableKafka
public class TopicListenerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value(("${spring.kafka.consumer.group-id}"))
private String groupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.wilkins.avro.consumer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public TopicListener topicListener() {
return new TopicListener();
}
}
您可以将绑定配置为在本机使用 Kafka 序列化程序。
将生产者 属性 useNativeEncoding
设置为 true
并使用 ...producer.configuration
Kafka 属性配置序列化器。
编辑
示例:
spring:
cloud:
stream:
# Generic binding properties
bindings:
input:
consumer:
use-native-decoding: true
destination: so54448732
group: so54448732
output:
destination: so54448732
producer:
use-native-encoding: true
# Kafka-specific binding properties
kafka:
bindings:
input:
consumer:
configuration:
value.deserializer: com.example.FooDeserializer
output:
producer:
configuration:
value.serializer: com.example.FooSerializer
这个问题的症结在于生产者使用spring-cloud-stream将post消息发送到Kafka,而消费者使用spring-kaka。原因是:
- 现有系统已经完善,使用spring-cloud-stream
- 新消费者需要使用相同的方法收听多个主题,仅绑定到主题名称的 csv 列表
- 需要一次使用一组消息,而不是单独使用,因此它们的内容可以批量写入数据库。
Spring-cloud-stream 当前不允许消费者将一个侦听器绑定到多个主题,并且无法一次使用一组消息(除非我弄错了)。
我找到了一个解决方案,它不需要对使用 spring-cloud-stream 将消息发布到 Kafka 的生产者代码进行任何更改。 Spring-cloud-stream 使用 MessageConverter
来管理序列化和反序列化。在 AbstractAvroMessageConverter
中有方法:convertFromInternal
和 convertToInternal
处理转换 to/from 字节数组。我的解决方案是扩展此代码(创建扩展 AvroSchemaRegistryClientMessageConverter
的 class),因此我可以重用大部分 spring-cloud-stream 功能,但具有可访问的接口来自我的 spring-kafka KafkaListener
。然后我修改了我的 TopicListener 以使用此 class 进行转换:
转换器:
@Component
@Slf4j
public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter {
public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) {
super(schemaRegistryClient, new NoOpCacheManager());
}
public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass,
Object conversionHint) {
T result;
try {
byte[] payload = (byte[]) consumerRecord.value();
Map<String, String> headers = new HashMap<>();
consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value())));
MimeType mimeType = messageMimeType(conversionHint, headers);
if (mimeType == null) {
return null;
}
Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
@SuppressWarnings("unchecked")
DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
result = (T) reader.read(null, decoder);
}
catch (IOException e) {
throw new RuntimeException("Failed to read payload", e);
}
return result;
}
private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) {
MimeType mimeType;
try {
String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
log.debug("contentType: {}", contentType);
mimeType = MimeType.valueOf(contentType);
} catch (InvalidMimeTypeException e) {
log.error("Exception getting object MimeType from contentType header", e);
if (conversionHint instanceof MimeType) {
mimeType = (MimeType) conversionHint;
}
else {
return null;
}
}
return mimeType;
}
private String asString(byte[] byteArray) {
String theString = new String(byteArray, Charset.defaultCharset());
return theString.replace("\"", "");
}
}
修改后TopicListener
:
@Slf4j
@Component
@RequiredArgsConstructor
public class TopicListener {
private final AvroKafkaMessageConverter messageConverter;
@KafkaListener(topics = {"test-request"})
public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) {
log.info("listenForMessage. got a message: {}", consumerRecord);
Request request = messageConverter.convertFromInternal(
consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avr"));
log.info("request message: {}", request.getMessage());
}
}
此解决方案一次仅使用一条消息,但可以轻松修改以使用成批消息。
完整的解决方案在这里:https://github.com/robjwilkins/avro-example/tree/develop
感谢这让我使用 nativeencoding 和 spring 节省了时间:
云:
流:
通用绑定属性
bindings:
input:
consumer:
use-native-decoding: true
destination: so54448732
group: so54448732
output:
destination: so54448732
producer:
use-native-encoding: true
Kafka 特定的绑定属性
kafka:
bindings:
input:
consumer:
configuration:
value.deserializer: com.example.FooDeserializer
output:
producer:
configuration:
value.serializer: com.example.FooSerializer
我在反序列化来自 Kafka 主题的消息时遇到问题。消息已使用 spring-cloud-stream 和 Apache Avro 序列化。我正在使用 Spring Kafka 阅读它们并尝试反序列化它们。如果我使用 spring-cloud 来生成和使用消息,那么我可以很好地反序列化消息。问题是当我使用 Spring Kafka 使用它们然后尝试反序列化时。
我正在使用架构注册表(用于开发的 spring-boot 架构注册表,以及生产中的 Confluent 架构),但反序列化问题似乎发生在事件调用架构注册表之前。
很难 post 这个问题的所有相关代码,所以我 post 在 git 中心的回购中编辑了它:https://github.com/robjwilkins/avro-example
我在主题中发送的对象只是一个简单的 pojo:
@Data
public class Request {
private String message;
}
在 Kafka 上生成消息的代码如下所示:
@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {
private final MessageChannels messageChannels;
@GetMapping("/produce")
public void produceMessage() {
Request request = new Request();
request.setMessage("hello world");
Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
log.debug("sending message");
messageChannels.testRequest().send(requestMessage);
}
}
和application.yaml:
spring:
application.name: avro-producer
kafka:
bootstrap-servers: localhost:9092
consumer.group-id: avro-producer
cloud:
stream:
schema-registry-client.endpoint: http://localhost:8071
schema.avro.dynamic-schema-generation-enabled: true
kafka:
binder:
brokers: ${spring.kafka.bootstrap-servers}
bindings:
test-request:
destination: test-request
contentType: application/*+avro
那我有一个消费者:
@Slf4j
@Component
public class TopicListener {
@KafkaListener(topics = {"test-request"})
public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
log.info("listenForMessage. got a message: {}", consumerRecord);
consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
}
private String asString(byte[] byteArray) {
return new String(byteArray, Charset.defaultCharset());
}
}
并且消费的项目有application.yaml配置:
spring:
application.name: avro-consumer
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: avro-consumer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url: http://localhost:8071
当消费者收到一条消息时,它会导致异常:
2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我已经完成了反序列化代码到抛出这个异常的地步
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
....
private ByteBuffer getByteBuffer(byte[] payload) {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != 0) {
throw new SerializationException("Unknown magic byte!");
} else {
return buffer;
}
}
这是因为反序列化器检查序列化对象(字节数组)的字节内容并期望它为 0,但事实并非如此。因此,我质疑序列化对象的 spring-cloud-stream MessageConverter 是否与我用来反序列化对象的 io.confluent 对象兼容。如果它们不兼容,我该怎么办?
感谢您的帮助。
您应该通过在配置中创建 DefaultKafkaConsumerFactory
和您的 TopicListener
bean 来显式定义反序列化器,如下所示:
@Configuration
@EnableKafka
public class TopicListenerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value(("${spring.kafka.consumer.group-id}"))
private String groupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.wilkins.avro.consumer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public TopicListener topicListener() {
return new TopicListener();
}
}
您可以将绑定配置为在本机使用 Kafka 序列化程序。
将生产者 属性 useNativeEncoding
设置为 true
并使用 ...producer.configuration
Kafka 属性配置序列化器。
编辑
示例:
spring:
cloud:
stream:
# Generic binding properties
bindings:
input:
consumer:
use-native-decoding: true
destination: so54448732
group: so54448732
output:
destination: so54448732
producer:
use-native-encoding: true
# Kafka-specific binding properties
kafka:
bindings:
input:
consumer:
configuration:
value.deserializer: com.example.FooDeserializer
output:
producer:
configuration:
value.serializer: com.example.FooSerializer
这个问题的症结在于生产者使用spring-cloud-stream将post消息发送到Kafka,而消费者使用spring-kaka。原因是:
- 现有系统已经完善,使用spring-cloud-stream
- 新消费者需要使用相同的方法收听多个主题,仅绑定到主题名称的 csv 列表
- 需要一次使用一组消息,而不是单独使用,因此它们的内容可以批量写入数据库。
Spring-cloud-stream 当前不允许消费者将一个侦听器绑定到多个主题,并且无法一次使用一组消息(除非我弄错了)。
我找到了一个解决方案,它不需要对使用 spring-cloud-stream 将消息发布到 Kafka 的生产者代码进行任何更改。 Spring-cloud-stream 使用 MessageConverter
来管理序列化和反序列化。在 AbstractAvroMessageConverter
中有方法:convertFromInternal
和 convertToInternal
处理转换 to/from 字节数组。我的解决方案是扩展此代码(创建扩展 AvroSchemaRegistryClientMessageConverter
的 class),因此我可以重用大部分 spring-cloud-stream 功能,但具有可访问的接口来自我的 spring-kafka KafkaListener
。然后我修改了我的 TopicListener 以使用此 class 进行转换:
转换器:
@Component
@Slf4j
public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter {
public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) {
super(schemaRegistryClient, new NoOpCacheManager());
}
public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass,
Object conversionHint) {
T result;
try {
byte[] payload = (byte[]) consumerRecord.value();
Map<String, String> headers = new HashMap<>();
consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value())));
MimeType mimeType = messageMimeType(conversionHint, headers);
if (mimeType == null) {
return null;
}
Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
@SuppressWarnings("unchecked")
DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
result = (T) reader.read(null, decoder);
}
catch (IOException e) {
throw new RuntimeException("Failed to read payload", e);
}
return result;
}
private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) {
MimeType mimeType;
try {
String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
log.debug("contentType: {}", contentType);
mimeType = MimeType.valueOf(contentType);
} catch (InvalidMimeTypeException e) {
log.error("Exception getting object MimeType from contentType header", e);
if (conversionHint instanceof MimeType) {
mimeType = (MimeType) conversionHint;
}
else {
return null;
}
}
return mimeType;
}
private String asString(byte[] byteArray) {
String theString = new String(byteArray, Charset.defaultCharset());
return theString.replace("\"", "");
}
}
修改后TopicListener
:
@Slf4j
@Component
@RequiredArgsConstructor
public class TopicListener {
private final AvroKafkaMessageConverter messageConverter;
@KafkaListener(topics = {"test-request"})
public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) {
log.info("listenForMessage. got a message: {}", consumerRecord);
Request request = messageConverter.convertFromInternal(
consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avr"));
log.info("request message: {}", request.getMessage());
}
}
此解决方案一次仅使用一条消息,但可以轻松修改以使用成批消息。
完整的解决方案在这里:https://github.com/robjwilkins/avro-example/tree/develop
感谢这让我使用 nativeencoding 和 spring 节省了时间: 云: 流:
通用绑定属性
bindings:
input:
consumer:
use-native-decoding: true
destination: so54448732
group: so54448732
output:
destination: so54448732
producer:
use-native-encoding: true
Kafka 特定的绑定属性
kafka:
bindings:
input:
consumer:
configuration:
value.deserializer: com.example.FooDeserializer
output:
producer:
configuration:
value.serializer: com.example.FooSerializer