如何更改 pyspark 中的列元数据?
How to change column metadata in pyspark?
如何更新 PySpark 中的列元数据?
我有与分类(字符串)特征的标称编码相对应的元数据值,我想以自动方式将它们解码回来。除非重新创建架构,否则无法直接在 pyspark API 中写入元数据。是否可以随时随地在 PySpark 中编辑元数据,而无需将数据集转换为 RDD 并将其转换回来,提供完整的模式描述(如所述 )?
示例列表:
# Create DF
df.show()
# +---+-------------+
# | id| features|
# +---+-------------+
# | 0|[1.0,1.0,4.0]|
# | 1|[2.0,2.0,4.0]|
# +---+-------------+
# - That one has all the necessary metadata about what is encoded in feature column
# Slice one feature out
df = VectorSlicer(inputCol='features', outputCol='categoryIndex', indices=[1]).transform(df)
df = df.drop('features')
# +---+-------------+
# | id|categoryIndex|
# +---+-------------+
# | 0| [1.0]|
# | 1| [2.0]|
# +---+-------------+
# categoryIndex now carries metadata about singular array with encoding
# Get rid of the singular array
udf = UserDefinedFunction(lambda x: float(x[0]), returnType=DoubleType())
df2 = df.select(*[udf(column).alias(column) if column == 'categoryIndex' else column for column in df.columns])
# +---+-------------+
# | id|categoryIndex|
# +---+-------------+
# | 0| 1.0|
# | 1| 2.0|
# +---+-------------+
# - Metadata is lost for that one
# Write metadata
extract = {...}
df2.schema.fields[1].metadata = extract(df.schema.fields[1].metadata)
# metadata is readable from df2.schema.fields[1].metadata but is not affective.
# Saving and restoring df from parque destroys the change
# Decode categorical
df = IndexToString(inputCol="categoryIndex", outputCol="category").transform(df)
# ERROR. Was supposed to decode the categorical values
提供了有关如何使用 VectorAssembler、VectorIndexer 以及如何通过使用 StructType 构建完整模式来添加元数据的见解,但尚未回答我的问题。
在这两种情况下都会丢失元数据:
- 当您调用 Python
udf
时,输入 Column
及其元数据与输出 Column
之间没有任何关系。 UserDefinedFunction
(在 Python 和 Scala 中)是 Spark 引擎的黑盒。
将数据直接分配给 Python 模式对象:
df2.schema.fields[1].metadata = extract(df.schema.fields[1].metadata)
根本不是一个有效的方法。 Spark DataFrame
是 JVM 对象的包装器。 Python 包装器中的任何更改对于 JVM 后端都是完全不透明的,并且根本不会传播:
import json
df = spark.createDataFrame([(1, "foo")], ("k", "v"))
df.schema[-1].metadata = {"foo": "bar"}
json.loads(df._jdf.schema().json())
## {'fields': [{'metadata': {}, 'name': 'k', 'nullable': True, 'type': 'long'},
## {'metadata': {}, 'name': 'v', 'nullable': True, 'type': 'string'}],
## 'type': 'struct'}
甚至保存在Python:
df.select("*").schema[-1].metadata
## {}
使用 Spark < 2.2 你可以使用一个小的包装器(取自 Spark Gotchas, maintained by me and @eliasah):
def withMeta(self, alias, meta):
sc = SparkContext._active_spark_context
jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))
df.withColumn("foo", withMeta(col("foo"), "", {...}))
使用 Spark >= 2.2 你可以使用 Column.alias
:
df.withColumn("foo", col("foo").alias("", metadata={...}))
如何更新 PySpark 中的列元数据?
我有与分类(字符串)特征的标称编码相对应的元数据值,我想以自动方式将它们解码回来。除非重新创建架构,否则无法直接在 pyspark API 中写入元数据。是否可以随时随地在 PySpark 中编辑元数据,而无需将数据集转换为 RDD 并将其转换回来,提供完整的模式描述(如所述
示例列表:
# Create DF
df.show()
# +---+-------------+
# | id| features|
# +---+-------------+
# | 0|[1.0,1.0,4.0]|
# | 1|[2.0,2.0,4.0]|
# +---+-------------+
# - That one has all the necessary metadata about what is encoded in feature column
# Slice one feature out
df = VectorSlicer(inputCol='features', outputCol='categoryIndex', indices=[1]).transform(df)
df = df.drop('features')
# +---+-------------+
# | id|categoryIndex|
# +---+-------------+
# | 0| [1.0]|
# | 1| [2.0]|
# +---+-------------+
# categoryIndex now carries metadata about singular array with encoding
# Get rid of the singular array
udf = UserDefinedFunction(lambda x: float(x[0]), returnType=DoubleType())
df2 = df.select(*[udf(column).alias(column) if column == 'categoryIndex' else column for column in df.columns])
# +---+-------------+
# | id|categoryIndex|
# +---+-------------+
# | 0| 1.0|
# | 1| 2.0|
# +---+-------------+
# - Metadata is lost for that one
# Write metadata
extract = {...}
df2.schema.fields[1].metadata = extract(df.schema.fields[1].metadata)
# metadata is readable from df2.schema.fields[1].metadata but is not affective.
# Saving and restoring df from parque destroys the change
# Decode categorical
df = IndexToString(inputCol="categoryIndex", outputCol="category").transform(df)
# ERROR. Was supposed to decode the categorical values
在这两种情况下都会丢失元数据:
- 当您调用 Python
udf
时,输入Column
及其元数据与输出Column
之间没有任何关系。UserDefinedFunction
(在 Python 和 Scala 中)是 Spark 引擎的黑盒。 将数据直接分配给 Python 模式对象:
df2.schema.fields[1].metadata = extract(df.schema.fields[1].metadata)
根本不是一个有效的方法。 Spark
DataFrame
是 JVM 对象的包装器。 Python 包装器中的任何更改对于 JVM 后端都是完全不透明的,并且根本不会传播:import json df = spark.createDataFrame([(1, "foo")], ("k", "v")) df.schema[-1].metadata = {"foo": "bar"} json.loads(df._jdf.schema().json()) ## {'fields': [{'metadata': {}, 'name': 'k', 'nullable': True, 'type': 'long'}, ## {'metadata': {}, 'name': 'v', 'nullable': True, 'type': 'string'}], ## 'type': 'struct'}
甚至保存在Python:
df.select("*").schema[-1].metadata ## {}
使用 Spark < 2.2 你可以使用一个小的包装器(取自 Spark Gotchas, maintained by me and @eliasah):
def withMeta(self, alias, meta):
sc = SparkContext._active_spark_context
jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))
df.withColumn("foo", withMeta(col("foo"), "", {...}))
使用 Spark >= 2.2 你可以使用 Column.alias
:
df.withColumn("foo", col("foo").alias("", metadata={...}))