kafka streams - 如何为 KTable 设置新密钥

kafka streams - how to set a new key for KTable

我是 Kafka Streams 的新手,我使用的是 1.0.0 版本。我想根据其中一个值为 KTable 设置一个新键。

使用KStream时,可以通过selectKey()这样的方法来完成。

kstream.selectKey ((k,v) -> v.newKey)

但是KTable中没有这样的方法。唯一的方法是将给定的 KTable 转换为 KStream。对这个问题有什么想法吗?它改变了 KTable 设计的关键?

如果要设置新的key,需要重新分组KTable:

KTable newTable = table.groupBy(/*put select key function here*/)
                       .aggregate(...);

因为键对于 KTable(与 KStream 不同)必须是唯一的,所以需要指定一个聚合函数,将具有相同(新)键的所有记录聚合成一个值。

从 Kafka 2.5 开始,Kafka Streams 也支持 KStream#toTable() 运算符。因此,也可以做到table.toStream().selectKey(...).toTable()。两种方法各有利弊。

使用toTable()的主要缺点是它会根据新键对输入数据进行重新分区,这会导致交错写入重新分区主题,从而导致数据乱序。虽然通过 groupBy() 的第一种方法使用相同的实现,但使用聚合函数可以帮助您解决“冲突”的明确性。如果您使用 toTable() 运算符,则会完成基于重新分区主题偏移顺序的“盲”更新插入(这实际上类似于其他答案中的代码示例)。

示例:

Key | Value
 A  | (a,1)
 B  | (a,2)

如果您在 a 上重新键入,您的输出 table 将是两者之一(但未定义为一个):

Key | Value          Key | Value
 a  | 1               a  |  2

“重新键入”a table 的操作在语义上总是 定义错误。

@Matthias 的回答让我走上了正确的道路,但我认为有一段示例代码可能会有所帮助

final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
        // First, going to set the new key to the user's application id
        (userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
        // Initiate the aggregate value
        () -> null,
        // adder (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user,
        // subtractor (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user
);

分组表聚合()文档: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Materialized-

@Allen Underwood 代码帮助了我,如果键是自定义 Pojo,则必须进行一些更改。当我收到 class 转换异常时。以下代码有效

usersKTable.groupBy((k, v) -> KeyValue.pair(v.getCompositeKey(), v),Grouped.with(compositeKeySerde,valueSerde))
                .aggregate(
                        () -> null,
                        (applicationId, value, aggValue) -> value,
                        (applicationId, value, aggValue) -> value,
                        Materialized.with(compositeKeySerde, valueSerde)
                );

对于使用 confluent 5.5.+ 的用户,有一种方法可以从流中提取密钥并直接转换为 KTable:

       KTable<String, User> userTable = builder
            .stream("topic_name", Consumed.with(userIdSerde, userSerde))
            .selectKey((key, value) -> key.getUserId())             
            .toTable( Materialized.with(stringIdSerde, userSerde));

详情可见here

我认为@Matthias 描述的方式 accurate/detailed 不够。这是正确的,但这种限制的根本原因(也存在于 ksqlDB CREATE TABLE 语法中)不仅仅是键对于 KTable 必须是唯一的事实。

独特性本身并不限制KTables。毕竟,任何基础主题都可以而且经常包含具有相同键的消息。 KTable 对此没有问题。它只会为每个密钥强制执行最新状态。这有多种后果,包括 KTable 从聚合函数构建的事实可以根据单个输入消息将多个消息生成到其输出主题中......但是让我们回到你的问题。

因此,KTable 需要知道特定键的哪条消息是最后一条消息,这意味着它是键的最新状态。

Kafka 有哪些顺序保证?正确,基于每个分区。

重新键入消息时会发生什么情况?正确,它们将分布在与输入消息非常不同的分区中。

因此,具有相同密钥的初始消息已由代理本身正确存储到同一分区中(如果您没有对您的自定义 fancy/stupid 执行任何操作 Partitioner) 这样 KTable 总能推断出最新状态。

但是,如果在运行中的 Kafka Streams 应用程序中重新键入消息会发生什么情况?

它们将再次跨分区传播,但现在使用不同的密钥,如果您的应用程序被扩展并且您有多个并行工作的任务您根本无法保证最后一条消息是新密钥实际上是最后一条消息,因为它存储在原始主题 中。单独的任务没有这样的协调。他们不能。否则效率不高。

因此,如果允许重新键入,KTable 将失去其主要语义。