有什么方法可以提高 PySpark 输出的效率吗?

Is there any way to increase the efficiency of PySpark outputs?

我正在尝试测试 PySpark 迭代某些非常大(10 GB 到 1 TB)数据的能力。对于大多数脚本,我发现 PySpark 的效率与 Scala 代码大致相同。在其他情况下(如下面的代码),我会遇到严重的速度问题,速度会慢 10 到 12 倍。

path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext   

df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))\
 .flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))\
 .filter(lambda r: r[-1] != None)\
 .countByValue()

print([((x[0], x[1], x[2]), y) for x, y in rddx.items()]) 

我们认为我们已经将问题隔离到 .countByValue()(returns 一个 defaultdict),但是应用 countItems() 或 reduceByKey() 会产生几乎相同的结果。我们也 99% 确定问题不在于 ExtractDomain 或 CreateAVertexFromSourceUrlAndContent(不是函数的真实名称,只是伪代码以使其易于理解)。

所以我的问题是第一个

  1. 这段代码中有什么我可以做的来减少时间吗?
  2. PySpark 是否从根本上比它的 Scala 慢得多 对方?
  3. 有没有办法复制平面图 改为使用 PySpark 数据帧(了解数据帧是 通常比 Pyspark 中的 RDD 快)?

这里最大的问题可能是通信 - Spark SQL(柱状格式)-> 普通 Scala 对象 -> pickle (Pyrolite) -> 套接字 -> unpickle -> 普通 Python 对象。这是大量的复制、转换和移动内容。

there a way to replicate the flatmap using PySpark dataframes instead

是的。它被称为 explode - 但公平地说它也很慢。

understanding that dataframes are generally faster than RDD in Pyspark

这通常是正确的(Scala 和 Python 两者),但您可能需要 udf 来实现 ExtractDomainCreateAVertexFromSourceUrlAndContent - 这是另一件缓慢的事情。仅根据您可能可以使用的名称 parse_url_tuple.

Is PySpark fundamentally that much slower than its Scala counterpart?

有点慢。通常在调整良好的代码上不会那么慢。但是实现细节不同——Scala 和 Python 中的同一组操作可以用不同的方式实现。

is there anything in this code that I can do to reduce the time?

我建议先分析。一旦确定哪个部分负责(转换、合并),您就可以尝试将其作为目标。