此查询不支持从检查点位置恢复。删除 checkpoint/testmemeory/offsets 重新开始
This query does not support recovering from checkpoint location. Delete checkpoint/testmemeory/offsets to start over
我已经在 Spark 中创建了 In-Memory 表,并尝试在失败后重新启动 Spark 结构化流作业。它得到 "This query does not support recovering from checkpoint location. Delete checkpoint/TEST_IN_MEMORY/offsets to start over."
Checkpoint In-Memory sink 是什么概念?有什么办法可以纠正吗? (我们可以动态删除旧的和新的检查点吗?)
我正在使用 Data Stax 5.1.6 集群,所以我没有选择,我只能使用 Spark 2.0.2 版本。
val kafkaDataFrame_inmemory = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "Localhost:9092")
.option("subscribe", "TEST_IN_MOEMORY")
.option("startingOffsets", "earliest")
.load()
val checkpoint = "C/Users/756661/Desktop/KT DOCS/spark/In_MEM_TABLE"+ UUID.randomUUID.toString
kafkaDataFrame_inmemory
.writeStream
.format("memory")
.option("truncate", false)
.queryName("IN_MEM_TABLE")
.outputMode("update")
.option("checkpointLocation",checkpoint)
.start()
您只需从代码中删除行 .option("checkpointLocation",checkpoint)
并重新开始。
根据错误消息,内存数据源不支持从检查点位置恢复。任何尝试(重新)启动使用 memory
格式且目录已存在的流式查询都将失败。
org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete xxx/offsets to start over.;
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:267)
... 49 elided
并不是说内存数据源不会使用检查点目录。会的,但是会是随机生成的名字。
can we delete the old and the new checkpoint dynamically?
当然,这是启动流式查询的唯一方法。
我已经在 Spark 中创建了 In-Memory 表,并尝试在失败后重新启动 Spark 结构化流作业。它得到 "This query does not support recovering from checkpoint location. Delete checkpoint/TEST_IN_MEMORY/offsets to start over."
Checkpoint In-Memory sink 是什么概念?有什么办法可以纠正吗? (我们可以动态删除旧的和新的检查点吗?)
我正在使用 Data Stax 5.1.6 集群,所以我没有选择,我只能使用 Spark 2.0.2 版本。
val kafkaDataFrame_inmemory = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "Localhost:9092")
.option("subscribe", "TEST_IN_MOEMORY")
.option("startingOffsets", "earliest")
.load()
val checkpoint = "C/Users/756661/Desktop/KT DOCS/spark/In_MEM_TABLE"+ UUID.randomUUID.toString
kafkaDataFrame_inmemory
.writeStream
.format("memory")
.option("truncate", false)
.queryName("IN_MEM_TABLE")
.outputMode("update")
.option("checkpointLocation",checkpoint)
.start()
您只需从代码中删除行 .option("checkpointLocation",checkpoint)
并重新开始。
根据错误消息,内存数据源不支持从检查点位置恢复。任何尝试(重新)启动使用 memory
格式且目录已存在的流式查询都将失败。
org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete xxx/offsets to start over.;
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:267)
... 49 elided
并不是说内存数据源不会使用检查点目录。会的,但是会是随机生成的名字。
can we delete the old and the new checkpoint dynamically?
当然,这是启动流式查询的唯一方法。