Kafka streams aggregate throwing Exception
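Here is my Kafka Streams code. It uses a 10-second tumbling time window (TimeWindows.of with no advanceBy) to sum all integer values that arrive in each window.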
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class KafkaWindowingLIS {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-windowing-lis");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Integer uid = 1;
        long tenSeconds = 1000 * 10;
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> dataStream = builder.stream("kafka-windowing-lis");
        KStream<Integer, Integer> integerKStream = dataStream
                .filter((key, val) -> { // Filter only numbers from the stream
                    try {
                        Integer.parseInt(val);
                        return true;
                    } catch (NumberFormatException e) {
                        return false;
                    }
                })
                .map((key, val) -> new KeyValue<Integer, Integer>(uid, Integer.parseInt(val)));
        TimeWindowedKStream<Integer, Integer> timeWindowedKStream = integerKStream
                .groupBy((k, v) -> k, Serialized.with(Serdes.Integer(), Serdes.Integer())) // Killed my time
                .windowedBy(TimeWindows.of(tenSeconds));
        timeWindowedKStream.aggregate(
                () -> 0,
                (key, value, aggregate) -> value + aggregate)
                .toStream().print(Printed.toSysOut());
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
        // kafkaStreams.cleanUp();
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }
}
I get the following exception:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual value type (value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:138)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$WindowStoreReadWriteDecorator.put(ProcessorContextImpl.java:533)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
... 5 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
... 14 more
I have searched various pages for a solution, and it looks like I am missing something here. Any input would be greatly appreciated. Thanks.
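Because you use aggregate(), you need to set the output value serde explicitly via aggregate(..., Materialized.with(...)). The output value type may differ from the input value type, so the input value serde cannot be reused. (Because of Java type erasure, Kafka Streams does not know that the type actually did not change...) Hence, Kafka Streams falls back to the default value serde from the config, which here is the String serde, and that is what produces the ClassCastException when the Integer aggregate is written to the window store.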
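A minimal sketch of this fix, assuming a Kafka Streams 2.x client (2.1 or later, which the stack trace suggests). It keeps the question's pipeline and passes Materialized.with(Serdes.Integer(), Serdes.Integer()) as the third aggregate() argument, with an explicit type witness for the window store. The class name WindowedSumAggregate and the buildTopology and isInteger helpers are hypothetical, and Grouped and TimeWindows.of(Duration) stand in for the deprecated Serialized and TimeWindows.of(long).

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

// Hypothetical class: the question's topology with the aggregate() serde fix applied.
class WindowedSumAggregate {

    // Same check as the question's filter: keep only values that parse as an int.
    static boolean isInteger(String val) {
        try {
            Integer.parseInt(val);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static Topology buildTopology(String inputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        // Read with the default String serdes from the config.
        KStream<String, String> dataStream = builder.stream(inputTopic);
        KStream<Integer, Integer> integerKStream = dataStream
                .filter((key, val) -> isInteger(val))
                .map((key, val) -> new KeyValue<Integer, Integer>(1, Integer.parseInt(val)));
        integerKStream
                // Serdes for the repartition topic caused by the key change in map();
                // same grouping as the identity groupBy((k, v) -> k) in the question.
                .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Integer()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .aggregate(
                        () -> 0,
                        (key, value, aggregate) -> value + aggregate,
                        // The fix: give the window store an Integer value serde instead of
                        // letting it fall back to the default String serde.
                        Materialized.<Integer, Integer, WindowStore<Bytes, byte[]>>with(
                                Serdes.Integer(), Serdes.Integer()))
                .toStream()
                .print(Printed.toSysOut());
        return builder.build();
    }
}
```

The returned Topology can be passed to new KafkaStreams(topology, config) with the same config as in the question. On a 3.x client, TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)) replaces the deprecated TimeWindows.of(Duration).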
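As an alternative, you can use reduce() instead of aggregate() to fix the problem. The output type of reduce() is the same as its input type, so the input value serde can be reused for the output value.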
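A minimal sketch of the reduce() alternative, under the same assumptions as above (Kafka Streams 2.x, 2.1 or later). No Materialized argument is passed: the Integer value serde given to Grouped.with(...) is reused for the window store because reduce() keeps the input value type. The class name WindowedSumReduce and its helpers are hypothetical.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Hypothetical class: the question's topology using reduce() instead of aggregate().
class WindowedSumReduce {

    // Same check as the question's filter: keep only values that parse as an int.
    static boolean isInteger(String val) {
        try {
            Integer.parseInt(val);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static Topology buildTopology(String inputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> dataStream = builder.stream(inputTopic);
        KStream<Integer, Integer> integerKStream = dataStream
                .filter((key, val) -> isInteger(val))
                .map((key, val) -> new KeyValue<Integer, Integer>(1, Integer.parseInt(val)));
        integerKStream
                // These Integer serdes are also used by reduce() for its window store.
                .groupByKey(Grouped.with(Serdes.Integer(), Serdes.Integer()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                // No initializer: the first value in each window seeds the running sum.
                .reduce((aggregate, value) -> aggregate + value)
                .toStream()
                .print(Printed.toSysOut());
        return builder.build();
    }
}
```

The sums match the aggregate() version, since starting from 0 and starting from the first value give the same total.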