Kafka streams aggregate throwing Exception

Here is my Kafka Streams code; it uses windowing to sum all the integer data within each time window.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class KafkaWindowingLIS {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Integer uid = 1;
        long tenSeconds = 1000 * 10;

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> dataStream = builder.stream("kafka-windowing-lis");
        KStream<Integer, Integer> integerKStream = dataStream
                .filter((key, val) -> {               // Keep only values that parse as integers
                    try {
                        Integer.parseInt(val);
                        return true;
                    } catch (NumberFormatException e) {
                        return false;
                    }
                })
                .map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val)));

        TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream
                .groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer()))     //Killed my time
                .windowedBy(TimeWindows.of(tenSeconds));

        timeWindowedKStream.aggregate(
                () -> 0,
                (key, value, aggregate) -> value + aggregate)
                .toStream().print(Printed.toSysOut());


        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
//        kafkaStreams.cleanUp();
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }
}

I am getting the following exception:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual value type (value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:138)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$WindowStoreReadWriteDecorator.put(ProcessorContextImpl.java:533)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
    ... 5 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
    ... 14 more

I have looked for a solution on various pages, and it seems I am missing something. Any input would be greatly appreciated. Thanks.

Because you use aggregate(), you need to set the output value serde explicitly via aggregate(..., Materialized.with(...)). The output value type may differ from the input value type, so the input value serde cannot be reused. (Due to Java type erasure, Kafka Streams cannot tell that the type did not actually change...) Kafka Streams therefore falls back to the default serde from the config, and the default StringSerializer cannot serialize your Integer aggregate.
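Applied to the code above, the fix is a sketch like the following (assuming your values and aggregates are both Integer, as in the question):

```java
// Pass the Integer serdes for the aggregation's key and value explicitly,
// so Kafka Streams does not fall back to the default String serdes.
timeWindowedKStream.aggregate(
        () -> 0,                                       // initializer
        (key, value, aggregate) -> value + aggregate,  // aggregator
        Materialized.with(Serdes.Integer(), Serdes.Integer()))
        .toStream().print(Printed.toSysOut());
```

Materialized.with(keySerde, valueSerde) configures the state store backing the windowed aggregation, which is exactly where the StringSerializer was blowing up in your stack trace (MeteredWindowStore.put).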

Alternatively, you can work around the problem by using reduce() instead of aggregate(). The output type of reduce() is the same as its input type, so the input value serde can also be used for the output value.
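For a sum, the reduce() variant is equivalent here, since the aggregate type (Integer) matches the record value type, so no extra serde configuration is needed:

```java
// reduce() reuses the grouping serdes (Serdes.Integer() from groupBy),
// so no Materialized is required.
timeWindowedKStream
        .reduce((aggregate, value) -> aggregate + value)
        .toStream().print(Printed.toSysOut());
```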