从 HDFS 源流式传输时如何 运行 多个批次?
How to run multiple batches when streaming from HDFS source?
我有一个这样的数据集val df = spark.readStream.schema(s).parquet ("/path/to/file").where("Foo > 0").groupBy("bar").agg(expr("sum(Foo)"))
。数据集有超过 100 万条记录,Parquet 文件包含 1 个分区。
我从 df.writeStream.outputMode("update").format("console").start
开始直播。
然后 Spark 一次处理整个文件。但是我希望 Spark 在更新结果的同时 "splits" 文件和一次处理每个拆分的方式,就像我输入新单词时更新结果的单词计数示例一样。
我尝试添加 trigger(Trigger.ProcessingTime("x seconds"))
但没有成功。
Then Spark processes the entire file at once. But I expect that Spark some how "splits" the file and processes each split at a time while updating the result, just like the words count example updating result when I input a new word.
这就是 Spark Structured Streaming 处理文件的方式。它会立即处理它们,并且再也不会考虑它们。它 "split" 将文件分成几部分(好吧,那应该在存储的手中,例如 HDFS,实际上不是 Spark 本身),但它是在幕后这样做的。
请注意,一旦文件被处理,该文件将永远不会被再次处理。
I tried adding trigger(Trigger.ProcessingTime("x seconds"))
but it didn't work.
好吧,它做到了,但不是你想要的。
DataStreamWriter.trigger sets the trigger for the stream query. The default value is ProcessingTime(0) and it will run the query as fast as possible.
查阅 DataStreamWriter 的 scaladoc。
我有一个这样的数据集val df = spark.readStream.schema(s).parquet ("/path/to/file").where("Foo > 0").groupBy("bar").agg(expr("sum(Foo)"))
。数据集有超过 100 万条记录,Parquet 文件包含 1 个分区。
我从 df.writeStream.outputMode("update").format("console").start
开始直播。
然后 Spark 一次处理整个文件。但是我希望 Spark 在更新结果的同时 "splits" 文件和一次处理每个拆分的方式,就像我输入新单词时更新结果的单词计数示例一样。
我尝试添加 trigger(Trigger.ProcessingTime("x seconds"))
但没有成功。
Then Spark processes the entire file at once. But I expect that Spark some how "splits" the file and processes each split at a time while updating the result, just like the words count example updating result when I input a new word.
这就是 Spark Structured Streaming 处理文件的方式。它会立即处理它们,并且再也不会考虑它们。它 "split" 将文件分成几部分(好吧,那应该在存储的手中,例如 HDFS,实际上不是 Spark 本身),但它是在幕后这样做的。
请注意,一旦文件被处理,该文件将永远不会被再次处理。
I tried adding
trigger(Trigger.ProcessingTime("x seconds"))
but it didn't work.
好吧,它做到了,但不是你想要的。
DataStreamWriter.trigger sets the trigger for the stream query. The default value is ProcessingTime(0) and it will run the query as fast as possible.
查阅 DataStreamWriter 的 scaladoc。