用另一个(布尔)数组列子集一个数组列

Subset one array column with another (boolean) array column

我有一个这样的数据框(在 Pyspark 2.3.1 中):

from pyspark.sql import Row

my_data = spark.createDataFrame([
  Row(a=[9, 3, 4], b=['a', 'b', 'c'], mask=[True, False, False]),
  Row(a=[7, 2, 6, 4], b=['w', 'x', 'y', 'z'], mask=[True, False, True, False])
])
my_data.show(truncate=False)
#+------------+------------+--------------------------+
#|a           |b           |mask                      |
#+------------+------------+--------------------------+
#|[9, 3, 4]   |[a, b, c]   |[true, false, false]      |
#|[7, 2, 6, 4]|[w, x, y, z]|[true, false, true, false]|
#+------------+------------+--------------------------+

现在我想使用 mask 列来对 ab 列进行子集化:

my_desired_output = spark.createDataFrame([
  Row(a=[9], b=['a']),
  Row(a=[7, 6], b=['w', 'y'])
])
my_desired_output.show(truncate=False)
#+------+------+
#|a     |b     |
#+------+------+
#|[9]   |[a]   |
#|[7, 6]|[w, y]|
#+------+------+

实现此目的的 "idiomatic" 方法是什么?我目前的解决方案涉及 map-ing 底层 RDD 并使用 Numpy 进行子集化,这似乎不够优雅:

import numpy as np

def subset_with_mask(row):
    mask = np.asarray(row.mask)
    a_masked = np.asarray(row.a)[mask].tolist()
    b_masked = np.asarray(row.b)[mask].tolist()
    return Row(a=a_masked, b=b_masked)

my_desired_output = spark.createDataFrame(my_data.rdd.map(subset_with_mask))

这是最好的方法吗,还是有更好的方法(更简洁 and/or 更有效)我可以使用 Spark SQL 工具?

一种选择是使用 UDF,您可以选择根据数组中的数据类型专门化它:

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T

def _mask_list(lst, mask):
    return np.asarray(lst)[mask].tolist()

mask_array_int = F.udf(_mask_list, T.ArrayType(T.IntegerType()))
mask_array_str = F.udf(_mask_list, T.ArrayType(T.StringType()))

my_desired_output = my_data
my_desired_output = my_desired_output.withColumn(
    'a', mask_array_int(F.col('a'), F.col('mask'))
)
my_desired_output = my_desired_output.withColumn(
    'b', mask_array_str(F.col('b'), F.col('mask'))
)

上一个回答中提到的UDFs可能是在Spark 2.4中添加数组函数之前要走的路。为了完整起见,这里有一个"pure SQL" 2.4之前的实现。

from pyspark.sql.functions import *

df = my_data.withColumn("row", monotonically_increasing_id())

df1 = df.select("row", posexplode("a").alias("pos", "a"))
df2 = df.select("row", posexplode("b").alias("pos", "b"))
df3 = df.select("row", posexplode("mask").alias("pos", "mask"))

df1\
    .join(df2, ["row", "pos"])\
    .join(df3, ["row", "pos"])\
    .filter("mask")\
    .groupBy("row")\
    .agg(collect_list("a").alias("a"), collect_list("b").alias("b"))\
    .select("a", "b")\
    .show()

输出:

+------+------+
|     a|     b|
+------+------+
|[7, 6]|[w, y]|
|   [9]|   [a]|
+------+------+

一个更好的方法是使用 pyspark.sql.functions.expr, filter, and transform:

import pandas as pd
from pyspark.sql import (
    functions as F,
    SparkSession
)

spark = SparkSession.builder.master('local[4]').getOrCreate()

bool_df = pd.DataFrame([
    ['a', [0, 1, 2, 3, 4], [True]*4 + [False]],
    ['b', [5, 6, 7, 8, 9], [False, True, False, True, False]]
], columns=['id', 'int_arr', 'bool_arr'])
bool_sdf = spark.createDataFrame(bool_df)

def filter_with_mask(in_col, mask_col, out_name="masked_arr"):
    filt_input = f'arrays_zip({in_col}, {mask_col})'
    filt_func = f'x -> x.{mask_col}'
    trans_func = f'x -> x.{in_col}'
    
    result = F.expr(f'''transform(
        filter({filt_input}, {filt_func}), {trans_func}
    )''').alias
    return result

使用函数:

bool_sdf.select(
    '*', filter_with_mask('int_arr', 'bool_arr', bool_sdf)
).toPandas()

结果:

id         int_arr                          bool_arr   masked_arr
 a [0, 1, 2, 3, 4]   [True, True, True, True, False] [0, 1, 2, 3]
 b [5, 6, 7, 8, 9] [False, True, False, True, False]       [6, 8]

pyspark >= 2.4.0python >= 3.6 应该可以实现。