使用实时窗口聚合更新数据库
Updating database with real-time windowed aggregations
我有一个流媒体管道,它使用流媒体源 (Kafka) 并将写入云SQL 数据库。目标是通过 Key/Value 过去一小时内收到的记录总和聚合来实时更新 CloudSQL 数据库。
例如。在过去的一个小时内收到了 3 条 KV <001,3>
、<001,4>
、<001,2>
的记录,数据库应该有一条记录 001, 9
。不包括超过一小时的记录。
我目前的解决方案是在 KafkaIO.read
:
之后将一个 SlidingWindow 变成一个 GroupByKey
.apply(Window.into(SlidingWindows
.of(Duration.standardSeconds(3600))
.every(Duration.standardSeconds(20)))
).apply(GroupByKey.create())
接着是对每个键求和的 ParDo,然后更新 SQL 数据库。
结果是我的 CloudSQL 数据库每 20 秒更新一次最近一小时的每个键的聚合,这满足了功能要求。问题是 CloudSQL 的 upserts 数量,这导致:大多数 KV 输出与之前的 window 相同,因此每个 window 每 20 秒触发一个小时的交易(~ 500k).
仅当使用具有该键的记录时才触发每个 KV 输出是有意义的,或者避免输出自上次 window 以来未更改的 KV。或者,在 CloudSQL 插入之前的某种过滤器接受所有内容并且只输出更改的 KV。这可能还是有其他解决方案?
一种可能的探索方法是利用滑动 window 聚合下游的状态 API。
然而,流入 this 的元素是无序的,因此您不能只存储元素并将其与传入值进行比较。
- 您需要将 DoFn 中的每个元素添加到 BagState(作为时间戳值)。
- 设置一个计时器,然后在 OnTimer() 函数期间读取 bagstate 中的所有元素,对它们进行排序并输出您需要的值。您还需要将 max(timestamp) 值存储在 ValueState 对象中,以便下次调用 OnTimer 时可以使用它。
用于 State API DoFn 的 window 的大小是任意的,它越大,不需要的 upserts 就越少。不利的一面是,window 越大,您将在 ValueState 中保留的键就越多,这些键可能不再需要。避免使用 Global Window,因为这将要求您构建 GC 函数,因为 window 永远不会过期并且您的密钥 space 将永远增长。
我有一个流媒体管道,它使用流媒体源 (Kafka) 并将写入云SQL 数据库。目标是通过 Key/Value 过去一小时内收到的记录总和聚合来实时更新 CloudSQL 数据库。
例如。在过去的一个小时内收到了 3 条 KV <001,3>
、<001,4>
、<001,2>
的记录,数据库应该有一条记录 001, 9
。不包括超过一小时的记录。
我目前的解决方案是在 KafkaIO.read
:
.apply(Window.into(SlidingWindows
.of(Duration.standardSeconds(3600))
.every(Duration.standardSeconds(20)))
).apply(GroupByKey.create())
接着是对每个键求和的 ParDo,然后更新 SQL 数据库。
结果是我的 CloudSQL 数据库每 20 秒更新一次最近一小时的每个键的聚合,这满足了功能要求。问题是 CloudSQL 的 upserts 数量,这导致:大多数 KV 输出与之前的 window 相同,因此每个 window 每 20 秒触发一个小时的交易(~ 500k).
仅当使用具有该键的记录时才触发每个 KV 输出是有意义的,或者避免输出自上次 window 以来未更改的 KV。或者,在 CloudSQL 插入之前的某种过滤器接受所有内容并且只输出更改的 KV。这可能还是有其他解决方案?
一种可能的探索方法是利用滑动 window 聚合下游的状态 API。
然而,流入 this 的元素是无序的,因此您不能只存储元素并将其与传入值进行比较。
- 您需要将 DoFn 中的每个元素添加到 BagState(作为时间戳值)。
- 设置一个计时器,然后在 OnTimer() 函数期间读取 bagstate 中的所有元素,对它们进行排序并输出您需要的值。您还需要将 max(timestamp) 值存储在 ValueState 对象中,以便下次调用 OnTimer 时可以使用它。
用于 State API DoFn 的 window 的大小是任意的,它越大,不需要的 upserts 就越少。不利的一面是,window 越大,您将在 ValueState 中保留的键就越多,这些键可能不再需要。避免使用 Global Window,因为这将要求您构建 GC 函数,因为 window 永远不会过期并且您的密钥 space 将永远增长。