Why does foreachPartition error out for streaming datasets?
I am migrating from Spark Streaming to Structured Streaming and I am running into a problem with the following code:
def processDataSet(inputDataset: Dataset[MyMessage], foobar: FooBar) = {
  inputDataset.foreachPartition { partitionIterator =>
    val filteredIterator = partitionIterator.filter(foobar.filter)
    ...
    ...
  }
}
val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("console")
  .start
It fails with the following AnalysisException:
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Is foreachPartition not supported for streaming queries? If so, is writeStream.foreach the only way to implement foreachPartition here?
I'd like to avoid sending each event as it comes and instead accumulate all rows, form one giant POST request body and send it to an HTTP endpoint. So with 1000 events in a batch and 5 partitions, that would be 5 requests in parallel with 200 events in each request body.
TL;DR Yes. The foreachPartition action is not supported on streaming Datasets, and you should use ForeachWriter instead.
Quoting the scaladoc of foreachPartition:
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
Applies a function f to each partition of this Dataset.
As you may have found out by now, foreach is an action and therefore triggers Spark execution.
Since you work with streaming Datasets, triggering their execution with "traditional" actions like foreach is not allowed.
Quoting Structured Streaming's Unsupported Operations:
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
One of the streaming alternatives is the foreach operator (a.k.a. the foreach sink). That is how you do foreachPartition in Structured Streaming.
Quoting Using Foreach:
The foreach operation allows arbitrary operations to be computed on the output data.
To use this, you will have to implement the interface ForeachWriter, which has methods that get called whenever there is a sequence of rows generated as output after a trigger.
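To make that concrete, here is a minimal sketch of such a sink, assuming the Spark 2.x ForeachWriter API. The HttpBatchWriter name, the endpoint URL and the sendPost helper are made up for illustration; the per-partition buffering mirrors what the question asks for.

import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.streaming.ProcessingTime
import scala.collection.mutable.ArrayBuffer

// Hypothetical sink: buffer every row of a partition and send one POST per partition per trigger.
class HttpBatchWriter(endpoint: String) extends ForeachWriter[MyMessage] {
  private val buffer = ArrayBuffer.empty[MyMessage]

  // Called once per partition and trigger; returning true means "process this partition".
  override def open(partitionId: Long, epochId: Long): Boolean = {
    buffer.clear()
    true
  }

  // Called for every row of the partition; here we only accumulate.
  override def process(value: MyMessage): Unit = buffer += value

  // Called when the partition is done; send the accumulated rows as one request.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null && buffer.nonEmpty) {
      sendPost(endpoint, buffer.toSeq) // hypothetical HTTP helper
    }
  }

  private def sendPost(url: String, rows: Seq[MyMessage]): Unit = {
    // e.g. build a JSON body from rows and POST it with your HTTP client of choice
  }
}

val streamingQuery = inputDataset
  .writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .foreach(new HttpBatchWriter("https://example.com/ingest"))
  .start

With 5 partitions per trigger this yields one POST per partition, which is the 5-requests-in-parallel behaviour described in the question.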
I'd like to avoid sending each event as it comes, but rather accumulate all rows, form one giant POST request body and send it to a HTTP endpoint. So if 1000 events in a batch and 5 partitions, generate 5 requests in parallel with 200 events in each request body.
That looks like an aggregation before you write the Dataset out to a sink, doesn't it? Use the groupBy operator with the collect_list function to group the rows, so that when you writeStream you will have as many groups as you want.
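A rough sketch of that idea, assuming hypothetical key and payload columns on the input (a real job would group by whatever expression yields the desired request size):

import org.apache.spark.sql.functions.{col, collect_list}

// Group the incoming rows and collapse each group into a single array column,
// so the sink sees one row per group instead of one row per event.
// The "key" and "payload" column names are assumptions for illustration.
val grouped = inputDataset
  .groupBy(col("key"))
  .agg(collect_list(col("payload")).as("payloads"))

val groupedQuery = grouped
  .writeStream
  .outputMode("update") // a streaming aggregation needs update or complete mode (or append with a watermark)
  .format("console")
  .start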
I would rather avoid dealing with partitions, this low-level feature of RDDs, as a way to optimize writes unless there is no other way.