在 pyspark 中写入镶木地板时忽略缺失值

Ignore missing values when writing to parquet in pyspark

我需要转换 parquet 文件的内部结构。

目前我有一个存储数组或数组的字段。我打算把它变成一个结构数组。

所以从这个:

root
  -array
     -array

我想要这个:

root
  -array
     -struct

我正在按以下方式执行转换:

我为新结构定义了一个架构:

newtype = ArrayType(StructType(
        [
            StructField("one", FloatType()),
            StructField("two", FloatType()),
            StructField("three", FloatType()),
            StructField("four", FloatType()),
            StructField("five", FloatType())
        ]))

我将 'empty' udf 应用于该列。有趣的部分是我为 udf 指定了模式。

def convert(arr):
   return arr

df = spark.read.parquet("....")
spark.udf.register(name="convert", f=convert, returnType=newtype)
df = df.withColumn("col", expr("convert(col)"))

最后我把它写回镶木地板。

我遇到的问题是:

Input row doesn't have expected number of values required by the schema. 5 fields are required while 3 values are provided.

确实如此。有些数组曾经有 3 个值。后来添加了更多值,因此较新的数组有 5 个值。

为什么会发生这种情况? 我将字段定义为可为空,所以我希望它能起作用。 我有哪些选择?

当您将数据转换为新的结构模式时,您必须为每个字段提供一个值。如果您不提供值,Spark 不想假设要输入的值。如果长度错误,只需为剩余值提供 None。此外,您的 convert 函数看起来不像是在处理嵌套数组。这是一个使用 None's.

更新为 pad 的工作示例
from pyspark.sql.types import *
from pyspark.sql.functions import * 

old_type = StructType([
    StructField("col", ArrayType(ArrayType(FloatType())))
])

new_type = ArrayType(StructType([
    StructField("one", FloatType()),
    StructField("two", FloatType()),
    StructField("three", FloatType()),
    StructField("four", FloatType()),
    StructField("five", FloatType())
]))

data = [
    ([[1., 2., 3.], [1., 2., 3., 4., 5.]],)
]


rdd = spark.sparkContext.parallelize(data)
df = sqlContext.createDataFrame(rdd, old_type)


def convert(arr):
    vals = []
    for v in arr:
        padding = [None] * (5 - len(v))
        vals.append(v + padding)
    return vals

spark.udf.register(name="convert", f=convert, returnType=new_type)
df = df.withColumn("col", expr("convert(col)"))
df.show(10, False)