flink什么时候发起RocksDBKeyedStateBackend目录删除?

When does flink initiates RocksDBKeyedStateBackend directory delete?

我的 Flink 作业运行了几天没有任何问题,但几天后它会杀死 tm 并重新启动整个作业。 在日志中我发现了这个, org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 删除现有实例基目录 /tmp/flink-io-4b455efa-bcde-4ef2-aed3-c66ca9d8933e/job_152b986e7e5a6f411780849f13ce4bc8_op_KeyedProcessOperator_a1c286a47e97622aa92a8f6cd4115854__1_4__uuid_4b53ff24-e240-48d6-b438-3ab2d05cbdb8

删除 statestore 文件后,它会抛出此错误,因为我正在从那里的 statestore 获取数据。

java.lang.NullPointerException
    at c.c.w.d.s.b.aggregator.StateProcessFunction.addEvent(StateProcessFunction.java:81)
    at c.cs.w.d.s.b.a.StateProcessFunction.processElement(StateProcessFunction.java:113)
    at c.c.w.d.s.b.a.StateProcessFunction.processElement(ContactStateProcessFunction.java:26)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

问题是何时以及为何启动 flink 后端 statestore 文件删除?日志中是否有未打印的内容?

在更新的 Flink 版本中,该消息已被改写为更清晰:

Closed RocksDB State Backend. Cleaning up RocksDB working directory {}.

发生在RocksDBKeyedStateBackend.dispose(),Flink 关闭时调用。 Flink 仅依赖于 RocksDB 的瞬态。从故障重启后,新的 RocksDB 实例从最近的检查点创建。 (尽管如果您使用本地恢复,情况会稍微复杂一些。)

您使用的是什么版本的 Flink?增量检查点还是完整检查点?本地恢复?也许您遇到了一个错误,此错误已被修复。

发生这种情况是因为并发。 mapstate.get() 抛出异常,有时当 rocksdbbackend 异步线程删除过期数据时它会锁定文件,同时如果你尝试获取它会抛出异常导致 tm 重新启动。