KTable状态存储无限保留

KTable state store infinite retention

我们有以下高级 DSL 处理拓扑:

TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);

KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");


KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");

KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")

简而言之,我们上面所做的是:

  1. 使用跳跃计算事件 window
  2. 在生成的 KTables 之间进行左连接(由于业务逻辑而左)
  3. 对键和值进行分组和更改:获取键的一个组件(长整型)并移动到值
  4. 将生成的 KTable 聚合到最终的 KTable,聚合对象是从 T 到步骤 1 中连接的两个计数器的映射。请注意,映射的大小不超过 600,通常更少。

想法是创建 windowed 事件计数并使用这些 windowed 键进行连接和聚合操作(在 KTable 的情况下没有 windows这样的操作)

问题是这样的: joinaggregate 操作的状态存储没有保留机制,导致磁盘(RocksDB)space 爆炸。

更具体地说: (跳跃)windows 导致键上的笛卡尔积并且没有删除旧 windows.

的机制

如果 KTable 键没有被 window编辑,而只有足够多的唯一键,也会出现同样的问题

请注意,支持表 1 和表 2 的状态存储没有 space 问题,这是因为 DSL 为它们提供了一个 windowed 存储,它管理删除旧的 windows。 在连接和聚合中,我们将 windowed 键视为 "any old key" 并且 DSL 做同样的事情并使用非 windowed KeyValueStore.

此问题与以下内容相关:KAFKA-4212, KAFKA-4273, confluent forum question

这里有什么误解的概念吗? 是否有使用 DSL 实现此拓扑的简单方法? 如果没有,建议使用低级别 API 实现它的方法是什么?

我想你可以这样做:

StreamsBuilder builder = new StreamBuilder();
KStream<K,V> streams = builder.stream(/* pattern for both streams */);

KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
    /* custom Transformer that set the weaker grouping key right here
       and puts the extracted component into the value before the aggregation;
       additionally (that's why we need a Transformer) get the topic name from
       context object and enrich the value accordingly (ie, third String argument in the output Tuple */);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> = stream.groupByKey.aggregate(
    timeWindow,
    /* initializer: return an empty Map;
       aggregator:
       for each input record, check if Map contains entry for Long key already (ie, extracted component, first argument from input Tuple<Long,V,String>;
         if not, add new map entry with Pair(0,0)
       take the corresponding Pair from the Map and increase one
       counter depending on the original topic that
       is encoded in the input value (ie, Pair.first is counter for first topic and Pair.second is counter for second topic) */);

示例:

假设两个输入流 s1s2 具有以下记录 (<TS,key,value>):

s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

在您的原始程序中,您首先会分别计算两个流(假设翻滚 window 大小为 5)得到(省略 TS):

<W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>  
and
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1>  

左连接后得到(所有记录处理完毕后的结果,省略中间值):

<<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>

现在您使用"weaker key"重新分组,将关键部分提取到值中,并根据提取的关键部分将所有条目放入映射中。假设我们根据 "char" 和 "number" 拆分密钥(即,k1 拆分为 k,因为 smallerKey1 是提取进入值的 Long)。聚合后你得到(我将地图表示为 (k1 -> v1, k2 - v2):

<<W0<k>, (1 -> <2,1>, 2 -> <1,2>> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>

如果这是一个正确的例子(我可能没有理解你的问题描述)。您可以使用 transform/groupBy/aggregate 执行相同的操作,如上所述。输入是:

s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

transform的结果是(包括TS):

<1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
and
<1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>

Note, that Transform actually processes both streams as "one stream" because we used Pattern subscription -- thus, the output is just one stream with interleaving records from both original streams.

您现在应用相同的 window 和聚合结果(TS 省略)——我们通过交替处理每个原始输入流的一条记录来显示结果)作为 inputRecord ==> outputRecord

<1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
<1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>>
<2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
<2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
<3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
<3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
<6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
<11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
<12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>

如果将此结果的每个键的最新记录与上面的结果进行比较,您会发现两者相同。