如何从检查点数据重新启动 pyspark 流式查询?

How to restart pyspark streaming query from checkpoint data?

我正在使用 pyspark 2.2.0 创建一个 spark 流应用程序

我可以创建流式查询

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
      .builder \
      .appName("StreamingApp") \
      .getOrCreate()

staticDataFrame = spark.read.format("parquet")\
.option("inferSchema","true").load("processed/Nov18/")

staticSchema = staticDataFrame.schema
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("parquet")\
.load("processed/Nov18/")

daily_trs=streamingDataFrame.select("shift","date","time")
.groupBy("date","shift")\
.count("shift")

writer = df.writeStream\
   .format("parquet")\
   .option("path","data")\
   .option("checkpointLocation","data/checkpoints")\
   .queryName("streamingData")\
   .outputMode("append")

query = writer.start()
query.awaitTermination()

查询正在流式传输,任何附加到 "processed/Nov18" 的文件都将被处理并存储到 "data/"

如果流式传输失败我想重新启动相同的查询

解决路径

  1. 根据官方文档,我可以获得一个可用于重新启动查询的id https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html?highlight=streamingquery#pyspark.sql.streaming.StreamingQuery.id

  2. pyspark.streaming 模块包含具有 class 方法

    的 StreamingContext class

    class方法 getActiveOrCreate(checkpointPath, setupFunc) https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.getOrCreate

这些方法可以用在什么地方吗?

如果有人有任何生产就绪流媒体应用程序的用例可供参考?

您应该使用可用的检查点目录简单地(重新)启动 pyspark 应用程序,其余的由 Spark Structured Streaming 完成。无需更改。

If anyone has any use case of production ready streaming app for reference ?

我会在 Spark users mailing list 上问。