如何使用 Spark Structured Streaming 持续监控目录

How to continuously monitor a directory by using Spark Structured Streaming

我希望 spark 持续监视目录并在文件出现在该目录中时使用 spark.readStream 读取 CSV 文件。

请不要包含 Spark Streaming 的解决方案。我正在寻找一种通过使用 spark 结构化流来实现它的方法。

正如官方所写 documentation 你应该使用 "file" 来源:

File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.

取自文档的代码示例:

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

如果不指定trigger,Spark会尽快读取新文件

这是此用例的完整解决方案:

如果您 运行 处于单机模式。您可以增加驱动程序内存:

bin/spark-shell --driver-memory 4G

无需设置执行器内存,因为在独立模式下执行器在驱动程序中运行。

作为完成@T.Gaweda的解决方案,找到以下解决方案:

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

csvDf.writeStream.format("console").option("truncate","false").start()

现在 spark 将持续监控指定的目录,只要您在该目录中添加任何 csv 文件,您的 DataFrame 操作 "csvDF" 就会对该文件执行。

注意:如果你想让spark推断schema你必须先设置如下配置:

spark.sqlContext.setConf("spark.sql.streaming.schemaInferenc‌​e","true")

spark 是您的 spark 会话。