重命名 spark 数据框中的嵌套字段
Rename nested field in spark dataframe
在 Spark 中有一个数据帧 df
:
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
如何将字段 array_field.a
重命名为 array_field.a_renamed
?
[更新]:
.withColumnRenamed()
不适用于嵌套字段,所以我尝试了这种不安全且不安全的方法:
# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'
ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'
# Then set dataframe's schema with altered schema
df._schema = schema
我知道设置私有属性不是一个好的做法,但我不知道设置 df 模式的其他方法
我认为我的方向是正确的,但是 df.printSchema()
仍然显示 array_field.a
的旧名称,尽管 df.schema == schema
是 True
Python
无法修改单个嵌套字段。你必须重新创建一个完整的结构。在这种特殊情况下,最简单的解决方案是使用 cast
.
先导入一堆:
from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
ArrayType, LongType, StringType, StructField, StructType)
和示例数据:
Record = namedtuple("Record", ["a", "b", "c"])
df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])
让我们确认架构与您的情况相同:
df.printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
您可以将新模式定义为字符串:
str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select(col("array_field").cast(str_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
或 DataType
:
struct_schema = ArrayType(StructType([
StructField("a_renamed", StringType()),
StructField("b", LongType()),
StructField("c", LongType())
]))
df.select(col("array_field").cast(struct_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
斯卡拉
在 Scala 中可以使用相同的技术:
case class Record(a: String, b: Long, c: Long)
val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")
val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select($"array_field".cast(strSchema))
或
import org.apache.spark.sql.types._
val structSchema = ArrayType(StructType(Seq(
StructField("a_renamed", StringType),
StructField("b", LongType),
StructField("c", LongType)
)))
df.select($"array_field".cast(structSchema))
可能的改进:
如果您使用富有表现力的数据操作或 JSON 处理库,则可以更容易地将数据类型转储到 dict
或 JSON 字符串并从那里获取它,例如 (Python / toolz
):
from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter
# Update name to "a_updated" if name is "a"
rename_field = update_in(
keys=["name"], func=lambda x: "a_updated" if x == "a" else x)
updated_schema = pipe(
# Get schema of the field as a dict
df.schema["array_field"].jsonValue(),
# Update fields with rename
update_in(
keys=["type", "elementType", "fields"],
func=lambda x: pipe(x, map(rename_field), list)),
# Load schema from dict
StructField.fromJson,
# Get data type
attrgetter("dataType"))
df.select(col("array_field").cast(updated_schema)).printSchema()
您可以递归数据框的架构以创建具有所需更改的新架构。
PySpark 中的模式是一个 StructType,它包含一个 StructField 列表,每个 StructField 可以包含一些原始类型或另一个 StructType。
这意味着我们可以根据类型是否为 StructType 来决定是否要递归。
下面是一个带注释的示例实现,向您展示了如何实现上述想法。
# Some imports
from pyspark.sql.types import DataType, StructType, ArrayType
from copy import copy
# We take a dataframe and return a new one with required changes
def cleanDataFrame(df: DataFrame) -> DataFrame:
# Returns a new sanitized field name (this function can be anything really)
def sanitizeFieldName(s: str) -> str:
return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
.replace("[", "_").replace("]", "_").replace(".", "_")
# We call this on all fields to create a copy and to perform any
# changes we might want to do to the field.
def sanitizeField(field: StructField) -> StructField:
field = copy(field)
field.name = sanitizeFieldName(field.name)
# We recursively call cleanSchema on all types
field.dataType = cleanSchema(field.dataType)
return field
def cleanSchema(dataType: [DataType]) -> [DataType]:
dataType = copy(dataType)
# If the type is a StructType we need to recurse otherwise
# we can return since we've reached the leaf node
if isinstance(dataType, StructType):
# We call our sanitizer for all top level fields
dataType.fields = [sanitizeField(f) for f in dataType.fields]
elif isinstance(dataType, ArrayType):
dataType.elementType = cleanSchema(dataType.elementType)
return dataType
# Now since we have the new schema we can create a new DataFrame
# by using the old Frame's RDD as data and the new schema as the
# schema for the data
return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
我找到了一种比@zero323 提供的方法更简单的方法,沿着这条线
@MaxPY:
Pyspark 2.4:
# Get the schema from the dataframe df
schema = df.schema
# Override `fields` with a list of new StructField, equals to the previous but for the names
schema.fields = (list(map(lambda field:
StructField(field.name + "_renamed", field.dataType), schema.fields)))
# Override also `names` with the same mechanism
schema.names = list(map(lambda name: name + "_renamed", table_schema.names))
现在 df.schema
将打印所有更新的名称。
另一个更简单的解决方案,如果它对你有用,就像对我一样有用,那就是展平结构,然后重命名:
使用 Scala:
val df_flat = df.selectExpr("array_field.*")
现在重命名有效
val df_renamed = df_flat.withColumnRenamed("a", "a_renamed")
当然,这只适用于您不需要层次结构的情况(尽管我想如果需要可以重新创建它)
使用 Leo C 提供的答案:,我构建了我认为更 human-friendly/pythoniac 的脚本:
import pyspark.sql.types as sql_types
path_table = "<PATH_TO_DATA>"
table_name = "<TABLE_NAME>"
def recur_rename(schema: StructType, old_char, new_char):
schema_new = []
for struct_field in schema:
if type(struct_field.dataType)==sql_types.StructType:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.StructType(recur_rename(struct_field.dataType, old_char, new_char)), struct_field.nullable, struct_field.metadata))
elif type(struct_field.dataType)==sql_types.ArrayType:
if type(struct_field.dataType.elementType)==sql_types.StructType:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.ArrayType(sql_types.StructType(recur_rename(struct_field.dataType.elementType, old_char, new_char)),True), struct_field.nullable, struct_field.metadata)) # Recursive call to loop over all Array elements
else:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType.elementType, struct_field.nullable, struct_field.metadata)) # If ArrayType only has one field, it is no sense to use an Array so Array is exploded
else:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType, struct_field.nullable, struct_field.metadata))
return schema_new
def rename_columns(schema: StructType, old_char, new_char):
return sql_types.StructType(recur_rename(schema, old_char, new_char))
df = spark.read.format("json").load(path_table) # Read data whose schema has to be changed.
newSchema = rename_columns(df.schema, ":", "_") # Replace special characters in schema (More special characters not allowed in Spark/Hive meastore: ':', ',', ';')
df2= spark.read.format("json").schema(newSchema).load(path_table) # Read data with new schema.
我认为代码不言自明(此外,它有注释)但它所做的是递归循环架构中的所有字段,将“old_char”替换为“new_char”他们每个人。如果字段类型是嵌套类型(StructType 或 ArrayType),则进行新的递归调用。
在 Spark 中有一个数据帧 df
:
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
如何将字段 array_field.a
重命名为 array_field.a_renamed
?
[更新]:
.withColumnRenamed()
不适用于嵌套字段,所以我尝试了这种不安全且不安全的方法:
# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'
ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'
# Then set dataframe's schema with altered schema
df._schema = schema
我知道设置私有属性不是一个好的做法,但我不知道设置 df 模式的其他方法
我认为我的方向是正确的,但是 df.printSchema()
仍然显示 array_field.a
的旧名称,尽管 df.schema == schema
是 True
Python
无法修改单个嵌套字段。你必须重新创建一个完整的结构。在这种特殊情况下,最简单的解决方案是使用 cast
.
先导入一堆:
from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
ArrayType, LongType, StringType, StructField, StructType)
和示例数据:
Record = namedtuple("Record", ["a", "b", "c"])
df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])
让我们确认架构与您的情况相同:
df.printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
您可以将新模式定义为字符串:
str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select(col("array_field").cast(str_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
或 DataType
:
struct_schema = ArrayType(StructType([
StructField("a_renamed", StringType()),
StructField("b", LongType()),
StructField("c", LongType())
]))
df.select(col("array_field").cast(struct_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
斯卡拉
在 Scala 中可以使用相同的技术:
case class Record(a: String, b: Long, c: Long)
val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")
val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select($"array_field".cast(strSchema))
或
import org.apache.spark.sql.types._
val structSchema = ArrayType(StructType(Seq(
StructField("a_renamed", StringType),
StructField("b", LongType),
StructField("c", LongType)
)))
df.select($"array_field".cast(structSchema))
可能的改进:
如果您使用富有表现力的数据操作或 JSON 处理库,则可以更容易地将数据类型转储到 dict
或 JSON 字符串并从那里获取它,例如 (Python / toolz
):
from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter
# Update name to "a_updated" if name is "a"
rename_field = update_in(
keys=["name"], func=lambda x: "a_updated" if x == "a" else x)
updated_schema = pipe(
# Get schema of the field as a dict
df.schema["array_field"].jsonValue(),
# Update fields with rename
update_in(
keys=["type", "elementType", "fields"],
func=lambda x: pipe(x, map(rename_field), list)),
# Load schema from dict
StructField.fromJson,
# Get data type
attrgetter("dataType"))
df.select(col("array_field").cast(updated_schema)).printSchema()
您可以递归数据框的架构以创建具有所需更改的新架构。
PySpark 中的模式是一个 StructType,它包含一个 StructField 列表,每个 StructField 可以包含一些原始类型或另一个 StructType。
这意味着我们可以根据类型是否为 StructType 来决定是否要递归。
下面是一个带注释的示例实现,向您展示了如何实现上述想法。
# Some imports
from pyspark.sql.types import DataType, StructType, ArrayType
from copy import copy
# We take a dataframe and return a new one with required changes
def cleanDataFrame(df: DataFrame) -> DataFrame:
# Returns a new sanitized field name (this function can be anything really)
def sanitizeFieldName(s: str) -> str:
return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
.replace("[", "_").replace("]", "_").replace(".", "_")
# We call this on all fields to create a copy and to perform any
# changes we might want to do to the field.
def sanitizeField(field: StructField) -> StructField:
field = copy(field)
field.name = sanitizeFieldName(field.name)
# We recursively call cleanSchema on all types
field.dataType = cleanSchema(field.dataType)
return field
def cleanSchema(dataType: [DataType]) -> [DataType]:
dataType = copy(dataType)
# If the type is a StructType we need to recurse otherwise
# we can return since we've reached the leaf node
if isinstance(dataType, StructType):
# We call our sanitizer for all top level fields
dataType.fields = [sanitizeField(f) for f in dataType.fields]
elif isinstance(dataType, ArrayType):
dataType.elementType = cleanSchema(dataType.elementType)
return dataType
# Now since we have the new schema we can create a new DataFrame
# by using the old Frame's RDD as data and the new schema as the
# schema for the data
return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
我找到了一种比@zero323 提供的方法更简单的方法,沿着这条线 @MaxPY:
Pyspark 2.4:
# Get the schema from the dataframe df
schema = df.schema
# Override `fields` with a list of new StructField, equals to the previous but for the names
schema.fields = (list(map(lambda field:
StructField(field.name + "_renamed", field.dataType), schema.fields)))
# Override also `names` with the same mechanism
schema.names = list(map(lambda name: name + "_renamed", table_schema.names))
现在 df.schema
将打印所有更新的名称。
另一个更简单的解决方案,如果它对你有用,就像对我一样有用,那就是展平结构,然后重命名:
使用 Scala:
val df_flat = df.selectExpr("array_field.*")
现在重命名有效
val df_renamed = df_flat.withColumnRenamed("a", "a_renamed")
当然,这只适用于您不需要层次结构的情况(尽管我想如果需要可以重新创建它)
使用 Leo C 提供的答案:
import pyspark.sql.types as sql_types
path_table = "<PATH_TO_DATA>"
table_name = "<TABLE_NAME>"
def recur_rename(schema: StructType, old_char, new_char):
schema_new = []
for struct_field in schema:
if type(struct_field.dataType)==sql_types.StructType:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.StructType(recur_rename(struct_field.dataType, old_char, new_char)), struct_field.nullable, struct_field.metadata))
elif type(struct_field.dataType)==sql_types.ArrayType:
if type(struct_field.dataType.elementType)==sql_types.StructType:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.ArrayType(sql_types.StructType(recur_rename(struct_field.dataType.elementType, old_char, new_char)),True), struct_field.nullable, struct_field.metadata)) # Recursive call to loop over all Array elements
else:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType.elementType, struct_field.nullable, struct_field.metadata)) # If ArrayType only has one field, it is no sense to use an Array so Array is exploded
else:
schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType, struct_field.nullable, struct_field.metadata))
return schema_new
def rename_columns(schema: StructType, old_char, new_char):
return sql_types.StructType(recur_rename(schema, old_char, new_char))
df = spark.read.format("json").load(path_table) # Read data whose schema has to be changed.
newSchema = rename_columns(df.schema, ":", "_") # Replace special characters in schema (More special characters not allowed in Spark/Hive meastore: ':', ',', ';')
df2= spark.read.format("json").schema(newSchema).load(path_table) # Read data with new schema.
我认为代码不言自明(此外,它有注释)但它所做的是递归循环架构中的所有字段,将“old_char”替换为“new_char”他们每个人。如果字段类型是嵌套类型(StructType 或 ArrayType),则进行新的递归调用。