如何将空数组转换为空值?

How to convert empty arrays to nulls?

我有以下数据框,我需要将空数组转换为 null。

+----+---------+-----------+
|  id|count(AS)|count(asdr)|
+----+---------+-----------+
|1110| [12, 45]|   [50, 55]|     
|1111|       []|         []|    
|1112| [45, 46]|   [50, 50]|   
|1113|       []|         []|
+----+---------+-----------+

我试过下面的代码,但没有用。

df.na.fill("null").show()

预期输出应该是

+----+---------+-----------+
|  id|count(AS)|count(asdr)|
+----+---------+-----------+
|1110| [12, 45]|   [50, 55]|     
|1111|     NUll|       NUll|    
|1112| [45, 46]|   [50, 50]|   
|1113|     NUll|       NUll|
+----+---------+-----------+

你可以用 selectExpr:

df_filled = df.selectExpr(
    "id",
    "if(size(column1)<=0, null, column1)",
    "if(size(column2)<=0, null, column2)",
    ...
)

您需要检查数组类型列的 size。喜欢:

df.show()
+----+---+
|  id|arr|
+----+---+
|1110| []|
+----+---+

df.withColumn("arr", when(size(col("arr")) == 0 , lit(None)).otherwise(col("arr") ) ).show()

+----+----+
|  id| arr|
+----+----+
|1110|null|
+----+----+

这里没有像 df.na.fill 这样简单的解决方案。一种方法是遍历所有相关列并在适当的地方替换值。在 scala 中使用 foldLeft 的示例:

val columns = df.schema.filter(_.dataType.typeName == "array").map(_.name)

val df2 = columns.foldLeft(df)((acc, colname) => acc.withColumn(colname, 
    when(size(col(colname)) === 0, null).otherwise(col(colname))))

首先,提取所有数组类型的列,然后迭代这些列。由于 size 函数仅为数组类型的列定义,因此这是必要的步骤(并避免循环遍历所有列)。

使用数据框:

+----+--------+-----+
|  id|    col1| col2|
+----+--------+-----+
|1110|[12, 11]|   []|
|1111|      []| [11]|
|1112|   [123]|[321]|
+----+--------+-----+

结果如下:

+----+--------+-----+
|  id|    col1| col2|
+----+--------+-----+
|1110|[12, 11]| null|
|1111|    null| [11]|
|1112|   [123]|[321]|
+----+--------+-----+

我认为 na.fill 不可能做到这一点,但这应该适合您。该代码将所有空的 ArrayType 列转换为 null,并保持其他列不变:

import spark.implicits._
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.functions._

val df = Seq(
  (110, Seq.empty[Int]),
  (111, Seq(1,2,3))
).toDF("id","arr")

// get names of array-type columns
val arrColsNames = df.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)

// map all empty arrays to nulls
val emptyArraysAsNulls = arrColsNames.map(n => when(size(col(n))>0,col(n)).as(n))

// non-array-type columns, keep them as they are
val keepCols = df.columns.filterNot(arrColsNames.contains).map(col)

df
  .select((keepCols ++ emptyArraysAsNulls):_*)
  .show()

+---+---------+
| id|      arr|
+---+---------+
|110|     null|
|111|[1, 2, 3]|
+---+---------+
df.withColumn("arr", when(size(col("arr")) == 0, lit(None)).otherwise(col("arr") ) ).show()

请记住,它在 pyspark 中也不起作用。

对于给定的 dataframe,您可以简单地执行以下操作

from pyspark.sql import functions as F
df.withColumn("count(AS)", F.when((F.size(F.col("count(AS)")) == 0), F.lit(None)).otherwise(F.col("count(AS)"))) \
    .withColumn("count(asdr)", F.when((F.size(F.col("count(asdr)")) == 0), F.lit(None)).otherwise(F.col("count(asdr)"))).show()

您应该将 dataframe 输出为

+----+---------+-----------+
|  id|count(AS)|count(asdr)|
+----+---------+-----------+
|1110| [12, 45]|   [50, 55]|
|1111|     null|       null|
|1112| [45, 46]|   [50, 50]|
|1113|     null|       null|
+----+---------+-----------+

已更新

如果您有两个以上的数组列并且您想动态应用上述逻辑,您可以使用以下逻辑

from pyspark.sql import functions as F
for c in df.dtypes:
    if "array" in c[1]:
        df = df.withColumn(c[0], F.when((F.size(F.col(c[0])) == 0), F.lit(None)).otherwise(F.col(c[0])))
df.show()

这里,
df.dtypes 将为您提供具有列名和数据类型的元组数组。至于问题中的数据框,它将是

[('id', 'bigint'), ('count(AS)', 'array<bigint>'), ('count(asdr)', 'array<bigint>')]

withColumn 仅应用于 array("array" in c[1]) 其中 F.size(F.col(c[0])) == 0when 函数的条件检查对于数组的大小。如果条件为真,即空数组,则填充 None 否则填充原始值。循环应用于所有数组列。

参考Ramesh Maharajans 上述解决方案。我找到了另一种使用 UDF 的解决方案。希望这对您的数据框的多个规则有所帮助。

df

|store|   1|   2|   3|
+-----+----+----+----+
|  103|[90]|  []|  []|
|  104|  []|[67]|[90]|
|  101|[34]|  []|  []|
|  102|[35]|  []|  []|
+-----+----+----+----+

使用下面的代码,导入import pyspark.sql.functions as psf 此代码适用于 pyspark

def udf1(x :list):
    if x==[]: return "null"
    else: return x
udf2 = udf(udf1, ArrayType(IntegerType()))

for c in df.dtypes:
    if "array" in c[1]:
        df=df.withColumn(c[0],udf2(psf.col(c[0])))
df.show()

输出

|store|   1|   2|   3|
+-----+----+----+----+
|  103|[90]|null|null|
|  104|null|[67]|[90]|
|  101|[34]|null|null|
|  102|[35]|null|null|
+-----+----+----+----+