管理具有大量内存使用的状态 - 从存储中查询

Manage state with huge memory usage - querying from storage

如果这听起来很蠢,我们深表歉意!我们正在使用 flink 进行异步 IO 调用。很多时候,IO 调用是重复的(同一组参数),我们调用的大约 80% 的 API return 对相同的参数有相同的响应。因此,我们希望避免再次拨打电话。我们认为我们可以使用状态来存储以前的响应并再次使用它们。问题在于,虽然我们的响应可以再次使用,但此类响应的数量很大,因此需要大量内存。有没有办法在需要时将其持久化以驱动和查询?

根本不是一个愚蠢的问题!

一些事实揭示了为什么这并不简单:

  1. Flink 状态对单个操作员来说是严格本地的。您无法在其他运算符中访问状态。
  2. Flink 提供了一种可以溢出到磁盘的状态后端,这就是 RocksDB。只有键控状态存储在 RocksDB 中——非键控状态始终存在于堆上。
  3. 异步 i/o 运算符不能用于键控流——它只适用于非键控上下文。
  4. 将迭代(作业图中的循环连接)与 DataStream API 一起使用是一个非常糟糕的主意(因为它会破坏检查点)。

当然,缓存不一定是Flink的managed状态

部分选项:

  • 不要对缓存使用键控状态。您可以使用类似单独的 RocksDB 实例的缓存,并直接在 async i/o 运算符中实现缓存。如果缓存适合内存,我建议使用 Guava。
  • 不要使用异步 i/o。按照@YuvalItzchakov 的建议,在 ProcessFunction 中自行获取和缓存。
  • 您可以改用 Stateful Functions。这是一个新的库 API,它位于 Flink 之上,克服了上面列出的一些限制。
  • 您可以构建如下图所示的内容。这里缓存在 CoProcessFunction 中以键控状态保存。如果缓存未命中,则使用下游异步 i/o 运算符来获取丢失的数据。然后必须使用外部队列将其循环回缓存,例如 Kafka、Kinesis 或 Pulsar。
                    +---------------------+                                       +------+
                    |                     +--results from cache+---------------^--> SINK |
+--requests+------> |  CoProcessFunction  |                                    |  +------+
                    |                     |                                    |
+--cache misses+--> |  cache in RocksDB   |                    +-----------+   |
                    |                     +--side output:      | fetch via +---+-> loop back
     SOURCES        +---------------------+  cache misses+---> | async i/o |       as 2nd input
                                                               +-----------+       to fill cache