Spring Cloud StreamListener @Output KStream Serdes 似乎不起作用

Spring Cloud StreamListener @Output KStream Serdes dont seem to work

我有一个 Stream Listener 作为

@StreamListener(target = "requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
    // Predicate<UUID, Event> isAccount = (key, value) ->
    // value.getEntity().getClass().equals(Account.class);

    // @formatter:off
    return events
            //.filter(isAccount)
            .peek((key, value) -> {
                log.debug("Processing {} {}", key, value);
            });
            /*
            .filter(isAccount)
            .map((key, value) -> process(value))

            .peek((key, value) -> {
                log.debug("Processed {} {}", key, value);
            });
            */
    // @formatter:on

}

其中@Input("requesti")配置如下;

spring.cloud.stream.kafka.streams.bindings.requesti.consumer.application-id=repo-event-consumer
spring.cloud.stream.bindings.requesti.destination=request
spring.cloud.stream.bindings.requesti.content-type=application/json
spring.cloud.stream.bindings.requesti.consumer.header-mode=raw

@output("responseo")配置如下

spring.cloud.stream.kafka.streams.bindings.responseo.consumer.application-id=repo-response-producer
spring.cloud.stream.bindings.responseo.destination=response
spring.cloud.stream.bindings.responseo.content-type=application/json
spring.cloud.stream.bindings.responseo.producer.header-mode=raw
spring.cloud.stream.bindings.responseo.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.responseo.producer.key-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.responseo.producer.value-serde=org.springframework.kafka.support.serializer.JsonSerde

我的处理器收到一个请求,也可以发送输出,但输出如下

[Producer clientId=repo-event-consumer-49827b40-2357-4af0-8103-228343faa59e-StreamThread-1-producer]发送记录ProducerRecord(topic=response, partition =null, headers=RecordHeaders(headers = [RecordHeader(key = TypeId, value = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97 , 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99 , 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), 键=[B@ 6a5e4294, value=[B@5a0852e1, timestamp=1551093349173) with callback org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1@336dbba5 to topic response partition 2

有几件事让我感到困惑 Producer Record id 不是 "repo-response-producer" 其次 key-serde/value-serde 没有被使用,在我看来它应该是

发送记录 ProducerRecord(topic=request, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = Key_TypeId, value = [106, 97 , 118, 97, 46, 117, 116, 105, 108, 46, 85, 85, 73, 68]), RecordHeader(key = TypeId, value = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), key=6f0f50e2-3add-4d22-a370-cac66d016af0, value=Account() with callback org.springframework.kafka.core.KafkaTemplate$$Lambda$582/533392019@85ab964 to topic request partition 2

默认的 serdeConfig 是

    spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

Repo

这是一个示例,它演示了 JsonSerde 使用 Kafka Streams 活页夹在出站上的工作:https://github.com/schacko-samples/json-serde-example。 运行 示例并确保它有效。 查看 application.yml 以了解配置详细信息。我在提供的 README 中添加了一些细节。