KStream 和 KTable 之间的时间语义
Time semantics between KStream and KTable
我正在尝试构建以下拓扑:
使用 Debezium 连接器,我正在拉 2 tables(我们称它们为 tables A 和 DA)。根据 DBZ,存储 table 行的主题具有结构 { before: "...", after: "..." }.
我的拓扑中的第一步是从这两个 "table" 主题创建 "clean" KStreams。那里的子拓扑大概是这样的:
private static KStream<String, TABLE_A.Value> getTableARowByIdStream(
StreamsBuilder builder, Properties streamsConfig) {
return builder
.stream("TABLE_A", Consumed.withTimestampExtractor(Application::getRowDate))
.filter((key, envelope) -> [ some filtering condition ] )
.map((key, envelope) -> [ maps to TABLE_A.Value ] )
.through(tableRowByIdTopicName);
}
请注意,我明确指定了记录时间,因为 table 行在最初发布后将被 CDC 编辑 "years"。该函数目前所做的是伪造从 2010-01-01 开始的时间,并使用 AtomicInteger
,为每个消耗的实体增加 1 毫秒。它对 tables A 这样做,但对 DA 不这样做(稍后我会解释原因)。
拓扑的第2阶段是根据"cleaned"话题为tableA搭建1个KTable,像这样:
private static KTable<String, EntityInfoList> getEntityInfoListById(
KStream<String, TABLE_A.Value> tableAByIdStream) {
return tableAByIdStream
.map((key, value) -> [ some mapping ] )
.groupByKey()
.aggregate(() -> [ builds up a EntityInfoList object ] ));
}
- 最后,在 KTable 准备就绪后,我将像这样通过 DA 使用 KStream 加入他们:
private static KStream<String, OutputTopicEntity> getOutputTopicEntityStream(
KStream<String, Table_DA.Value> tableDAStream,
KTable<String, EntityInfoList> tableA_KTable) {
KStream<String, Table_DA>[] branches = tableDAStream.branch(
(key, value) -> [ some logic ],
(key, value) -> true);
KStream<String, OutputTopicEntity> internalAccountRefStream = branches[0]
.join(
tableA_KTable,
(streamValue, tableValue) -> [ some logic to build a list of OutputTopicEntity ])
.flatMap((key, listValue) -> [ some logic to flatten it ]));
[ similar logic with branch[1] ]
}
我的问题是,尽管事实上我是 "faking" 来自 Table_A 主题的记录的时间(我已经验证他们正在使用 kafkacat 引用 2010/01/01) Table_DA(连接的流端)中的条目在今天“2019/08/14”附近有时间戳),Kafka Streams 似乎没有从 Table_DA 中读取任何条目KStream 直到它已将 Table_A 中的所有记录提取到 KTable 中。
因此,我没有得到我期望的所有 "join hits",而且它也是不确定的。我根据这句话的理解是相反的:
For stream-table join, Kafka Stream align record processing ordered based on record timestamps. Thus, the update to the table are aligned with the records of you stream.
到目前为止,我的经验是这不会发生。我还可以很容易地看到我的应用程序如何在消耗完 Table_DA 流中的所有条目(恰好小 10 倍)后继续通过 Table_A 主题方式进行搅动。
我是不是做错了什么?
时间戳同步是 2.1.0 版本之前的最大努力(参见 https://issues.apache.org/jira/browse/KAFKA-3514)。
从 2.1.0 开始,时间戳是严格同步的。但是,如果一个输入没有任何数据,Kafka Streams 将 "enforce" 按照 KIP-353 中的描述进行处理,以避免永远阻塞。如果你有突发输入并且想要 "block" 处理一段时间,如果一个输入没有数据,你可以增加配置参数 max.task.idle.ms
(默认为 0
),如 2.1.0 中引入的那样KIP-353.
我正在尝试构建以下拓扑:
使用 Debezium 连接器,我正在拉 2 tables(我们称它们为 tables A 和 DA)。根据 DBZ,存储 table 行的主题具有结构 { before: "...", after: "..." }.
我的拓扑中的第一步是从这两个 "table" 主题创建 "clean" KStreams。那里的子拓扑大概是这样的:
private static KStream<String, TABLE_A.Value> getTableARowByIdStream(
StreamsBuilder builder, Properties streamsConfig) {
return builder
.stream("TABLE_A", Consumed.withTimestampExtractor(Application::getRowDate))
.filter((key, envelope) -> [ some filtering condition ] )
.map((key, envelope) -> [ maps to TABLE_A.Value ] )
.through(tableRowByIdTopicName);
}
请注意,我明确指定了记录时间,因为 table 行在最初发布后将被 CDC 编辑 "years"。该函数目前所做的是伪造从 2010-01-01 开始的时间,并使用
AtomicInteger
,为每个消耗的实体增加 1 毫秒。它对 tables A 这样做,但对 DA 不这样做(稍后我会解释原因)。拓扑的第2阶段是根据"cleaned"话题为tableA搭建1个KTable,像这样:
private static KTable<String, EntityInfoList> getEntityInfoListById(
KStream<String, TABLE_A.Value> tableAByIdStream) {
return tableAByIdStream
.map((key, value) -> [ some mapping ] )
.groupByKey()
.aggregate(() -> [ builds up a EntityInfoList object ] ));
}
- 最后,在 KTable 准备就绪后,我将像这样通过 DA 使用 KStream 加入他们:
private static KStream<String, OutputTopicEntity> getOutputTopicEntityStream(
KStream<String, Table_DA.Value> tableDAStream,
KTable<String, EntityInfoList> tableA_KTable) {
KStream<String, Table_DA>[] branches = tableDAStream.branch(
(key, value) -> [ some logic ],
(key, value) -> true);
KStream<String, OutputTopicEntity> internalAccountRefStream = branches[0]
.join(
tableA_KTable,
(streamValue, tableValue) -> [ some logic to build a list of OutputTopicEntity ])
.flatMap((key, listValue) -> [ some logic to flatten it ]));
[ similar logic with branch[1] ]
}
我的问题是,尽管事实上我是 "faking" 来自 Table_A 主题的记录的时间(我已经验证他们正在使用 kafkacat 引用 2010/01/01) Table_DA(连接的流端)中的条目在今天“2019/08/14”附近有时间戳),Kafka Streams 似乎没有从 Table_DA 中读取任何条目KStream 直到它已将 Table_A 中的所有记录提取到 KTable 中。
因此,我没有得到我期望的所有 "join hits",而且它也是不确定的。我根据
For stream-table join, Kafka Stream align record processing ordered based on record timestamps. Thus, the update to the table are aligned with the records of you stream.
到目前为止,我的经验是这不会发生。我还可以很容易地看到我的应用程序如何在消耗完 Table_DA 流中的所有条目(恰好小 10 倍)后继续通过 Table_A 主题方式进行搅动。
我是不是做错了什么?
时间戳同步是 2.1.0 版本之前的最大努力(参见 https://issues.apache.org/jira/browse/KAFKA-3514)。
从 2.1.0 开始,时间戳是严格同步的。但是,如果一个输入没有任何数据,Kafka Streams 将 "enforce" 按照 KIP-353 中的描述进行处理,以避免永远阻塞。如果你有突发输入并且想要 "block" 处理一段时间,如果一个输入没有数据,你可以增加配置参数 max.task.idle.ms
(默认为 0
),如 2.1.0 中引入的那样KIP-353.