在 pyspark 中写入镶木地板时忽略缺失值
Ignore missing values when writing to parquet in pyspark
我需要转换 parquet 文件的内部结构。
目前我有一个存储数组或数组的字段。我打算把它变成一个结构数组。
所以从这个:
root
-array
-array
我想要这个:
root
-array
-struct
我正在按以下方式执行转换:
我为新结构定义了一个架构:
newtype = ArrayType(StructType(
[
StructField("one", FloatType()),
StructField("two", FloatType()),
StructField("three", FloatType()),
StructField("four", FloatType()),
StructField("five", FloatType())
]))
我将 'empty' udf 应用于该列。有趣的部分是我为 udf 指定了模式。
def convert(arr):
return arr
df = spark.read.parquet("....")
spark.udf.register(name="convert", f=convert, returnType=newtype)
df = df.withColumn("col", expr("convert(col)"))
最后我把它写回镶木地板。
我遇到的问题是:
Input row doesn't have expected number of values required by the
schema. 5 fields are required while 3 values are provided.
确实如此。有些数组曾经有 3 个值。后来添加了更多值,因此较新的数组有 5 个值。
为什么会发生这种情况? 我将字段定义为可为空,所以我希望它能起作用。 我有哪些选择?
当您将数据转换为新的结构模式时,您必须为每个字段提供一个值。如果您不提供值,Spark 不想假设要输入的值。如果长度错误,只需为剩余值提供 None。此外,您的 convert 函数看起来不像是在处理嵌套数组。这是一个使用 None's.
更新为 pad 的工作示例
from pyspark.sql.types import *
from pyspark.sql.functions import *
old_type = StructType([
StructField("col", ArrayType(ArrayType(FloatType())))
])
new_type = ArrayType(StructType([
StructField("one", FloatType()),
StructField("two", FloatType()),
StructField("three", FloatType()),
StructField("four", FloatType()),
StructField("five", FloatType())
]))
data = [
([[1., 2., 3.], [1., 2., 3., 4., 5.]],)
]
rdd = spark.sparkContext.parallelize(data)
df = sqlContext.createDataFrame(rdd, old_type)
def convert(arr):
vals = []
for v in arr:
padding = [None] * (5 - len(v))
vals.append(v + padding)
return vals
spark.udf.register(name="convert", f=convert, returnType=new_type)
df = df.withColumn("col", expr("convert(col)"))
df.show(10, False)
我需要转换 parquet 文件的内部结构。
目前我有一个存储数组或数组的字段。我打算把它变成一个结构数组。
所以从这个:
root
-array
-array
我想要这个:
root
-array
-struct
我正在按以下方式执行转换:
我为新结构定义了一个架构:
newtype = ArrayType(StructType(
[
StructField("one", FloatType()),
StructField("two", FloatType()),
StructField("three", FloatType()),
StructField("four", FloatType()),
StructField("five", FloatType())
]))
我将 'empty' udf 应用于该列。有趣的部分是我为 udf 指定了模式。
def convert(arr):
return arr
df = spark.read.parquet("....")
spark.udf.register(name="convert", f=convert, returnType=newtype)
df = df.withColumn("col", expr("convert(col)"))
最后我把它写回镶木地板。
我遇到的问题是:
Input row doesn't have expected number of values required by the schema. 5 fields are required while 3 values are provided.
确实如此。有些数组曾经有 3 个值。后来添加了更多值,因此较新的数组有 5 个值。
为什么会发生这种情况? 我将字段定义为可为空,所以我希望它能起作用。 我有哪些选择?
当您将数据转换为新的结构模式时,您必须为每个字段提供一个值。如果您不提供值,Spark 不想假设要输入的值。如果长度错误,只需为剩余值提供 None。此外,您的 convert 函数看起来不像是在处理嵌套数组。这是一个使用 None's.
更新为 pad 的工作示例from pyspark.sql.types import *
from pyspark.sql.functions import *
old_type = StructType([
StructField("col", ArrayType(ArrayType(FloatType())))
])
new_type = ArrayType(StructType([
StructField("one", FloatType()),
StructField("two", FloatType()),
StructField("three", FloatType()),
StructField("four", FloatType()),
StructField("five", FloatType())
]))
data = [
([[1., 2., 3.], [1., 2., 3., 4., 5.]],)
]
rdd = spark.sparkContext.parallelize(data)
df = sqlContext.createDataFrame(rdd, old_type)
def convert(arr):
vals = []
for v in arr:
padding = [None] * (5 - len(v))
vals.append(v + padding)
return vals
spark.udf.register(name="convert", f=convert, returnType=new_type)
df = df.withColumn("col", expr("convert(col)"))
df.show(10, False)