使用 Python SDK 在 Spark 上使用 运行 Apache Beam wordcount 管道时并行度低

Low parallelism when running Apache Beam wordcount pipeline on Spark with Python SDK

我对 Spark 集群配置和 运行宁 Pyspark 管道非常有经验,但我才刚刚开始使用 Beam。因此,我正在尝试在 Spark PortableRunner(运行ning 在同一个小型 Spark 集群之上,对 Pyspark 和 Beam python SDK 进行比较,4 个工人,每个工人有 4 个内核和 8GB 内存),我已经为一个相当大的数据集确定了一个字数统计工作,将结果存储在 Parquet table.

我因此下载了 50GB 的维基百科文本文件,分成大约 100 个未压缩的文件,并将它们存储在目录 /mnt/nfs_drive/wiki_files/ 中(/mnt/nfs_drive 是安装在所有工作人员上的 NFS 驱动器)。

首先,我运行正在使用以下 Pyspark wordcount 脚本:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')

脚本 运行 非常好,可以在大约 8 分钟内将 parquet 文件输出到所需位置。主要阶段(读取和拆分令牌)被划分为合理数量的任务,以便高效地使用集群:

我现在正尝试使用 Beam 和 portable 运行ner 实现相同的效果。首先,我使用以下命令启动了 Spark 作业服务器(在 Spark 主节点上):

docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077

然后,在主节点和工作节点上,我运行如下设置 SDK Harness:

 docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool

现在 Spark 集群已设置为 运行 Beam 管道,我可以提交以下脚本:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])


wiki_files = '/mnt/nfs_drive/wiki_files/*'

p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)


_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
      pyarrow.schema(
          [('word', pyarrow.binary()), ('count', pyarrow.int64())]
      )
)

result = p.run().wait_until_finish()

代码提交成功,我可以在Spark上看到作业UI并且worker正在执行。但是,即使离开 运行ning 超过 1 小时,它也不会产生任何输出!

因此我想确保我的设置没有问题,所以我 运行 在较小的数据集(只有 1 个 Wiki 文件)上使用了完全相同的脚本。这在大约 3.5 分钟内成功完成(同一数据集上的 Spark wordcount 需要 16 秒!)。

我想知道 Beam 怎么会那么 慢,所以我开始查看 Beam 管道通过作业服务器提交给 Spark 的 DAG。我注意到 Spark 作业大部分时间都在以下阶段:

这只是分为 2 个任务,如下所示:

打印调试行显示此任务是执行“繁重任务”(即从 wiki 文件中读取行和拆分标记)的地方 - 但是,由于这仅发生在 2 个任务中,因此将分配工作最多2个工人。同样有趣的是,运行ning 在 50GB 的大型数据集上 完全 相同的 DAG,完全 相同数量的任务。

我不确定如何进行下一步。似乎 Beam 管道降低了并行度,但我不确定这是否是由于作业服务器对管道的次优转换,或者我是否应该以其他方式指定我的 PTransforms 以增加 Spark 上的并行度.

感谢任何建议!

管道的文件IO部分可以通过使用apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*')来简化。

Fusion 是可能阻止并行性的另一个原因。解决方案是在读入所有文件后输入 apache_beam.transforms.util.Reshuffle

花了一段时间,但我弄清楚了问题所在和解决方法。

根本问题出在 Beam 的便携式运行器中,特别是 Beam 作业转换为 Spark 作业的地方。

翻译代码(由作业服务器执行)根据对 sparkContext().defaultParallelism() 的调用将阶段拆分为任务。作业服务器没有明确配置默认并行度(并且不允许用户通过管道选项设置它),因此它回退,理论上,根据数字配置并行度执行者(请参阅此处的解释 https://spark.apache.org/docs/latest/configuration.html#execution-behavior)。这个似乎是调用defaultParallelism()时翻译代码的目标。

现在,在实践中,众所周知,当依赖回退机制时,过早调用 sparkContext().defaultParallelism() 会导致数量低于预期,因为执行者可能还没有在上下文中注册。特别是,过早调用 defaultParallelism() 将得到 2 个结果,并且阶段将仅分为 2 个任务。

因此,我的“肮脏黑客”解决方法包括修改作业服务器的源代码,方法是在实例化 SparkContext 之后和执行任何其他操作之前简单地添加 3 秒的延迟:

$ git diff                                                                                                                                                                                                                                                                                                                         v2.25.0
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index aa12192..faaa4d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -95,7 +95,13 @@ public final class SparkContextFactory {
       conf.setAppName(contextOptions.getAppName());
       // register immutable collections serializers because the SDK uses them.
       conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
-      return new JavaSparkContext(conf);
+      JavaSparkContext jsc = new JavaSparkContext(conf);
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+      }
+      return jsc;
     }
   }
 }

重新编译作业服务器并使用此更改启动它后,对 defaultParallelism() 的所有调用都已完成 执行程序注册后,阶段很好地划分在 16 个任务中(与执行者的数量相同)。正如预期的那样,现在工作完成得更快,因为有更多的工作人员在做这项工作(但它仍然比纯 Spark 字数慢 3 倍)。

虽然这可行,但这当然不是一个很好的解决方案。更好的解决方案是以下之一:

  • 更改翻译引擎,使其能够以更稳健的方式根据可用执行程序的数量推断出任务数量;
  • 允许用户通过管道选项配置作业服务器用于翻译作业的默认并行度(这是 Flink 便携式运行器所做的)。

在出现更好的解决方案之前,它显然会阻止在生产集群中使用 Beam Spark 作业服务器。我会 post 将问题提交给 Beam 的票务队列,以便可以实施更好的解决方案(希望很快)。