PySpark:向 DataFrame 添加更多列的最佳实践
PySpark: Best practice to add more columns to a DataFrame
Spark Dataframes 有一种方法 withColumn
可以一次添加一个新列。要添加多列,需要 withColumn
链。这是执行此操作的最佳做法吗?
我觉得使用mapPartitions
更有优势。假设我有一个由三个 withColumn
组成的链,然后是一个用于根据特定条件删除 Row
的过滤器。这是四种不同的操作(不过我不确定其中是否有广泛的转换)。但是如果我做一个mapPartitions
,我可以一次完成所有的事情。如果我有一个我希望每个 RDD 分区打开一次的数据库连接,它也会有所帮助。
我的问题分为两部分。
第一部分,这是我对 mapPartitions 的实现。这种方法有什么不可预见的问题吗?有没有更优雅的方法来做到这一点?
df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
第二部分,在 withColumn
和 filter
链上使用 mapPartitions
的权衡是什么?
我在某处读到,将可用方法与 Spark DF 结合使用总是比推出自己的实现更好。如果我的论点是错误的,请告诉我。谢谢!欢迎所有想法。
使用 df.withColumn()
是添加列的最佳方式。它们都是延迟添加的
Are there any unforeseen issues with this approach?
多个。最严重的影响是:
- 与普通
DataFrame
代码相比,内存占用量高几倍,垃圾收集开销也很大。
- 在执行上下文之间移动数据所需的序列化和反序列化成本很高。
- 在查询规划器中引入断点。
- 照原样,
toDF
调用的模式推断成本(如果提供适当的模式可以避免)和所有前面步骤的可能 re-execution。
- 等等...
其中一些可以通过 udf
和 select
/ withColumn
避免,其他则不能。
let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions
您的 mapPartitions
没有删除任何操作,也没有提供任何 Spark 规划器无法排除的优化。它唯一的优点是它为昂贵的连接对象提供了一个很好的范围。
I read somewhere that using the available methods with Spark DFs are always better than rolling out your own implementation
当您开始使用 executor-side Python 逻辑时,您已经偏离了 Spark SQL。使用 udf
、RDD
或新添加的矢量化 udf 都没有关系。归根结底,您应该根据代码的整体结构做出决定——如果它主要是直接在数据上执行的 Python 逻辑,那么坚持使用 RDD
或完全跳过 Spark 可能会更好。
如果它只是逻辑的一小部分,并且不会导致严重的性能问题,请不要担心。
Spark Dataframes 有一种方法 withColumn
可以一次添加一个新列。要添加多列,需要 withColumn
链。这是执行此操作的最佳做法吗?
我觉得使用mapPartitions
更有优势。假设我有一个由三个 withColumn
组成的链,然后是一个用于根据特定条件删除 Row
的过滤器。这是四种不同的操作(不过我不确定其中是否有广泛的转换)。但是如果我做一个mapPartitions
,我可以一次完成所有的事情。如果我有一个我希望每个 RDD 分区打开一次的数据库连接,它也会有所帮助。
我的问题分为两部分。
第一部分,这是我对 mapPartitions 的实现。这种方法有什么不可预见的问题吗?有没有更优雅的方法来做到这一点?
df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
第二部分,在 withColumn
和 filter
链上使用 mapPartitions
的权衡是什么?
我在某处读到,将可用方法与 Spark DF 结合使用总是比推出自己的实现更好。如果我的论点是错误的,请告诉我。谢谢!欢迎所有想法。
使用 df.withColumn()
是添加列的最佳方式。它们都是延迟添加的
Are there any unforeseen issues with this approach?
多个。最严重的影响是:
- 与普通
DataFrame
代码相比,内存占用量高几倍,垃圾收集开销也很大。 - 在执行上下文之间移动数据所需的序列化和反序列化成本很高。
- 在查询规划器中引入断点。
- 照原样,
toDF
调用的模式推断成本(如果提供适当的模式可以避免)和所有前面步骤的可能 re-execution。 - 等等...
其中一些可以通过 udf
和 select
/ withColumn
避免,其他则不能。
let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions
您的 mapPartitions
没有删除任何操作,也没有提供任何 Spark 规划器无法排除的优化。它唯一的优点是它为昂贵的连接对象提供了一个很好的范围。
I read somewhere that using the available methods with Spark DFs are always better than rolling out your own implementation
当您开始使用 executor-side Python 逻辑时,您已经偏离了 Spark SQL。使用 udf
、RDD
或新添加的矢量化 udf 都没有关系。归根结底,您应该根据代码的整体结构做出决定——如果它主要是直接在数据上执行的 Python 逻辑,那么坚持使用 RDD
或完全跳过 Spark 可能会更好。
如果它只是逻辑的一小部分,并且不会导致严重的性能问题,请不要担心。