性能差异 map() 与 withColumn()
Performance difference map() vs withColumn()
我有一个包含 100 多列的 table。我需要从某些列中删除双引号。我找到了 2 种方法,使用 withColumn() 和 map()
使用 withColumn()
cols_to_fix = ["col1", ..., "col20"]
for col in cols_to_fix:
df = df.withColumn(col, regexp_replace(df[col], "\"", ""))
使用地图()
def remove_quotes(row: Row) -> Row:
row_as_dict = row.asDict()
cols_to_fix = ["col1", ..., "col20"]
for column in cols_to_fix:
if row_as_dict[column]:
row_as_dict[column] = re.sub("\"", "", str(row_as_dict[column]))
return Row(**row_as_dict)
df = df.rdd.map(remove_quotes).toDF(df.schema)
这是我的问题。我发现在具有约 2500 万条记录的 table 上使用 map() 比使用 withColumn() 花费的时间大约长 4 倍。如果任何堆栈溢出用户可以解释性能差异的原因,我将非常感激,这样我以后就可以避免类似的陷阱。
首先提一个建议:不要将DataFrame转RDD,直接做df.map(your function here)
,这样可以节省很多时间。
下一页
https://dzone.com/articles/apache-spark-3-reasons-why-you-should-not-use-rdds
会节省我们很多时间,它的主要结论是 RDD 比 DataFrame/Dataset 慢得多,更不用说从 DataFrame 到 RDD 的转换所用的时间了。
让我们来谈谈 map 和 withColumn,现在 DataFrame 到 RDD 之间没有任何转换。
首先得出结论:map 通常比 withColumn 慢 5 倍。原因是 map 操作总是涉及反序列化和序列化,而 withColumn 可以对感兴趣的列进行操作。
具体来说,map 操作应该将 Row 反序列化为操作将承载的几个部分,
这里有一个例子:
假设我们有一个看起来像
的 DataFrame
+--------+-----------+
|language|users_count|
+--------+-----------+
| Java| 20000|
| Python| 100000|
| Scala| 3000|
+--------+-----------+
然后我们想把users_count列的所有值都加1,我们可以这样做:
df.map(row => {
val usersCount = row.getInt(1) + 1
(row.getString(0), usersCount)
}).toDF("language", "users_count_incremented_by_1")
在上面的代码中,我们首先需要反序列化每一行以提取第2列中的值,然后将修改后的值输出并保存为DataFrame(这一步需要序列化(a,b)到 Row(a, b) 中,因为 DataFrame 只是行的数据集)。
有关更详细的解释,请查看以下优秀文章
https://medium.com/@fqaiser94/udfs-vs-map-vs-custom-spark-native-functions-91ab2c154b44
map
不能对列本身进行操作,必须对列的值进行操作,获取值需要反序列化,保存为DataFrame
需要序列化。
但是 map 还是有很大用处的:在 map 方法的帮助下人们可以实现非常复杂的操作,而如果我们只使用 withColumn
.[=19= 就可以完成 built-in 操作]
总而言之,map速度较慢但更灵活,withColumn
肯定是最有效的,但功能有限
我有一个包含 100 多列的 table。我需要从某些列中删除双引号。我找到了 2 种方法,使用 withColumn() 和 map()
使用 withColumn()
cols_to_fix = ["col1", ..., "col20"]
for col in cols_to_fix:
df = df.withColumn(col, regexp_replace(df[col], "\"", ""))
使用地图()
def remove_quotes(row: Row) -> Row:
row_as_dict = row.asDict()
cols_to_fix = ["col1", ..., "col20"]
for column in cols_to_fix:
if row_as_dict[column]:
row_as_dict[column] = re.sub("\"", "", str(row_as_dict[column]))
return Row(**row_as_dict)
df = df.rdd.map(remove_quotes).toDF(df.schema)
这是我的问题。我发现在具有约 2500 万条记录的 table 上使用 map() 比使用 withColumn() 花费的时间大约长 4 倍。如果任何堆栈溢出用户可以解释性能差异的原因,我将非常感激,这样我以后就可以避免类似的陷阱。
首先提一个建议:不要将DataFrame转RDD,直接做df.map(your function here)
,这样可以节省很多时间。
下一页
https://dzone.com/articles/apache-spark-3-reasons-why-you-should-not-use-rdds
会节省我们很多时间,它的主要结论是 RDD 比 DataFrame/Dataset 慢得多,更不用说从 DataFrame 到 RDD 的转换所用的时间了。
让我们来谈谈 map 和 withColumn,现在 DataFrame 到 RDD 之间没有任何转换。
首先得出结论:map 通常比 withColumn 慢 5 倍。原因是 map 操作总是涉及反序列化和序列化,而 withColumn 可以对感兴趣的列进行操作。 具体来说,map 操作应该将 Row 反序列化为操作将承载的几个部分,
这里有一个例子: 假设我们有一个看起来像
的 DataFrame+--------+-----------+
|language|users_count|
+--------+-----------+
| Java| 20000|
| Python| 100000|
| Scala| 3000|
+--------+-----------+
然后我们想把users_count列的所有值都加1,我们可以这样做:
df.map(row => {
val usersCount = row.getInt(1) + 1
(row.getString(0), usersCount)
}).toDF("language", "users_count_incremented_by_1")
在上面的代码中,我们首先需要反序列化每一行以提取第2列中的值,然后将修改后的值输出并保存为DataFrame(这一步需要序列化(a,b)到 Row(a, b) 中,因为 DataFrame 只是行的数据集)。 有关更详细的解释,请查看以下优秀文章 https://medium.com/@fqaiser94/udfs-vs-map-vs-custom-spark-native-functions-91ab2c154b44
map
不能对列本身进行操作,必须对列的值进行操作,获取值需要反序列化,保存为DataFrame
需要序列化。
但是 map 还是有很大用处的:在 map 方法的帮助下人们可以实现非常复杂的操作,而如果我们只使用 withColumn
.[=19= 就可以完成 built-in 操作]
总而言之,map速度较慢但更灵活,withColumn
肯定是最有效的,但功能有限