Kafka 流:class 在左连接期间抛出异常
Kafka stream : class cast exception during left join
我是卡夫卡的新手。我正在尝试将 kafka 流(命名为 inputStream)左加入到 kafka-table(命名为 detailTable),其中 kafka-流构建为:
//The consumer to consume the input topic
Consumed<String, NotifyRecipient> inputNotificationEventConsumed = Consumed
.with(Constants.CONSUMER_KEY_SERDE, Constants.CONSUMER_VALUE_SERDE);
//Now create the stream that is directly reading from the topic
KStream<NotifyKey, NotifyVal> initialInputStream =
streamsBuilder.stream(properties.getInputTopic(), inputNotificationEventConsumed);
//Now re-key the above stream for the purpose of left join
KStream<String, NotifyVal> inputStream = initialInputStream
.map((notifyKey,notifyVal) ->
KeyValue.pair(notifyVal.getId(),notifyVal)
);
而 kafka-table 是这样创建的:
//The consumer for the table
Consumed<String, Detail> notifyDetailConsumed =
Consumed.with(Serdes.String(), Constants.DET_CONSUMER_VALUE_SERDE);
//Now consume from the topic into ktable
KTable<String, Detail> detailTable = streamsBuilder
.table(properties.getDetailTopic(), notifyDetailConsumed);
现在我正在尝试将 inputStream 加入到 detailTable 作为:
//Now join
KStream<String,Pair<Long, SendCmd>> joinedStream = inputStream
.leftJoin(detailTable, valJoiner)
.filter((key,value)->value!=null);
我收到一个错误,似乎在连接期间,inputStream 的键和值被尝试投射到默认的key-serde和默认的value-serde并得到一个class投射异常。
不知道如何解决这个问题,需要帮助。
让我知道我是否应该提供更多信息。
因为您使用 map()
,键和值类型可能会发生变化,因此您需要通过 Joined.with(...)
指定正确的 Serdes 作为 .leftJoin()
的第三个参数。
我是卡夫卡的新手。我正在尝试将 kafka 流(命名为 inputStream)左加入到 kafka-table(命名为 detailTable),其中 kafka-流构建为:
//The consumer to consume the input topic
Consumed<String, NotifyRecipient> inputNotificationEventConsumed = Consumed
.with(Constants.CONSUMER_KEY_SERDE, Constants.CONSUMER_VALUE_SERDE);
//Now create the stream that is directly reading from the topic
KStream<NotifyKey, NotifyVal> initialInputStream =
streamsBuilder.stream(properties.getInputTopic(), inputNotificationEventConsumed);
//Now re-key the above stream for the purpose of left join
KStream<String, NotifyVal> inputStream = initialInputStream
.map((notifyKey,notifyVal) ->
KeyValue.pair(notifyVal.getId(),notifyVal)
);
而 kafka-table 是这样创建的:
//The consumer for the table
Consumed<String, Detail> notifyDetailConsumed =
Consumed.with(Serdes.String(), Constants.DET_CONSUMER_VALUE_SERDE);
//Now consume from the topic into ktable
KTable<String, Detail> detailTable = streamsBuilder
.table(properties.getDetailTopic(), notifyDetailConsumed);
现在我正在尝试将 inputStream 加入到 detailTable 作为:
//Now join
KStream<String,Pair<Long, SendCmd>> joinedStream = inputStream
.leftJoin(detailTable, valJoiner)
.filter((key,value)->value!=null);
我收到一个错误,似乎在连接期间,inputStream 的键和值被尝试投射到默认的key-serde和默认的value-serde并得到一个class投射异常。
不知道如何解决这个问题,需要帮助。 让我知道我是否应该提供更多信息。
因为您使用 map()
,键和值类型可能会发生变化,因此您需要通过 Joined.with(...)
指定正确的 Serdes 作为 .leftJoin()
的第三个参数。