为什么 foreachPartition 对于流式数据集会出错?

Why does foreachPartition error out for streaming datasets?

我正在从 Spark Streaming 迁移到 Structured Streaming,我遇到了以下代码的问题:

def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = {
    inputDataset.foreachPartition { partitionIterator =>
      val filteredIterator = partitionIterator.filter(foobar.filter)
      ...
      ...
    }
}       
val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("console")
  .start

错误如下 AnalysisException:

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

foreachPartition 不支持流式查询吗?在这种情况下,writeStream.foreach 是实现 foreachPartition 的唯一方法吗?

我想避免在每个事件到来时发送它,而是累积所有行,形成一个巨大的 POST 请求正文并将其发送到 HTTP 端点。因此,如果一个批处理中有 1000 个事件和 5 个分区,则在每个请求正文中并行生成 5 个请求和 200 个事件。

TL;DR 是的。 foreachPartition 操作不受支持,您应该改用 ForeachWriter

引用 foreachPartition 的 scaladoc:

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit Applies a function f to each partition of this Dataset.

您现在可能已经发现,foreach 是一个动作,因此会触发 Spark 执行。

由于您使用流数据集,因此不允许使用 foreach.

等“传统”方法触发它们的执行

引用结构化流的 Unsupported Operations:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

流媒体替代方案之一是 foreach 运算符(又名接收器)。这就是 foreachPartition 在 Structured Streaming 中的做法。

引用 Using Foreach:

The foreach operation allows arbitrary operations to be computed on the output data.

To use this, you will have to implement the interface ForeachWriter, which has methods that get called whenever there is a sequence of rows generated as output after a trigger.


I'd like to avoid sending each event as it comes, but rather accumulate all rows, form one giant POST request body and send it to a HTTP endpoint. So if 1000 events in a batch and 5 partitions, generate 5 requests in parallel with 200 events in each request body.

这看起来像是将数据集写入接收器之前的聚合,不是吗?使用 groupBy 运算符和 collect_list 函数对行进行分组,因此当您 writeStream 时,您将拥有任意数量的组。

除非别无他法,否则我宁愿避免处理称为分区的 RDD 的这种低级特性作为优化写入的一种方式。