管理具有大量内存使用的状态 - 从存储中查询
Manage state with huge memory usage - querying from storage
如果这听起来很蠢,我们深表歉意!我们正在使用 flink 进行异步 IO 调用。很多时候,IO 调用是重复的(同一组参数),我们调用的大约 80% 的 API return 对相同的参数有相同的响应。因此,我们希望避免再次拨打电话。我们认为我们可以使用状态来存储以前的响应并再次使用它们。问题在于,虽然我们的响应可以再次使用,但此类响应的数量很大,因此需要大量内存。有没有办法在需要时将其持久化以驱动和查询?
根本不是一个愚蠢的问题!
一些事实揭示了为什么这并不简单:
- Flink 状态对单个操作员来说是严格本地的。您无法在其他运算符中访问状态。
- Flink 提供了一种可以溢出到磁盘的状态后端,这就是 RocksDB。只有键控状态存储在 RocksDB 中——非键控状态始终存在于堆上。
- 异步 i/o 运算符不能用于键控流——它只适用于非键控上下文。
- 将迭代(作业图中的循环连接)与 DataStream API 一起使用是一个非常糟糕的主意(因为它会破坏检查点)。
当然,缓存不一定是Flink的managed状态
部分选项:
- 不要对缓存使用键控状态。您可以使用类似单独的 RocksDB 实例的缓存,并直接在 async i/o 运算符中实现缓存。如果缓存适合内存,我建议使用 Guava。
- 不要使用异步 i/o。按照@YuvalItzchakov 的建议,在 ProcessFunction 中自行获取和缓存。
- 您可以改用 Stateful Functions。这是一个新的库 API,它位于 Flink 之上,克服了上面列出的一些限制。
- 您可以构建如下图所示的内容。这里缓存在 CoProcessFunction 中以键控状态保存。如果缓存未命中,则使用下游异步 i/o 运算符来获取丢失的数据。然后必须使用外部队列将其循环回缓存,例如 Kafka、Kinesis 或 Pulsar。
+---------------------+ +------+
| +--results from cache+---------------^--> SINK |
+--requests+------> | CoProcessFunction | | +------+
| | |
+--cache misses+--> | cache in RocksDB | +-----------+ |
| +--side output: | fetch via +---+-> loop back
SOURCES +---------------------+ cache misses+---> | async i/o | as 2nd input
+-----------+ to fill cache
如果这听起来很蠢,我们深表歉意!我们正在使用 flink 进行异步 IO 调用。很多时候,IO 调用是重复的(同一组参数),我们调用的大约 80% 的 API return 对相同的参数有相同的响应。因此,我们希望避免再次拨打电话。我们认为我们可以使用状态来存储以前的响应并再次使用它们。问题在于,虽然我们的响应可以再次使用,但此类响应的数量很大,因此需要大量内存。有没有办法在需要时将其持久化以驱动和查询?
根本不是一个愚蠢的问题!
一些事实揭示了为什么这并不简单:
- Flink 状态对单个操作员来说是严格本地的。您无法在其他运算符中访问状态。
- Flink 提供了一种可以溢出到磁盘的状态后端,这就是 RocksDB。只有键控状态存储在 RocksDB 中——非键控状态始终存在于堆上。
- 异步 i/o 运算符不能用于键控流——它只适用于非键控上下文。
- 将迭代(作业图中的循环连接)与 DataStream API 一起使用是一个非常糟糕的主意(因为它会破坏检查点)。
当然,缓存不一定是Flink的managed状态
部分选项:
- 不要对缓存使用键控状态。您可以使用类似单独的 RocksDB 实例的缓存,并直接在 async i/o 运算符中实现缓存。如果缓存适合内存,我建议使用 Guava。
- 不要使用异步 i/o。按照@YuvalItzchakov 的建议,在 ProcessFunction 中自行获取和缓存。
- 您可以改用 Stateful Functions。这是一个新的库 API,它位于 Flink 之上,克服了上面列出的一些限制。
- 您可以构建如下图所示的内容。这里缓存在 CoProcessFunction 中以键控状态保存。如果缓存未命中,则使用下游异步 i/o 运算符来获取丢失的数据。然后必须使用外部队列将其循环回缓存,例如 Kafka、Kinesis 或 Pulsar。
+---------------------+ +------+
| +--results from cache+---------------^--> SINK |
+--requests+------> | CoProcessFunction | | +------+
| | |
+--cache misses+--> | cache in RocksDB | +-----------+ |
| +--side output: | fetch via +---+-> loop back
SOURCES +---------------------+ cache misses+---> | async i/o | as 2nd input
+-----------+ to fill cache