Spring 使用本机编码的 Cloud Stream 动态目标 Avro 无法正常工作
Spring Cloud Stream dynamic destinations Avro with the native encoding is not working
我一直在尝试使用 Spring Cloud Stream 的动态目标功能来发布 Avro 格式的消息。但是,由于我使用的是原生编码(Confluent Avro serializer),消息转换器无法处理这种情况。显然,当我使用静态目标时,我能够通过在 "bindings" 级别使用 use-native-encoding: true
参数来管理本机编码。但是,对于动态目的地,我似乎没有这样的能力。
private boolean publishMessage(byte[] record, String target, String contentType, Schema schema) {
return resolver.resolveDestination(target)
.send(MessageBuilder
.createMessage(record, new MessageHeaders(
Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
如果我使用以下内容类型为 "application/*+avro" 的方法和 byte [] 格式的记录,将抛出以下异常:
error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5c778504]; nested exception is org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: \"bytes\"
如果您错过本机编码 属性。
通常会发生此异常
如果我在使用以下方法发布消息之前尝试将字节数组反序列化为通用记录,则无法为其找到合适的消息转换器。
public static GenericRecord bytesToGenericAvro(byte[] bytes, Schema schema) {
DatumReader<GenericRecord>
datumReader = new GenericDatumReader<>(schema);
GenericRecord record = null;
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
bais.reset();
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(bais, null);
try {
record = datumReader.read(null, binaryDecoder);
} catch (IOException e) {
log.error("Unable to deserialize byte array to avro generic record", e.getMessage());
} finally {
try {
bais.close();
} catch (IOException e) {
log.warn("Unable to close ByteArrayInputStream", e.getMessage());
}
}
return record;
}
更新:
添加此 bean 后仍然面临同样的问题。 Spring Cloud Stream 尝试将消息转换为 Avro 时抛出异常!
@Bean
public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
producerProperties.setUseNativeEncoding(true);
producerProperties.setErrorChannelEnabled(true);
producerProperties.setPartitionCount(3);
});
}
异常:
failed to send Message to channel 'output1'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={...}, headers={contentType=application/*+avro, id=c22bf171-c6ae-cedb-b0be-3aa0fcbdf762, timestamp=1567053746112}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:388)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:422)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.example.controller.PublisherController.publishMessage(PublisherController.java:90)
at com.example.controller.PublisherController.replayRecord(PublisherController.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:853)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1587)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
您可以修改动态绑定的绑定属性,添加 NewDestinationBindingCallback
bean 并将其传递给解析器。参见 the documentation。
If the channel names are known in advance, you can configure the producer properties as with any other destination. Alternatively, if you register a NewDestinationBindingCallback<>
bean, it is invoked just before the binding is created. The callback takes the generic type of the extended producer properties used by the binder. It has one method:
void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
The following example shows how to use the RabbitMQ binder:
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
If you need to support dynamic destinations with multiple binder types, use Object for the generic type and cast the extended argument as needed.
编辑
这是解析器中的一个错误;在创建和配置通道之前,不会调用回调来更新属性。它适用于大多数属性,但不是这个。
这是一个解决方法:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So57688303Application {
public static void main(String[] args) {
SpringApplication.run(So57688303Application.class, args);
}
@Bean
public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
producerProperties.setUseNativeEncoding(true);
producerProperties.setErrorChannelEnabled(true);
producerProperties.setPartitionCount(3);
extendedProducerProperties.getConfiguration().put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
MySerializer.class.getName());
});
}
@Bean
public ApplicationRunner runner(BinderAwareChannelResolver resolver) {
return args -> {
MessageChannel channel = resolver.resolveDestination("dynamic");
((AbstractMessageChannel) channel).removeInterceptor(0); // only need to do this on the first resolution
channel.send(new GenericMessage<>("foo"));
};
}
public static class MySerializer implements Serializer<String> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, String data) {
System.out.println("In my serializer with data of type " + data.getClass().getSimpleName());
return data.getBytes();
}
@Override
public void close() {
}
}
}
和
In my serializer with data of type String
我一直在尝试使用 Spring Cloud Stream 的动态目标功能来发布 Avro 格式的消息。但是,由于我使用的是原生编码(Confluent Avro serializer),消息转换器无法处理这种情况。显然,当我使用静态目标时,我能够通过在 "bindings" 级别使用 use-native-encoding: true
参数来管理本机编码。但是,对于动态目的地,我似乎没有这样的能力。
private boolean publishMessage(byte[] record, String target, String contentType, Schema schema) {
return resolver.resolveDestination(target)
.send(MessageBuilder
.createMessage(record, new MessageHeaders(
Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
如果我使用以下内容类型为 "application/*+avro" 的方法和 byte [] 格式的记录,将抛出以下异常:
error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5c778504]; nested exception is org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: \"bytes\"
如果您错过本机编码 属性。
通常会发生此异常如果我在使用以下方法发布消息之前尝试将字节数组反序列化为通用记录,则无法为其找到合适的消息转换器。
public static GenericRecord bytesToGenericAvro(byte[] bytes, Schema schema) {
DatumReader<GenericRecord>
datumReader = new GenericDatumReader<>(schema);
GenericRecord record = null;
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
bais.reset();
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(bais, null);
try {
record = datumReader.read(null, binaryDecoder);
} catch (IOException e) {
log.error("Unable to deserialize byte array to avro generic record", e.getMessage());
} finally {
try {
bais.close();
} catch (IOException e) {
log.warn("Unable to close ByteArrayInputStream", e.getMessage());
}
}
return record;
}
更新: 添加此 bean 后仍然面临同样的问题。 Spring Cloud Stream 尝试将消息转换为 Avro 时抛出异常!
@Bean
public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
producerProperties.setUseNativeEncoding(true);
producerProperties.setErrorChannelEnabled(true);
producerProperties.setPartitionCount(3);
});
}
异常:
failed to send Message to channel 'output1'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={...}, headers={contentType=application/*+avro, id=c22bf171-c6ae-cedb-b0be-3aa0fcbdf762, timestamp=1567053746112}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:388)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:422)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.example.controller.PublisherController.publishMessage(PublisherController.java:90)
at com.example.controller.PublisherController.replayRecord(PublisherController.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:853)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1587)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
您可以修改动态绑定的绑定属性,添加 NewDestinationBindingCallback
bean 并将其传递给解析器。参见 the documentation。
If the channel names are known in advance, you can configure the producer properties as with any other destination. Alternatively, if you register a
NewDestinationBindingCallback<>
bean, it is invoked just before the binding is created. The callback takes the generic type of the extended producer properties used by the binder. It has one method:
void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
The following example shows how to use the RabbitMQ binder:
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
If you need to support dynamic destinations with multiple binder types, use Object for the generic type and cast the extended argument as needed.
编辑
这是解析器中的一个错误;在创建和配置通道之前,不会调用回调来更新属性。它适用于大多数属性,但不是这个。
这是一个解决方法:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So57688303Application {
public static void main(String[] args) {
SpringApplication.run(So57688303Application.class, args);
}
@Bean
public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
producerProperties.setUseNativeEncoding(true);
producerProperties.setErrorChannelEnabled(true);
producerProperties.setPartitionCount(3);
extendedProducerProperties.getConfiguration().put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
MySerializer.class.getName());
});
}
@Bean
public ApplicationRunner runner(BinderAwareChannelResolver resolver) {
return args -> {
MessageChannel channel = resolver.resolveDestination("dynamic");
((AbstractMessageChannel) channel).removeInterceptor(0); // only need to do this on the first resolution
channel.send(new GenericMessage<>("foo"));
};
}
public static class MySerializer implements Serializer<String> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, String data) {
System.out.println("In my serializer with data of type " + data.getClass().getSimpleName());
return data.getBytes();
}
@Override
public void close() {
}
}
}
和
In my serializer with data of type String