Converting struct column names into rows in a Parquet file
I have a sample JSON data file that looks like this:
{"data_id":"1234","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":true,"familyname":true,"swimming_pool":true}}}
{"data_id":"6789","risk_characteristics":{"indicators":{"alcohol":true,"house":true,"business":false,"familyname":true}}}
{"data_id":"5678","risk_characteristics":{"indicators":{"alcohol":false}}}
I convert the JSON file to Parquet and load it into Hive with the following code:
dataDF = spark.read.json("path/Datasmall.json")
dataDF.write.parquet("data.parquet")
parqFile = spark.read.parquet("data.parquet")
parqFile.write.saveAsTable("indicators_table", format='parquet', mode='append', path='/externalpath/indicators_table/')
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
fromHiveDF = hive_context.table("default.indicators_table")
fromHiveDF.show()
indicatorsDF = fromHiveDF.select('data_id', 'risk_characteristics.indicators')
indicatorsDF.printSchema()
root
|-- data_id: string (nullable = true)
|-- indicators: struct (nullable = true)
| |-- alcohol: boolean (nullable = true)
| |-- house: boolean (nullable = true)
| |-- business: boolean (nullable = true)
| |-- familyname: boolean (nullable = true)
indicatorsDF.show()
+-------+--------------------+
|data_id| indicators|
+-------+--------------------+
| 1234|[true, true, true...|
| 6789|[true, false, tru...|
| 5678| [false,,,,]|
+-------+--------------------+
Instead of retrieving the data as select data_id, indicators.alcohol, indicators.house, and so on, I just want to end up with a Parquet data file that has only the following 3 columns. That is, the struct field names become rows under an indicators_type column:
data_id indicators_type indicators_value
1234 alcohol T
1234 house T
1234 business T
1234 familyname T
1234 swimming_pool T
6789 alcohol T
6789 house T
6789 business F
6789 familyname T
5678 alcohol F
How can I do this? I am trying to accomplish it with PySpark. Also, is there a way to achieve this without hardcoding the field names? In my real data the struct can grow well beyond familyname, possibly to 100 or more fields.
Many thanks.
Use stack to unpivot the columns:
df.show()
+-------+--------------------------+
|data_id|indicators |
+-------+--------------------------+
|1234 |[true, true, false, true] |
|6789 |[true, false, true, false]|
+-------+--------------------------+
indicator_cols = df.select('indicators.*').columns
stack_expr = 'stack({}, {}) as (indicators_type, indicators_value)'.format(
    len(indicator_cols),
    ', '.join(["'%s', indicators.%s" % (c, c) for c in indicator_cols])
)
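For the four indicator fields in this example, the generated expression string expands to (shown for illustration):

stack(4, 'alcohol', indicators.alcohol, 'house', indicators.house, 'business', indicators.business, 'familyname', indicators.familyname) as (indicators_type, indicators_value)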
df2 = df.selectExpr(
'data_id',
stack_expr
)
df2.show()
+-------+---------------+----------------+
|data_id|indicators_type|indicators_value|
+-------+---------------+----------------+
| 1234| alcohol| true|
| 1234| house| true|
| 1234| business| false|
| 1234| familyname| true|
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
+-------+---------------+----------------+
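To end up with a Parquet file containing just these 3 columns, write df2 back out (a minimal sketch; the output path here is hypothetical):

# Hypothetical output path; adjust to your environment.
df2.write.mode('overwrite').parquet('/externalpath/indicators_long/')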
Another solution, using explode:
val df = spark.sql(""" with t1 as (
select 1234 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
union
select 6789 data_id, named_struct('alcohol',true, 'house',false, 'business', true, 'familyname', false) as indicators
)
select * from t1
""")
df.show(false)
df.printSchema
+-------+--------------------------+
|data_id|indicators |
+-------+--------------------------+
|6789 |[true, false, true, false]|
|1234 |[true, false, true, false]|
+-------+--------------------------+
root
|-- data_id: integer (nullable = false)
|-- indicators: struct (nullable = false)
| |-- alcohol: boolean (nullable = false)
| |-- house: boolean (nullable = false)
| |-- business: boolean (nullable = false)
| |-- familyname: boolean (nullable = false)
import org.apache.spark.sql.functions._

// Wrap each struct field in a single-entry map, then explode
// the array of maps into one row per field.
val df2 = df.withColumn("x", explode(array(
map(lit("alcohol"), col("indicators.alcohol")),
map(lit("house"), col("indicators.house")),
map(lit("business"), col("indicators.business")),
map(lit("familyname"), col("indicators.familyname"))
)))
df2.select(col("data_id"),map_keys(col("x"))(0), map_values(col("x"))(0)).show
+-------+--------------+----------------+
|data_id|map_keys(x)[0]|map_values(x)[0]|
+-------+--------------+----------------+
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
| 1234| alcohol| true|
| 1234| house| false|
| 1234| business| true|
| 1234| familyname| false|
+-------+--------------+----------------+
Update 1:
To pick up the indicator struct columns dynamically, use the following approach.
val colsx = df.select("indicators.*").columns
colsx: Array[String] = Array(alcohol, house, business, familyname)
val exp1 = colsx.map( x => s""" map("${x}", indicators.${x}) """ ).mkString(",")
val exp2 = " explode(array( " + exp1 + " )) "
val df2 = df.withColumn("x",expr(exp2))
df2.select(col("data_id"),map_keys(col("x"))(0).as("indicator_key"), map_values(col("x"))(0).as("indicator_value")).show
+-------+-------------+---------------+
|data_id|indicator_key|indicator_value|
+-------+-------------+---------------+
| 6789| alcohol| true|
| 6789| house| false|
| 6789| business| true|
| 6789| familyname| false|
| 1234| alcohol| true|
| 1234| house| false|
| 1234| business| true|
| 1234| familyname| false|
+-------+-------------+---------------+
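Since the question asks for PySpark, the same dynamic explode approach translates roughly as follows (a sketch, assuming a DataFrame df with a data_id column and an indicators struct as above):

from pyspark.sql import functions as F

# Collect the struct field names dynamically, build one single-entry map
# per field, and explode the array of maps into one row per field.
cols = df.select('indicators.*').columns
df2 = df.withColumn('x', F.explode(F.array(
    *[F.create_map(F.lit(c), F.col('indicators.' + c)) for c in cols]
)))
df2.select(
    'data_id',
    F.map_keys('x')[0].alias('indicators_type'),
    F.map_values('x')[0].alias('indicators_value')
).show()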