用另一个(布尔)数组列子集一个数组列
Subset one array column with another (boolean) array column
我有一个这样的数据框(在 Pyspark 2.3.1 中):
from pyspark.sql import Row
my_data = spark.createDataFrame([
Row(a=[9, 3, 4], b=['a', 'b', 'c'], mask=[True, False, False]),
Row(a=[7, 2, 6, 4], b=['w', 'x', 'y', 'z'], mask=[True, False, True, False])
])
my_data.show(truncate=False)
#+------------+------------+--------------------------+
#|a |b |mask |
#+------------+------------+--------------------------+
#|[9, 3, 4] |[a, b, c] |[true, false, false] |
#|[7, 2, 6, 4]|[w, x, y, z]|[true, false, true, false]|
#+------------+------------+--------------------------+
现在我想使用 mask
列来对 a
和 b
列进行子集化:
my_desired_output = spark.createDataFrame([
Row(a=[9], b=['a']),
Row(a=[7, 6], b=['w', 'y'])
])
my_desired_output.show(truncate=False)
#+------+------+
#|a |b |
#+------+------+
#|[9] |[a] |
#|[7, 6]|[w, y]|
#+------+------+
实现此目的的 "idiomatic" 方法是什么?我目前的解决方案涉及 map
-ing 底层 RDD 并使用 Numpy 进行子集化,这似乎不够优雅:
import numpy as np
def subset_with_mask(row):
mask = np.asarray(row.mask)
a_masked = np.asarray(row.a)[mask].tolist()
b_masked = np.asarray(row.b)[mask].tolist()
return Row(a=a_masked, b=b_masked)
my_desired_output = spark.createDataFrame(my_data.rdd.map(subset_with_mask))
这是最好的方法吗,还是有更好的方法(更简洁 and/or 更有效)我可以使用 Spark SQL 工具?
一种选择是使用 UDF,您可以选择根据数组中的数据类型专门化它:
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
def _mask_list(lst, mask):
return np.asarray(lst)[mask].tolist()
mask_array_int = F.udf(_mask_list, T.ArrayType(T.IntegerType()))
mask_array_str = F.udf(_mask_list, T.ArrayType(T.StringType()))
my_desired_output = my_data
my_desired_output = my_desired_output.withColumn(
'a', mask_array_int(F.col('a'), F.col('mask'))
)
my_desired_output = my_desired_output.withColumn(
'b', mask_array_str(F.col('b'), F.col('mask'))
)
上一个回答中提到的UDFs可能是在Spark 2.4中添加数组函数之前要走的路。为了完整起见,这里有一个"pure SQL" 2.4之前的实现。
from pyspark.sql.functions import *
df = my_data.withColumn("row", monotonically_increasing_id())
df1 = df.select("row", posexplode("a").alias("pos", "a"))
df2 = df.select("row", posexplode("b").alias("pos", "b"))
df3 = df.select("row", posexplode("mask").alias("pos", "mask"))
df1\
.join(df2, ["row", "pos"])\
.join(df3, ["row", "pos"])\
.filter("mask")\
.groupBy("row")\
.agg(collect_list("a").alias("a"), collect_list("b").alias("b"))\
.select("a", "b")\
.show()
输出:
+------+------+
| a| b|
+------+------+
|[7, 6]|[w, y]|
| [9]| [a]|
+------+------+
一个更好的方法是使用 pyspark.sql.functions.expr
, filter
, and transform
:
import pandas as pd
from pyspark.sql import (
functions as F,
SparkSession
)
spark = SparkSession.builder.master('local[4]').getOrCreate()
bool_df = pd.DataFrame([
['a', [0, 1, 2, 3, 4], [True]*4 + [False]],
['b', [5, 6, 7, 8, 9], [False, True, False, True, False]]
], columns=['id', 'int_arr', 'bool_arr'])
bool_sdf = spark.createDataFrame(bool_df)
def filter_with_mask(in_col, mask_col, out_name="masked_arr"):
filt_input = f'arrays_zip({in_col}, {mask_col})'
filt_func = f'x -> x.{mask_col}'
trans_func = f'x -> x.{in_col}'
result = F.expr(f'''transform(
filter({filt_input}, {filt_func}), {trans_func}
)''').alias
return result
使用函数:
bool_sdf.select(
'*', filter_with_mask('int_arr', 'bool_arr', bool_sdf)
).toPandas()
结果:
id int_arr bool_arr masked_arr
a [0, 1, 2, 3, 4] [True, True, True, True, False] [0, 1, 2, 3]
b [5, 6, 7, 8, 9] [False, True, False, True, False] [6, 8]
pyspark >= 2.4.0
和 python >= 3.6
应该可以实现。
我有一个这样的数据框(在 Pyspark 2.3.1 中):
from pyspark.sql import Row
my_data = spark.createDataFrame([
Row(a=[9, 3, 4], b=['a', 'b', 'c'], mask=[True, False, False]),
Row(a=[7, 2, 6, 4], b=['w', 'x', 'y', 'z'], mask=[True, False, True, False])
])
my_data.show(truncate=False)
#+------------+------------+--------------------------+
#|a |b |mask |
#+------------+------------+--------------------------+
#|[9, 3, 4] |[a, b, c] |[true, false, false] |
#|[7, 2, 6, 4]|[w, x, y, z]|[true, false, true, false]|
#+------------+------------+--------------------------+
现在我想使用 mask
列来对 a
和 b
列进行子集化:
my_desired_output = spark.createDataFrame([
Row(a=[9], b=['a']),
Row(a=[7, 6], b=['w', 'y'])
])
my_desired_output.show(truncate=False)
#+------+------+
#|a |b |
#+------+------+
#|[9] |[a] |
#|[7, 6]|[w, y]|
#+------+------+
实现此目的的 "idiomatic" 方法是什么?我目前的解决方案涉及 map
-ing 底层 RDD 并使用 Numpy 进行子集化,这似乎不够优雅:
import numpy as np
def subset_with_mask(row):
mask = np.asarray(row.mask)
a_masked = np.asarray(row.a)[mask].tolist()
b_masked = np.asarray(row.b)[mask].tolist()
return Row(a=a_masked, b=b_masked)
my_desired_output = spark.createDataFrame(my_data.rdd.map(subset_with_mask))
这是最好的方法吗,还是有更好的方法(更简洁 and/or 更有效)我可以使用 Spark SQL 工具?
一种选择是使用 UDF,您可以选择根据数组中的数据类型专门化它:
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
def _mask_list(lst, mask):
return np.asarray(lst)[mask].tolist()
mask_array_int = F.udf(_mask_list, T.ArrayType(T.IntegerType()))
mask_array_str = F.udf(_mask_list, T.ArrayType(T.StringType()))
my_desired_output = my_data
my_desired_output = my_desired_output.withColumn(
'a', mask_array_int(F.col('a'), F.col('mask'))
)
my_desired_output = my_desired_output.withColumn(
'b', mask_array_str(F.col('b'), F.col('mask'))
)
上一个回答中提到的UDFs可能是在Spark 2.4中添加数组函数之前要走的路。为了完整起见,这里有一个"pure SQL" 2.4之前的实现。
from pyspark.sql.functions import *
df = my_data.withColumn("row", monotonically_increasing_id())
df1 = df.select("row", posexplode("a").alias("pos", "a"))
df2 = df.select("row", posexplode("b").alias("pos", "b"))
df3 = df.select("row", posexplode("mask").alias("pos", "mask"))
df1\
.join(df2, ["row", "pos"])\
.join(df3, ["row", "pos"])\
.filter("mask")\
.groupBy("row")\
.agg(collect_list("a").alias("a"), collect_list("b").alias("b"))\
.select("a", "b")\
.show()
输出:
+------+------+
| a| b|
+------+------+
|[7, 6]|[w, y]|
| [9]| [a]|
+------+------+
一个更好的方法是使用 pyspark.sql.functions.expr
, filter
, and transform
:
import pandas as pd
from pyspark.sql import (
functions as F,
SparkSession
)
spark = SparkSession.builder.master('local[4]').getOrCreate()
bool_df = pd.DataFrame([
['a', [0, 1, 2, 3, 4], [True]*4 + [False]],
['b', [5, 6, 7, 8, 9], [False, True, False, True, False]]
], columns=['id', 'int_arr', 'bool_arr'])
bool_sdf = spark.createDataFrame(bool_df)
def filter_with_mask(in_col, mask_col, out_name="masked_arr"):
filt_input = f'arrays_zip({in_col}, {mask_col})'
filt_func = f'x -> x.{mask_col}'
trans_func = f'x -> x.{in_col}'
result = F.expr(f'''transform(
filter({filt_input}, {filt_func}), {trans_func}
)''').alias
return result
使用函数:
bool_sdf.select(
'*', filter_with_mask('int_arr', 'bool_arr', bool_sdf)
).toPandas()
结果:
id int_arr bool_arr masked_arr
a [0, 1, 2, 3, 4] [True, True, True, True, False] [0, 1, 2, 3]
b [5, 6, 7, 8, 9] [False, True, False, True, False] [6, 8]
pyspark >= 2.4.0
和 python >= 3.6
应该可以实现。