当我使用多个流时,Kafka Streams StreamsException
Kafka Streams StreamsException when I work with multiple streams
我正在与 Kafka Streams & Kotlin 合作开发一项服务,该服务具有针对三个主题的流。第一个有 Avro 值,另外两个有字符串值。
在我的 properties
文件中,我将 SpecificAvroSerde
作为默认值 Serde,然后我使用 Consumed.with(Serdes.String(), Serdes.String())
来使用字符串值。
val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne)
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicTwoStream = streamsBuilder
.stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
当我将以下流配置为值的默认值时,我看到 Avro 流(第一个)工作正常并使用了我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时出现异常。
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
这是发布到 topicTwo 和 topicThree 的异常:
org.apache.kafka.streams.errors.StreamsException: A serializer (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual value type (value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
PS。它必须是同一服务中的三个流,因为稍后会有连接。
感谢朋友 (Mario Boikov),当 Kafka 进行分组以产生新的 KTable
时,问题就出现了。它不知道要采用哪个序列化程序进行分组,因此它采用默认的 Serde 作为值,在我的例子中是 SpecificAvroSerde
通过为 groupByKey 提供分组所需的序列化程序解决了这个问题:
val topicTwoStream = streamsBuilder
.stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }
val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }
.mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
干杯
我正在与 Kafka Streams & Kotlin 合作开发一项服务,该服务具有针对三个主题的流。第一个有 Avro 值,另外两个有字符串值。
在我的 properties
文件中,我将 SpecificAvroSerde
作为默认值 Serde,然后我使用 Consumed.with(Serdes.String(), Serdes.String())
来使用字符串值。
val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne)
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicTwoStream = streamsBuilder
.stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
当我将以下流配置为值的默认值时,我看到 Avro 流(第一个)工作正常并使用了我在该主题上发布的内容。但是当我使用相同的配置发布到字符串值流时出现异常。
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
这是发布到 topicTwo 和 topicThree 的异常:
org.apache.kafka.streams.errors.StreamsException: A serializer (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual value type (value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
PS。它必须是同一服务中的三个流,因为稍后会有连接。
感谢朋友 (Mario Boikov),当 Kafka 进行分组以产生新的 KTable
时,问题就出现了。它不知道要采用哪个序列化程序进行分组,因此它采用默认的 Serde 作为值,在我的例子中是 SpecificAvroSerde
通过为 groupByKey 提供分组所需的序列化程序解决了这个问题:
val topicTwoStream = streamsBuilder
.stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }
val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }
.mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
干杯