Store sum in KTable<String, Integer>
I am trying to count the number of envelopes on a topic. The transactions are in Avro format. I used this example as a reference.
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, Transaction> transactionKStream = streamsBuilder.stream(INPUT_TOPIC);
final KStream<String, Integer> envelopes = transactionKStream
        .filter((k, v) -> v.getProduct().toString().matches("C4|C5"))
        .map((k, v) -> KeyValue.pair("1", v.getAmount()));
final KTable<String, Integer> amount = envelopes
        .groupByKey()
        .reduce((v1, v2) -> v1 + v2);
I want to store the sum in a KTable<>, but when I send data to the input topic, the consumer crashes with:
A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
With the KTable<> commented out it runs fine, but then nothing is summed.
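groupByKey() without arguments uses the default serializers. From the Javadoc:
    groupByKey(): Group the records by their current key into a KGroupedStream while preserving the original values and default serializers and deserializers.
Your default value serde is the Avro one, but after map() the values are Integers, so the defaults no longer match. You must use groupByKey(Serialized<K,V> serialized) or groupByKey(Grouped<K,V> grouped) to pass the right serdes explicitly.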
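The following should do the trick (on Kafka 2.1 and later, Grouped.with(...) replaces the deprecated Serialized.with(...)):
final KTable<String, Integer> amount = envelopes
        .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()))
        .reduce((v1, v2) -> v1 + v2);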
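As for why the compiler does not catch the mismatch: a default serde is looked up from configuration at runtime, so its generic type is not checked against the stream's value type. Below is a minimal, library-free sketch of that mechanism. It does not use Kafka at all; the Serializer interface, Transaction, TransactionSerializer, IntegerSerializer, write and tryWrite are all hypothetical stand-ins invented for illustration, with TransactionSerializer playing the role of the configured Avro default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DefaultSerdeMismatch {

    /** Hypothetical stand-in for a Kafka-style serializer; not the real interface. */
    interface Serializer<T> {
        byte[] serialize(T value);
    }

    /** Hypothetical stand-in for the Avro-generated Transaction class. */
    static final class Transaction {
        final String product;
        final int amount;

        Transaction(String product, int amount) {
            this.product = product;
            this.amount = amount;
        }
    }

    /** Plays the role of the configured default value serializer. */
    static final class TransactionSerializer implements Serializer<Transaction> {
        @Override
        public byte[] serialize(Transaction value) {
            return (value.product + ":" + value.amount).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Plays the role of the serializer that matches the mapped Integer values. */
    static final class IntegerSerializer implements Serializer<Integer> {
        @Override
        public byte[] serialize(Integer value) {
            return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
        }
    }

    /**
     * Applies a serializer whose type parameter is unknown at compile time,
     * the way a default picked from configuration would be applied.
     * The unchecked cast compiles; a wrong value type only fails at runtime.
     */
    @SuppressWarnings("unchecked")
    static byte[] write(Serializer<?> serializer, Object value) {
        return ((Serializer<Object>) serializer).serialize(value);
    }

    /** Reports whether the serializer accepted the value. */
    static String tryWrite(Serializer<?> serializer, Object value) {
        try {
            return "ok, " + write(serializer, value).length + " bytes";
        } catch (ClassCastException e) {
            return "incompatible serializer";
        }
    }

    public static void main(String[] args) {
        // Default serializer expects Transaction, but the mapped value is an Integer.
        System.out.println(tryWrite(new TransactionSerializer(), 42)); // prints "incompatible serializer"
        // Passing the matching serializer explicitly works.
        System.out.println(tryWrite(new IntegerSerializer(), 42));     // prints "ok, 4 bytes"
    }
}
```

In this sketch the failure is a ClassCastException. Kafka Streams reports the same kind of mismatch with the "serializer ... is not compatible to the actual key or value type" message quoted in the question, which is why passing the serdes explicitly to groupByKey resolves it.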