重新运行后 Flink 状态为空(重新初始化)
Flink state empty (reinitialized) after rerun
我正在尝试连接两个流,首先是坚持 MapValueState
:
RocksDB
将数据保存在检查点文件夹中,但新建运行 后,state
为空。我 运行 它在本地和 flink 集群中取消在集群中提交并简单地在本地重新 运行
env.setStateBackend(new RocksDBStateBackend(..)
env.enableCheckpointing(1000)
...
val productDescriptionStream: KeyedStream[ProductDescription, String] = env.addSource(..)
.keyBy(_.id)
val productStockStream: KeyedStream[ProductStock, String] = env.addSource(..)
.keyBy(_.id)
和
productDescriptionStream
.connect(productStockStream)
.process(ProductProcessor())
.setParallelism(1)
env.execute("Product aggregator")
产品处理器
case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product]{
private[this] lazy val stateDescriptor: MapStateDescriptor[String, ProductDescription] =
new MapStateDescriptor[String, ProductDescription](
"productDescription",
createTypeInformation[String],
createTypeInformation[ProductDescription]
)
private[this] lazy val states: MapState[String, ProductDescription] = getRuntimeContext.getMapState(stateDescriptor)
override def processElement1(value: ProductDescription,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,out: Collector[Product]
): Unit = {
states.put(value.id, value)
}}
override def processElement2(value: ProductStock,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]
): Unit = {
if (states.contains(value.id)) {
val product =Product(
id = value.id,
description = Some(states.get(value.id).description),
stock = Some(value.stock),
updatedAt = value.updatedAt)
out.collect(product )
}}
检查点是由 Flink 创建的,用于从故障中恢复,而不是用于在手动关闭后恢复。当作业被取消时,Flink 的默认行为是删除检查点。由于作业不会再失败,因此不需要恢复。
您有多种选择:
(1) 在取消作业时将检查点配置为 retain checkpoints:
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
然后当您重新启动作业时,您需要指明您希望它从特定检查点重新启动:
flink run -s <checkpoint-path> ...
否则,无论何时开始作业,它都会以空状态后端开始。
(2) 不要取消作业,而是使用 stop with savepoint:
flink stop [-p targetDirectory] [-d] <jobID>
之后您将再次需要使用 flink run -s ...
从保存点恢复。
使用保存点停止是比依赖最近的检查点回退到更干净的方法。
(3) 或者您可以使用 Ververica Platform Community Edition,这将抽象级别提高到您不必自己管理这些细节的程度。
我正在尝试连接两个流,首先是坚持 MapValueState
:
RocksDB
将数据保存在检查点文件夹中,但新建运行 后,state
为空。我 运行 它在本地和 flink 集群中取消在集群中提交并简单地在本地重新 运行
env.setStateBackend(new RocksDBStateBackend(..)
env.enableCheckpointing(1000)
...
val productDescriptionStream: KeyedStream[ProductDescription, String] = env.addSource(..)
.keyBy(_.id)
val productStockStream: KeyedStream[ProductStock, String] = env.addSource(..)
.keyBy(_.id)
和
productDescriptionStream
.connect(productStockStream)
.process(ProductProcessor())
.setParallelism(1)
env.execute("Product aggregator")
产品处理器
case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product]{
private[this] lazy val stateDescriptor: MapStateDescriptor[String, ProductDescription] =
new MapStateDescriptor[String, ProductDescription](
"productDescription",
createTypeInformation[String],
createTypeInformation[ProductDescription]
)
private[this] lazy val states: MapState[String, ProductDescription] = getRuntimeContext.getMapState(stateDescriptor)
override def processElement1(value: ProductDescription,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,out: Collector[Product]
): Unit = {
states.put(value.id, value)
}}
override def processElement2(value: ProductStock,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]
): Unit = {
if (states.contains(value.id)) {
val product =Product(
id = value.id,
description = Some(states.get(value.id).description),
stock = Some(value.stock),
updatedAt = value.updatedAt)
out.collect(product )
}}
检查点是由 Flink 创建的,用于从故障中恢复,而不是用于在手动关闭后恢复。当作业被取消时,Flink 的默认行为是删除检查点。由于作业不会再失败,因此不需要恢复。
您有多种选择:
(1) 在取消作业时将检查点配置为 retain checkpoints:
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
然后当您重新启动作业时,您需要指明您希望它从特定检查点重新启动:
flink run -s <checkpoint-path> ...
否则,无论何时开始作业,它都会以空状态后端开始。
(2) 不要取消作业,而是使用 stop with savepoint:
flink stop [-p targetDirectory] [-d] <jobID>
之后您将再次需要使用 flink run -s ...
从保存点恢复。
使用保存点停止是比依赖最近的检查点回退到更干净的方法。
(3) 或者您可以使用 Ververica Platform Community Edition,这将抽象级别提高到您不必自己管理这些细节的程度。