KTable 作为带有空键的主题的输入

KTable as input from topic with null keys

我是 Kafka Streams 的新手,遇到了一个问题。

我有两个 tables - 一个用于长期数据 (descriptions),另一个用于实时数据 (live).他们有一个共同的 id.

想法是存储来自 descriptions 的数据(大概在 KTable 中,为每个 id 保留最新的描述)和新消息时出现在 live - 与来自 descriptions 的数据结合在相应的 id 上并进一步发送。

为简单起见,让我们将所有类型设为字符串。

所以我看到的每个教程的基本思想都是这样的:

interface Processor {

        @Input("live")
        KStream<String, String> input();

        @Input("descriptions")
        KTable<String, String> input();

        @Output("output")
        KStream<String, String> output();
    }

然后:

    @StreamListener
    @SendTo("output")
    public KStream<String, String> process(
            @Input("live") KStream<String, String> live,
            @Input("descriptions") KTable<String, String> descriptions) {
        // ...
    }

问题是 descriptions 主题不是 KTable-suitable(空键,只是消息)。

所以我不能将它用作输入,也不能创建任何新的中间主题来存储此 table 中的有效流(基本上是只读的)。

我正在搜索某种内存中的绑定目的地,但无济于事。

我认为可能的方式是创建一个 中间 输出,它只在内存中存储 KTable 或其他东西,然后使用这个 中间 作为 实时 处理中的输入。喜欢:

    @StreamListener("descriptions")
    @SendTo("intermediate")
    public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) {
        // ...
    }

希望这种绑定语义是可能的。

我认为你可以尝试通过引入初始处理器来引入存储 key/value 的中间主题。然后将该流用作常规处理器中输入的 table。这里有一些模板。我正在使用 Spring Cloud Stream 中的新功能模型来编写这些处理器。

@Bean
public Function<KStream<String, String>, KStream<String, String>> processDescriptions() {

        return descriptions -> 
            descriptions.map((key, value) -> {
                Pojo p = parseIntoPojo(value);
                return new KeyValue<>(p.getId(), value);
            })
            .groupByKey()
            .reduce((v1, v2) -> v2)
            .toStream();
}

@Bean
public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> realStream() {

    return (live, description) -> {

    }

}       

第一个处理器接收 description 作为 KStream,然后用密钥丰富它,然后输出为 KStream。现在这个主题既有键又有值,我们可以在下一个处理器中将其用作 KTable。下一个处理器是 java.util.function.BiFunction,它接收两个输入并生成一个输出。输入分别是 KStreamKTable,输出是 KStream

您可以按如下方式在它们上设置目的地:

spring.cloud.stream.function.definition=prorcessDescriptions;realStream

spring.cloud.stream.bindings.processDescriptions-in-0.destinaion=description-topic
spring.cloud.stream.bindings.processDescriptions-out-0.destinaion=description-table-topic

spring.cloud.stream.bindings.realStream-in-0.destinaion=live-topic
spring.cloud.stream.bindings.realStream-in-1.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-out-0.destinaion=output

您也可以使用 StreamListener 方法获得相同的结果。

这种方法的缺点是你需要在 Kafka 中维护一个额外的中间主题,但如果你真的想要它作为一个 KTable 并且底层信息是非键控的,我不认为这里的选项太多了。

如果您不需要顶级描述 KTable,您可以以某种方式将其存储在状态存储中,然后查询将所有存储在单个处理器中。我还没有尝试过,所以你需要尝试一下这个想法。基本上,你会得到两个流,直播和描述

(live, descriptions) -> Reduce key/value for descriptions and keep that in a state store. 
Then, do the processing on live by joining with what is in the state store. 

Kafka Streams 允许多种方式来完成类似的事情。查看他们的参考文档以获取更多信息。

希望这对您有所帮助。