集合的 Flink 状态 TTL 过期

Flink State TTL expiration for Collections

感谢您抽空阅读本文,想请教各位高手关于Flink 1.8.0中的Flink state TTL特性,看完了 这个,对我来说还是很模糊。

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl

我想确定TTL 功能在哪里启用,是在键字段还是在值字段。特别是,假设我有一个像这样的 mapState 结构:

mapState = Map[String,List[String]]
e.g. val mapState = Map("haha" -> List("foo","bar")) in Scala
where "haha" is the key of the mapState and List("foo","bar") is the value

如果我通过 StateTtlConfig 在 mapState 上设置 1 分钟的 TTL,然后立即(少于 1 分钟)写入列表中的值之一,比如 "foo"。

然后 1 分钟后,当 TTL 触发时,键 "haha" 过期还是值 "bar" 过期?

换句话说,如果它在键上过期,我的理解是 mapState 将保持不变

mapState = Map("haha" -> List("foo","bar"))

因为写入值 "foo" 会重置键上的 TTL,因此整个 mapState 保持不变

另一种情况是,如果它在值上过期,则 mapState 将变为

mapState = Map("haha" -> List("foo"))

因为值 "bar" 将在 1 分钟后过期而不被访问。

希望我把问题说清楚了,在此先感谢您提供任何形式的帮助。

使用 TTL 访问状态的代码如下所示:

<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getter.get();
        if (ttlValue == null) {
            return null;
        } else if (expired(ttlValue)) {
            stateClear.run();
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            updater.accept(rewrapWithNewTs(ttlValue));
        }
        return ttlValue;
    }

验证值是否过期的方法如下所示:

    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
        return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
    }

这基本上意味着在这种情况下它将检查整个列表而不是单独元素的 TTL。因此,根据 StateTtlConfig 整个列表将过期或整个列表不会过期。

可用的 TTL 配置是 OnReadAndWriteOnCreateAndWrite。所以,基本上为了保持一致,如果您想更新值列表,您需要在 MapState 上使用 put()