何时(如果有的话)在失败的情况下修改流式查询的检查点元数据?

When (if ever) to modify checkpointed metadata of streaming query in case of failure?

我对 Spark 检查点有疑问。我有 spark 流应用程序,我正在使用以下方法管理 Checkpoint n HDFS :-

 val checkpointDirectory = "hdfs://192.168.0.1:8020/markingChecksPoints"
  df.writeStream
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          batchDF
            .write
            .cassandraFormat(
              "table",
              "keyspace",
              "clustername"
            )
            .mode(SaveMode.Append)
            .save()
        }
        .outputMode(OutputMode.Append())
        .option("checkpointLocation", checkpointDirectory)
    }

现在当我 运行 应用程序时,在检查点目录中我有 4 个文件夹:

在偏移量文件夹中-我有每个消耗的偏移量的文件,就像这样

v1
{"batchWatermarkMs":0,"batchTimestampMs":1574680172097,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"30"}}
{"datatopicname":{"23":441210,"8":3094007,"17":44862,"26":0,"11":4302147,"29":0,"2":3758094,"20":6273,"5":4620156,"14":15375428,"4":4511998,"13":10652363,"22":1247616,"7":1787900,"16":1239584,"25":0,"10":3441724,"1":1808759,"28":0,"19":4123,"27":0,"9":3293762,"18":68,"12":4439364,"3":5910468,"21":182,"15":13510271,"6":2510314,"24":0,"0":40337}}

所以,现在我的查询是在失败或任何其他情况下如何修改我的目录,以便在应用程序重新启动时它应该从那个特定点开始?

我知道每当我们重新启动应用程序时,它都会自动从检查点中选择合适的检查点,但以防万一我想从任何特定值或更改启动它。那我该怎么办呢?

checkpointLocation 目录中可能有更多文件(如有状态运算符的 state),但它们的作用正是您所要求的 - 以防流处理引擎出现故障Spark Structured Streaming 将根据检查点元数据恢复流式查询。

由于这些文件是内部文件,因此不建议以任何方式修改这些文件。

不过您可以(这也许是它们易于阅读的原因之一)。无论您是更改现有的偏移文件还是从头开始创建它们都无关紧要。引擎看不出有什么区别。如果文件格式正确,它们将被使用。否则,检查点位置将没有用。

我宁愿使用特定于数据源的选项,例如startingOffsets kafka 数据源从特定偏移量(重新)开始。