性能差异 map() 与 withColumn()

Performance difference map() vs withColumn()

我有一个包含 100 多列的 table。我需要从某些列中删除双引号。我找到了 2 种方法,使用 withColumn() 和 map()

使用 withColumn()

cols_to_fix = ["col1", ..., "col20"]
for col in cols_to_fix:
    df = df.withColumn(col, regexp_replace(df[col], "\"", ""))

使用地图()

def remove_quotes(row: Row) -> Row:
    row_as_dict = row.asDict()
    cols_to_fix = ["col1", ..., "col20"]
    for column in cols_to_fix:
        if row_as_dict[column]:
            row_as_dict[column] = re.sub("\"", "", str(row_as_dict[column]))
    return Row(**row_as_dict)
 
df = df.rdd.map(remove_quotes).toDF(df.schema)

这是我的问题。我发现在具有约 2500 万条记录的 table 上使用 map() 比使用 withColumn() 花费的时间大约长 4 倍。如果任何堆栈溢出用户可以解释性能差异的原因,我将非常感激,这样我以后就可以避免类似的陷阱。

首先提一个建议:不要将DataFrame转RDD,直接做df.map(your function here),这样可以节省很多时间。 下一页 https://dzone.com/articles/apache-spark-3-reasons-why-you-should-not-use-rdds 会节省我们很多时间,它的主要结论是 RDD 比 DataFrame/Dataset 慢得多,更不用说从 DataFrame 到 RDD 的转换所用的时间了。

让我们来谈谈 map 和 withColumn,现在 DataFrame 到 RDD 之间没有任何转换。

首先得出结论:map 通常比 withColumn 慢 5 倍。原因是 map 操作总是涉及反序列化和序列化,而 withColumn 可以对感兴趣的列进行操作。 具体来说,map 操作应该将 Row 反序列化为操作将承载的几个部分,

这里有一个例子: 假设我们有一个看起来像

的 DataFrame
+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

然后我们想把users_count列的所有值都加1,我们可以这样做:

df.map(row => {
  val usersCount = row.getInt(1) + 1
  (row.getString(0), usersCount)
}).toDF("language", "users_count_incremented_by_1")

在上面的代码中,我们首先需要反序列化每一行以提取第2列中的值,然后将修改后的值输出并保存为DataFrame(这一步需要序列化(a,b)到 Row(a, b) 中,因为 DataFrame 只是行的数据集)。 有关更详细的解释,请查看以下优秀文章 https://medium.com/@fqaiser94/udfs-vs-map-vs-custom-spark-native-functions-91ab2c154b44

map不能对列本身进行操作,必须对列的值进行操作,获取值需要反序列化,保存为DataFrame需要序列化。

但是 map 还是有很大用处的:在 map 方法的帮助下人们可以实现非常复杂的操作,而如果我们只使用 withColumn.[=19= 就可以完成 built-in 操作]

总而言之,map速度较慢但更灵活,withColumn肯定是最有效的,但功能有限