Kafka Streams API: KStream to KTable

I have a Kafka topic to which I send location events (key=user_id, value=user_location). I am able to read and process it as a KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted for clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

This works fine, but I would like a KTable with the last known location of each user. How can I do that?

I could write to an intermediate topic and read it back:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
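Presumably the value serde must also be supplied when reading the topic back, since the table would otherwise fall back to the default serdes; with the old (0.10.x) API I assume the overload would look like this (untested):

// Assumed 0.10.x overload: pass the serdes explicitly so the Location
// values can be deserialized when building the table.
KTable<String, Location> table = builder.table(
        Serdes.String(), new LocationSerde(), "location_topic_aux", "store");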

Is there a simple way to obtain such a KTable directly from the KStream? This is my first application using Kafka Streams, so I am probably missing something obvious.

Update:

In Kafka 2.5, a new method KStream#toTable() will be added that offers a convenient way to transform a KStream into a KTable. For details see: https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
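As a sketch of what this will look like with the current StreamsBuilder API (the Consumed setup and store name below are my assumptions, based on the LocationSerde from the question):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic", Consumed.with(Serdes.String(), new LocationSerde()));

// toTable() keeps the latest value per key; Materialized (optional)
// names the backing state store.
KTable<String, Location> table = locations.toTable(Materialized.as("location-store"));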

Original answer:

At the moment there is no straightforward way to do this. Your approach is absolutely valid, as discussed in the Confluent FAQ: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This is the simplest approach with regard to the code. However, it has the disadvantages that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written to and re-read from Kafka.

There is also an alternative using a "dummy reduce":

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue;
        }
    },
    "dummy-aggregation-store");

This approach is somewhat more complex with regard to the code compared to option 1 but has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.
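For completeness: with the newer DSL, where reduce() takes a Materialized instead of a store-name string, the same dummy reduce collapses to a lambda (same logic as above; the store name is arbitrary):

KTable<String, Long> table = stream
        .groupByKey()
        .reduce(
                (aggValue, newValue) -> newValue, // always keep the newest value per key
                Materialized.as("dummy-aggregation-store"));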

Overall, you need to decide for yourself which approach you prefer:

In option 2, Kafka Streams will create an internal changelog topic to back up the KTable for fault tolerance. Thus, both approaches require some additional storage in Kafka and result in additional network traffic. Overall, it’s a trade-off between slightly more complex code in option 2 versus manual topic management in option 1.