'NoneType' object is not iterable 错误在 PySpark DataFrame 的 ArrayType 上使用 udf

'NoneType' object is not iterable error using udf on ArrayType in PySpark DataFrame

我有一个具有以下架构的数据框

hello.printSchema()
root
 |-- list_a: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- list_b: array (nullable = true)
 |    |-- element: integer (containsNull = true)

和以下示例数据

hello.take(2)
[Row(list_a=[7, 11, 1, 14, 13, 15,999], list_b=[15, 13, 7, 11, 1, 14]),
 Row(list_a=[7, 11, 1, 14, 13, 15], list_b=[11, 1, 7, 14, 15, 13, 12])]

期望的输出

  1. 排序 list_alist_b
  2. 创建一个新列 list_diff,这样 list_diff = list(set(list_a) - set(list_b)) 如果不存在此类差异,则为空 ArrayType。

我试过的方法是UDF。

中所述,我正在尝试使用以下 UDF

sort_udf=udf(lambda x: sorted(x), ArrayType(IntegerType()))
differencer=udf(lambda x,y: [elt for elt in x if elt not in y], ArrayType(IntegerType()))

看起来 python 列表操作不受支持。

hello = hello.withColumn('sorted', sort_udf(hello.list_a))
hello = hello.withColumn('difference', differencer(hello.list_a, hello.list_b))

以上操作导致如下错误

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
[Redacted Stack Trace]
TypeError: 'NoneType' object is not iterable

我在这里遗漏了什么吗?

错误信息:

TypeError: 'NoneType' object is not iterable

是一个 python 异常(与 spark 错误相反),这意味着您的代码在 udf 中失败。您的问题是您的 DataFrame 中有一些 null 值。因此,当您调用 udf 时,您可能会将 None 值传递给 sorted:

>>> sorted(None)
TypeErrorTraceback (most recent call last)
<ipython-input-72-edb1060f46c4> in <module>()
----> 1 sorted(None)

TypeError: 'NoneType' object is not iterable

解决这个问题的方法是让您的 udf 对不良输入具有鲁棒性。在您的情况下,您可以更改函数以处理 null 输入,如下所示:

# return None if input is None
sort_udf = udf(lambda x: sorted(x) if x is not None else None, ArrayType(IntegerType()))

# return None if either x or y are None
differencer = udf(
    lambda x,y: [e for e in x if e not in y] if x is not None and y is not None else None,
    ArrayType(IntegerType())
)

然而,sort_udf 函数不是必需的,因为您可以使用 pyspark.sql.functions.sort_array() 代替。