理解 Spark 结构化流并行
Understanding Spark Structured Streaming Parallelism
我是 Spark 世界的新手,并且在一些概念上苦苦挣扎。
使用来自 Kafka 的 Spark Structured Streaming 采购时如何实现并行性?
让我们考虑以下片段代码:
SparkSession spark = SparkSession
.builder()
.appName("myApp")
.getOrCreate();
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
...
gDataset = ds.groupByKey(...)
pDataset = gDataset.mapGroupsWithState(
...
/* process each key - values */
loop values
if value is valid - save key/value result in the HDFS
...
)
StreamingQuery query = pDataset.writeStream()
.outputMode("update")
.format("console")
.start();
//await
query.awaitTermination();
我读到并行度与数据分区数有关,Dataset 的分区数基于 spark.sql.shuffle.partitions
参数。
对于每一批(从Kafka拉取),拉取的项目是否会分配给spark.sql.shuffle.partitions
个数?例如,spark.sql.shuffle.partitions=5
和 Batch1=100
行,我们最终会得到 5 个分区,每个分区有 20 行吗?
考虑到提供的代码片段,由于 groupByKey
后跟 mapGroups/mapGroupsWithState
函数,我们是否仍然利用 Spark 并行性?
更新:
在 gDataset.mapGroupsWithState
内部是我处理每个 key/values 并将结果存储在 HDFS 中的地方。因此,输出接收器仅用于在控制台中输出一些统计信息。
For every Batch (pull from the Kafka), will the pulled items be
divided among the number of spark.sql.shuffle.partitions
?
一旦到达groupByKey
,即洗牌边界,它们将被分开。刚开始取数据的时候,分区数会等于Kafka分区数
Considering the snippet code provided, do we still leverage in the
Spark parallelism due to the groupByKey followed by a
mapGroups/mapGroupsWithState functions
通常是的,但这也取决于您如何设置 Kafka 主题。虽然从代码中看不到,但 Spark 会在内部将不同阶段的数据拆分为更小的任务,并将它们分发给集群中可用的执行器。如果您的 Kafka 主题只有 1 个分区,这意味着在 groupByKey
之前,您的内部流将包含一个分区,该分区不会被并行化,而是在单个执行器上执行。只要您的 Kafka 分区数大于 1,您的处理就会并行。在混洗边界之后,Spark 将重新分区数据以包含 spark.sql.shuffle.partitions
.
指定的分区数量
我是 Spark 世界的新手,并且在一些概念上苦苦挣扎。
使用来自 Kafka 的 Spark Structured Streaming 采购时如何实现并行性?
让我们考虑以下片段代码:
SparkSession spark = SparkSession
.builder()
.appName("myApp")
.getOrCreate();
Dataset<VideoEventData> ds = spark
.readStream()
.format("kafka")
...
gDataset = ds.groupByKey(...)
pDataset = gDataset.mapGroupsWithState(
...
/* process each key - values */
loop values
if value is valid - save key/value result in the HDFS
...
)
StreamingQuery query = pDataset.writeStream()
.outputMode("update")
.format("console")
.start();
//await
query.awaitTermination();
我读到并行度与数据分区数有关,Dataset 的分区数基于 spark.sql.shuffle.partitions
参数。
对于每一批(从Kafka拉取),拉取的项目是否会分配给
spark.sql.shuffle.partitions
个数?例如,spark.sql.shuffle.partitions=5
和Batch1=100
行,我们最终会得到 5 个分区,每个分区有 20 行吗?考虑到提供的代码片段,由于
groupByKey
后跟mapGroups/mapGroupsWithState
函数,我们是否仍然利用 Spark 并行性?
更新:
在 gDataset.mapGroupsWithState
内部是我处理每个 key/values 并将结果存储在 HDFS 中的地方。因此,输出接收器仅用于在控制台中输出一些统计信息。
For every Batch (pull from the Kafka), will the pulled items be divided among the number of
spark.sql.shuffle.partitions
?
一旦到达groupByKey
,即洗牌边界,它们将被分开。刚开始取数据的时候,分区数会等于Kafka分区数
Considering the snippet code provided, do we still leverage in the Spark parallelism due to the groupByKey followed by a mapGroups/mapGroupsWithState functions
通常是的,但这也取决于您如何设置 Kafka 主题。虽然从代码中看不到,但 Spark 会在内部将不同阶段的数据拆分为更小的任务,并将它们分发给集群中可用的执行器。如果您的 Kafka 主题只有 1 个分区,这意味着在 groupByKey
之前,您的内部流将包含一个分区,该分区不会被并行化,而是在单个执行器上执行。只要您的 Kafka 分区数大于 1,您的处理就会并行。在混洗边界之后,Spark 将重新分区数据以包含 spark.sql.shuffle.partitions
.