集合的 Flink 状态 TTL 过期
Flink State TTL expiration for Collections
感谢您抽空阅读本文,想请教各位高手关于Flink 1.8.0中的Flink state TTL特性,看完了
这个,对我来说还是很模糊。
我想确定TTL 功能在哪里启用,是在键字段还是在值字段。特别是,假设我有一个像这样的 mapState 结构:
mapState = Map[String,List[String]]
e.g. val mapState = Map("haha" -> List("foo","bar")) in Scala
where "haha" is the key of the mapState and List("foo","bar") is the value
如果我通过 StateTtlConfig 在 mapState 上设置 1 分钟的 TTL,然后立即(少于 1 分钟)写入列表中的值之一,比如 "foo"。
然后 1 分钟后,当 TTL 触发时,键 "haha" 过期还是值 "bar" 过期?
换句话说,如果它在键上过期,我的理解是 mapState 将保持不变
mapState = Map("haha" -> List("foo","bar"))
因为写入值 "foo" 会重置键上的 TTL,因此整个 mapState 保持不变
另一种情况是,如果它在值上过期,则 mapState 将变为
mapState = Map("haha" -> List("foo"))
因为值 "bar" 将在 1 分钟后过期而不被访问。
希望我把问题说清楚了,在此先感谢您提供任何形式的帮助。
使用 TTL 访问状态的代码如下所示:
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
SupplierWithException<TtlValue<V>, SE> getter,
ThrowingConsumer<TtlValue<V>, CE> updater,
ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
TtlValue<V> ttlValue = getter.get();
if (ttlValue == null) {
return null;
} else if (expired(ttlValue)) {
stateClear.run();
if (!returnExpired) {
return null;
}
} else if (updateTsOnRead) {
updater.accept(rewrapWithNewTs(ttlValue));
}
return ttlValue;
}
验证值是否过期的方法如下所示:
static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
}
这基本上意味着在这种情况下它将检查整个列表而不是单独元素的 TTL。因此,根据 StateTtlConfig
整个列表将过期或整个列表不会过期。
可用的 TTL 配置是 OnReadAndWrite
和 OnCreateAndWrite
。所以,基本上为了保持一致,如果您想更新值列表,您需要在 MapState
上使用 put()
。
感谢您抽空阅读本文,想请教各位高手关于Flink 1.8.0中的Flink state TTL特性,看完了 这个,对我来说还是很模糊。
我想确定TTL 功能在哪里启用,是在键字段还是在值字段。特别是,假设我有一个像这样的 mapState 结构:
mapState = Map[String,List[String]]
e.g. val mapState = Map("haha" -> List("foo","bar")) in Scala
where "haha" is the key of the mapState and List("foo","bar") is the value
如果我通过 StateTtlConfig 在 mapState 上设置 1 分钟的 TTL,然后立即(少于 1 分钟)写入列表中的值之一,比如 "foo"。
然后 1 分钟后,当 TTL 触发时,键 "haha" 过期还是值 "bar" 过期?
换句话说,如果它在键上过期,我的理解是 mapState 将保持不变
mapState = Map("haha" -> List("foo","bar"))
因为写入值 "foo" 会重置键上的 TTL,因此整个 mapState 保持不变
另一种情况是,如果它在值上过期,则 mapState 将变为
mapState = Map("haha" -> List("foo"))
因为值 "bar" 将在 1 分钟后过期而不被访问。
希望我把问题说清楚了,在此先感谢您提供任何形式的帮助。
使用 TTL 访问状态的代码如下所示:
<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
SupplierWithException<TtlValue<V>, SE> getter,
ThrowingConsumer<TtlValue<V>, CE> updater,
ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
TtlValue<V> ttlValue = getter.get();
if (ttlValue == null) {
return null;
} else if (expired(ttlValue)) {
stateClear.run();
if (!returnExpired) {
return null;
}
} else if (updateTsOnRead) {
updater.accept(rewrapWithNewTs(ttlValue));
}
return ttlValue;
}
验证值是否过期的方法如下所示:
static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
}
这基本上意味着在这种情况下它将检查整个列表而不是单独元素的 TTL。因此,根据 StateTtlConfig
整个列表将过期或整个列表不会过期。
可用的 TTL 配置是 OnReadAndWrite
和 OnCreateAndWrite
。所以,基本上为了保持一致,如果您想更新值列表,您需要在 MapState
上使用 put()
。