Error while creating KTable with custom key

Use case - there is a topic with messages of the form (null, metadata). I need to create a KTable from this topic, using metadata.entity_id as the key and the metadata as the value. This table will later be used for a join with a stream keyed on the same id.

    private final static String KAFKA_BROKERS = "localhost:9092";
    private final static String APPLICATION_ID = "TestMetadataTable";
    private final static String AUTO_OFFSET_RESET_CONFIG = "earliest";
    private final static String METADATA_TOPIC = "test-metadata-topic";


    public static void main(String[] args) {

        //Setting the Stream configuration params.
        final Properties kafkaStreamConfiguration = new Properties();
        kafkaStreamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, APPLICATION_ID);
        kafkaStreamConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);

        kafkaStreamConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);      

        //Creating Serdes for MetricMetadata
        GenericJsonSerializer<MetricMetadata> metadataJsonSerializer = new GenericJsonSerializer<MetricMetadata>();
        GenericJsonDeserializer<MetricMetadata> metadataJsonDeserializer = new GenericJsonDeserializer<MetricMetadata>(MetricMetadata.class);
        Serde<MetricMetadata> metadataSerde = Serdes.serdeFrom(metadataJsonSerializer, metadataJsonDeserializer);


        //Creating kafka stream.
        final StreamsBuilder builder = new StreamsBuilder();

        KTable<String, MetricMetadata> metaTable = builder.table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
                .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value))            
                .aggregate( () -> null,
                         (key, value, aggValue) -> value,
                         (key, value, aggValue) -> value
                        );

        final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamConfiguration);
        streams.start();        
    }

Once I push messages to the topic METADATA_TOPIC, it results in the error below. Am I missing something here? Kafka Streams 2.2.0.

Exception in thread "TestMetadataTable-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store test-metadata-topic-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:471)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:95)
    at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:72)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:102)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:79)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:127)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:72)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:224)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
    ... 10 more
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 28 more

In that case you need to provide Serdes via Grouped to the KTable.groupBy() operation, because calling groupBy triggers a repartition. You also need to provide the same Serdes for the state store of the aggregation, via Materialized.
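
Applied to the table-based snippet above, a rough sketch of that could look like the following (it still assumes the records actually carry non-null keys, which the next paragraph addresses):

    KTable<String, MetricMetadata> metaTable = builder.table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
            .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value),
                     Grouped.with(Serdes.String(), metadataSerde))              // Serdes for the repartition topic
            .aggregate(() -> null,
                    (key, value, aggValue) -> value,                            // adder: keep the latest metadata
                    (key, value, aggValue) -> aggValue,                         // subtractor
                    Materialized.with(Serdes.String(), metadataSerde));         // Serdes for the state store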

Also, since the key is null, I think you should start with a KStream instead. Then call groupByKey (you still need to provide the Serdes via Grouped), and the aggregation will give you the KTable you want.

Off the top of my head, something like this should work:

    KTable<String, MetricMetadata> metaTable =
            builder.stream(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
                   .selectKey((key, value) -> value.getEntity_id())             // new key: entity_id taken from the value
                   .groupByKey(Grouped.with(Serdes.String(), metadataSerde))    // Serdes for the repartition topic
                   .aggregate(() -> null,
                           (key, value, aggValue) -> value,                     // keep the latest metadata per key
                           Materialized.with(Serdes.String(), metadataSerde));  // Serdes for the state store
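
On top of that KTable, the stream-table join mentioned in the question could then be wired up roughly as below; the "test-events-topic" and "enriched-events" topic names and the String-valued event stream are placeholders for illustration, not part of the original setup:

    // Hypothetical event stream keyed by the same entity_id (placeholder topic and value type).
    KStream<String, String> events =
            builder.stream("test-events-topic", Consumed.with(Serdes.String(), Serdes.String()));

    // Stream-table join on entity_id; Joined supplies the Serdes so no default Serdes are required.
    events.join(metaTable,
                (event, metadata) -> event + " | " + metadata.getEntity_id(),   // placeholder joiner
                Joined.with(Serdes.String(), Serdes.String(), metadataSerde))
          .to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));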