Spring 云流活页夹
Spring Cloud Stream Binder
如有任何想法,我将不胜感激,我正在尝试将测试编写为;
@ExtendWith(SpringExtension.class)
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "input", "output" }, brokerProperties = { "broker.id=2",
"listeners=PLAINTEXT://127.0.0.1:9092" })
class BindingTest {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private CustomBindings cBindings;
/**
* @throws java.lang.Exception
*/
@BeforeEach
void setUp() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@AfterEach
void tearDown() throws Exception {
embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
}
@Test
void test0() {
String KEY = "KEY";
String testMessage = "TESTMESSAGE";
Message<String> message = MessageBuilder.withPayload(testMessage)
.setHeader(KafkaHeaders.MESSAGE_KEY, KEY).build();
cBindings.output().send(message);
}
@SpringBootApplication
@EnableBinding(CustomBindings.class)
public static class BindingApplication {
}
}
和
spring.cloud.stream.bindings.output.destination=输出
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.bindings.output.producer.header-模式=原始
spring.cloud.stream.bindings.output.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer
仍然得到
消息处理程序发生错误 [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@71f0806b];嵌套异常是 org.apache.kafka.common.errors.SerializationException:无法将 class [B 的值转换为 value.serializer 中指定的 class org.apache.kafka.common.serialization.StringSerializer,failedMessage=GenericMessage [payload=byte[8 ], headers={id=c91897bc-2e4e-0a74-bc05-17fb31b690f6, kafka_messageKey=KEY, contentType=application/json, timestamp=1542295539746}]
这对我来说没有意义
从外观上看,这不是 Kafka Streams 应用程序,而是带有 Kafka 绑定器的常规 Spring Cloud Stream 应用程序。因此,您不需要这两个属性。
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer
此外,为了修复您的错误,您需要从您的配置中删除此行:spring.cloud.stream.bindings.output.producer.use-native-encoding=true
。
通过将本机编码设置为 true,您要求 Kafka 执行将依赖默认 ByteArraySerializer
的序列化。如果您真的想要本机序列化,则需要设置适当的值序列化程序 (StringSerializer
)。但由于这是一个测试,我建议您删除此 属性 并查看您的测试是否通过。
如有任何想法,我将不胜感激,我正在尝试将测试编写为;
@ExtendWith(SpringExtension.class)
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "input", "output" }, brokerProperties = { "broker.id=2",
"listeners=PLAINTEXT://127.0.0.1:9092" })
class BindingTest {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private CustomBindings cBindings;
/**
* @throws java.lang.Exception
*/
@BeforeEach
void setUp() throws Exception {
}
/**
* @throws java.lang.Exception
*/
@AfterEach
void tearDown() throws Exception {
embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
}
@Test
void test0() {
String KEY = "KEY";
String testMessage = "TESTMESSAGE";
Message<String> message = MessageBuilder.withPayload(testMessage)
.setHeader(KafkaHeaders.MESSAGE_KEY, KEY).build();
cBindings.output().send(message);
}
@SpringBootApplication
@EnableBinding(CustomBindings.class)
public static class BindingApplication {
}
}
和
spring.cloud.stream.bindings.output.destination=输出 spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.bindings.output.producer.header-模式=原始 spring.cloud.stream.bindings.output.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer
仍然得到
消息处理程序发生错误 [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@71f0806b];嵌套异常是 org.apache.kafka.common.errors.SerializationException:无法将 class [B 的值转换为 value.serializer 中指定的 class org.apache.kafka.common.serialization.StringSerializer,failedMessage=GenericMessage [payload=byte[8 ], headers={id=c91897bc-2e4e-0a74-bc05-17fb31b690f6, kafka_messageKey=KEY, contentType=application/json, timestamp=1542295539746}]
这对我来说没有意义
从外观上看,这不是 Kafka Streams 应用程序,而是带有 Kafka 绑定器的常规 Spring Cloud Stream 应用程序。因此,您不需要这两个属性。
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer
此外,为了修复您的错误,您需要从您的配置中删除此行:spring.cloud.stream.bindings.output.producer.use-native-encoding=true
。
通过将本机编码设置为 true,您要求 Kafka 执行将依赖默认 ByteArraySerializer
的序列化。如果您真的想要本机序列化,则需要设置适当的值序列化程序 (StringSerializer
)。但由于这是一个测试,我建议您删除此 属性 并查看您的测试是否通过。