何时(如果有的话)在失败的情况下修改流式查询的检查点元数据?
When (if ever) to modify checkpointed metadata of streaming query in case of failure?
我对 Spark 检查点有疑问。我有 spark 流应用程序,我正在使用以下方法管理 Checkpoint n HDFS :-
val checkpointDirectory = "hdfs://192.168.0.1:8020/markingChecksPoints"
df.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF
.write
.cassandraFormat(
"table",
"keyspace",
"clustername"
)
.mode(SaveMode.Append)
.save()
}
.outputMode(OutputMode.Append())
.option("checkpointLocation", checkpointDirectory)
}
现在当我 运行 应用程序时,在检查点目录中我有 4 个文件夹:
commits
offsets
metadata
sources
在偏移量文件夹中-我有每个消耗的偏移量的文件,就像这样
v1
{"batchWatermarkMs":0,"batchTimestampMs":1574680172097,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"30"}}
{"datatopicname":{"23":441210,"8":3094007,"17":44862,"26":0,"11":4302147,"29":0,"2":3758094,"20":6273,"5":4620156,"14":15375428,"4":4511998,"13":10652363,"22":1247616,"7":1787900,"16":1239584,"25":0,"10":3441724,"1":1808759,"28":0,"19":4123,"27":0,"9":3293762,"18":68,"12":4439364,"3":5910468,"21":182,"15":13510271,"6":2510314,"24":0,"0":40337}}
所以,现在我的查询是在失败或任何其他情况下如何修改我的目录,以便在应用程序重新启动时它应该从那个特定点开始?
我知道每当我们重新启动应用程序时,它都会自动从检查点中选择合适的检查点,但以防万一我想从任何特定值或更改启动它。那我该怎么办呢?
- 我可以简单地编辑这个 "offsets" 最后创建的文件吗?
- 首先删除检查点目录并使用自定义检查点重新启动应用程序运行,以便从那以后创建新的检查点目录。
checkpointLocation
目录中可能有更多文件(如有状态运算符的 state
),但它们的作用正是您所要求的 - 以防流处理引擎出现故障Spark Structured Streaming 将根据检查点元数据恢复流式查询。
由于这些文件是内部文件,因此不建议以任何方式修改这些文件。
不过您可以(这也许是它们易于阅读的原因之一)。无论您是更改现有的偏移文件还是从头开始创建它们都无关紧要。引擎看不出有什么区别。如果文件格式正确,它们将被使用。否则,检查点位置将没有用。
我宁愿使用特定于数据源的选项,例如startingOffsets
kafka
数据源从特定偏移量(重新)开始。
我对 Spark 检查点有疑问。我有 spark 流应用程序,我正在使用以下方法管理 Checkpoint n HDFS :-
val checkpointDirectory = "hdfs://192.168.0.1:8020/markingChecksPoints"
df.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF
.write
.cassandraFormat(
"table",
"keyspace",
"clustername"
)
.mode(SaveMode.Append)
.save()
}
.outputMode(OutputMode.Append())
.option("checkpointLocation", checkpointDirectory)
}
现在当我 运行 应用程序时,在检查点目录中我有 4 个文件夹:
commits
offsets
metadata
sources
在偏移量文件夹中-我有每个消耗的偏移量的文件,就像这样
v1
{"batchWatermarkMs":0,"batchTimestampMs":1574680172097,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"30"}}
{"datatopicname":{"23":441210,"8":3094007,"17":44862,"26":0,"11":4302147,"29":0,"2":3758094,"20":6273,"5":4620156,"14":15375428,"4":4511998,"13":10652363,"22":1247616,"7":1787900,"16":1239584,"25":0,"10":3441724,"1":1808759,"28":0,"19":4123,"27":0,"9":3293762,"18":68,"12":4439364,"3":5910468,"21":182,"15":13510271,"6":2510314,"24":0,"0":40337}}
所以,现在我的查询是在失败或任何其他情况下如何修改我的目录,以便在应用程序重新启动时它应该从那个特定点开始?
我知道每当我们重新启动应用程序时,它都会自动从检查点中选择合适的检查点,但以防万一我想从任何特定值或更改启动它。那我该怎么办呢?
- 我可以简单地编辑这个 "offsets" 最后创建的文件吗?
- 首先删除检查点目录并使用自定义检查点重新启动应用程序运行,以便从那以后创建新的检查点目录。
checkpointLocation
目录中可能有更多文件(如有状态运算符的 state
),但它们的作用正是您所要求的 - 以防流处理引擎出现故障Spark Structured Streaming 将根据检查点元数据恢复流式查询。
由于这些文件是内部文件,因此不建议以任何方式修改这些文件。
不过您可以(这也许是它们易于阅读的原因之一)。无论您是更改现有的偏移文件还是从头开始创建它们都无关紧要。引擎看不出有什么区别。如果文件格式正确,它们将被使用。否则,检查点位置将没有用。
我宁愿使用特定于数据源的选项,例如startingOffsets
kafka
数据源从特定偏移量(重新)开始。