在pyspark中将两个数据帧中的一个数据帧作为单独的子列

Making one dataframe out of two dataframes as separate subcolumns in pyspark

我想将两个数据框合二为一,所以每个都是子列,而不是数据框的连接。所以我有两个数据框,stat1_df 和 stat2_df,它们看起来像这样:

root
 |-- max_scenes: integer (nullable = true)
 |-- median_scenes: double (nullable = false)
 |-- avg_scenes: double (nullable = true)

+----------+-------------+------------------+
|max_scenes|median_scenes|avg_scenes        |
+----------+-------------+------------------+
|97        |7.0          |10.806451612903226|
|97        |7.0          |10.806451612903226|
|97        |7.0          |10.806451612903226|
|97        |7.0          |10.806451612903226|
+----------+-------------+------------------+


root
 |-- max: double (nullable = true)
 |-- type: string (nullable = true)

+-----+-----------+
|max  |type       |
+-----+-----------+
|10.0 |small      |
|25.0 |medium     |
|50.0 |large      |
|250.0|extra_large|
+-----+-----------+

,我希望 result_df 为:

root
 |-- some_statistics1: struct (nullable = true)
 |    |-- max_scenes: integer (nullable = true)
      |-- median_scenes: double (nullable = false)
      |-- avg_scenes: double (nullable = true)
 |-- some_statistics2: struct (nullable = true)
 |    |-- max: double (nullable = true)
      |-- type: string (nullable = true)

有没有办法把这两个如图所示? stat1_df 和 stat2_df 是简单的数据帧,没有数组,嵌套的 columns.Final 数据帧被写入 mongodb。如果有任何其他问题,我会在这里回答。

检查下面的代码。

在两个 DataFrame 中添加 id 列,将所有列移动到结构中,然后使用 join 两个 DataFrame 的

scala> val dfa = Seq(("10","8.9","7.9")).toDF("max_scenes","median_scenes","avg_scenes")
dfa: org.apache.spark.sql.DataFrame = [max_scenes: string, median_scenes: string ... 1 more field]

scala> dfa.show(false)
+----------+-------------+----------+
|max_scenes|median_scenes|avg_scenes|
+----------+-------------+----------+
|10        |8.9          |7.9       |
+----------+-------------+----------+


scala> dfa.printSchema
root
 |-- max_scenes: string (nullable = true)
 |-- median_scenes: string (nullable = true)
 |-- avg_scenes: string (nullable = true)


scala> val mdfa = dfa.select(struct($"*").as("some_statistics1")).withColumn("id",monotonically_increasing_id)
mdfa: org.apache.spark.sql.DataFrame = [some_statistics1: struct<max_scenes: string, median_scenes: string ... 1 more field>, id: bigint]

scala> mdfa.printSchema
root
 |-- some_statistics1: struct (nullable = false)
 |    |-- max_scenes: string (nullable = true)
 |    |-- median_scenes: string (nullable = true)
 |    |-- avg_scenes: string (nullable = true)
 |-- id: long (nullable = false)


scala> mdfa.show(false)
+----------------+---+
|some_statistics1|id |
+----------------+---+
|[10,8.9,7.9]    |0  |
+----------------+---+


scala> val dfb = Seq(("11.2","sample")).toDF("max","type")
dfb: org.apache.spark.sql.DataFrame = [max: string, type: string]

scala> dfb.printSchema
root
 |-- max: string (nullable = true)
 |-- type: string (nullable = true)


scala> dfb.show(false)
+----+------+
|max |type  |
+----+------+
|11.2|sample|
+----+------+


scala> val mdfb = dfb.select(struct($"*").as("some_statistics2")).withColumn("id",monotonically_increasing_id)
mdfb: org.apache.spark.sql.DataFrame = [some_statistics2: struct<max: string, type: string>, id: bigint]

scala> mdfb.printSchema
root
 |-- some_statistics2: struct (nullable = false)
 |    |-- max: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- id: long (nullable = false)


scala> mdfb.show(false)
+----------------+---+
|some_statistics2|id |
+----------------+---+
|[11.2,sample]   |0  |
+----------------+---+


scala> mdfa.join(mdfb,Seq("id"),"inner").drop("id").printSchema
root
 |-- some_statistics1: struct (nullable = false)
 |    |-- max_scenes: string (nullable = true)
 |    |-- median_scenes: string (nullable = true)
 |    |-- avg_scenes: string (nullable = true)
 |-- some_statistics2: struct (nullable = false)
 |    |-- max: string (nullable = true)
 |    |-- type: string (nullable = true)


scala> mdfa.join(mdfb,Seq("id"),"inner").drop("id").show(false)
+----------------+----------------+
|some_statistics1|some_statistics2|
+----------------+----------------+
|[10,8.9,7.9]    |[11.2,sample]   |
+----------------+----------------+