Spark: How to convert multiple rows into single row with multiple columns?

Note: This is just simple sample data; it bears no resemblance to a real cricket team.

I have a JSON file that looks like this:

{
  "someID": "a5cf4922f4e3f45",
  "payload": {
    "teamID": "1",
    "players": [
      {
        "type": "Batsman",
        "name": "Amar",
        "address": {
          "state": "Gujarat"
        }
      },
      {
        "type": "Bowler",
        "name": "Akbar",
        "address": {
          "state": "Telangana"
        }
      },
      {
        "type": "Fielder",
        "name": "Antony",
        "address": {
          "state": "Kerala"
        }
      }
    ]
  }
}

I exploded it with the code below:

from pyspark.sql.functions import explode

df_record = spark.read.json("path-to-file.json", multiLine=True)

df_player_dtls = df_record.select("payload.teamID", explode("payload.players").alias("xplayers")) \
                          .select("teamID", \
                                  "xplayers.type", \
                                  "xplayers.name", \
                                  "xplayers.address.state")

df_player_dtls.createOrReplaceTempView("t_player_dtls")

spark.sql("SELECT * FROM t_player_dtls").show()

So the current output looks like this:

+--------+---------+--------+------------+
| TeamID |  Type   |  Name  |   State    |
+--------+---------+--------+------------+
|      1 | Batsman | Amar   | Gujarat    |
|      1 | Bowler  | Akbar  | Telangana  |
|      1 | Fielder | Antony | Kerala     |
|      2 | Batsman | John   | Queensland |
|      2 | Bowler  | Smith  | Perth      |
+--------+---------+--------+------------+

I want to convert it into the format shown below:

+--------+--------------+---------------+-------------+--------------+--------------+---------------+
| TeamID | Batsman.Name | Batsman.State | Bowler.Name | Bowler.State | Fielder.Name | Fielder.State |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
|      1 | Amar         | Gujarat       | Akbar       | Telangana    | Antony       | Kerala        |
|      2 | John         | Queensland    | Smith       | Perth        | null         | null          |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+

A team can have only one player of each type, and there are at most four player types per team (Batsman, Bowler, Fielder and Wicket keeper), so each team has at most four players. The final table that holds this data therefore has nine columns (one for the team ID, plus name and state for each of the four players).

Is it possible to do this in Spark? I am new to Spark, so an answer that explains the steps would be greatly appreciated.

It is possible using SQL, which is not the most efficient way (a UDF would be), but it works. And sorry that it is Scala-ish.

val res = spark.sql(
        """select teamID
          |, Batsman.name as `Batsman.name`, Batsman.state as `Batsman.state`
          |, Bowler.name as `Bowler.name`, Bowler.state as `Bowler.state`
          |, Fielder.name as `Fielder.name`, Fielder.state as `Fielder.state`
          |from (
          |   select teamID,
          |     max(case type when 'Batsman' then info end) as Batsman
          |     , max(case type when 'Bowler' then info end) as Bowler
          |     , max(case type when 'Fielder' then info end) as Fielder
          |     from (select teamID, type, struct(name, state) as info from t_player_dtls) group by teamID
      |)""".stripMargin)

I use group by to pivot the data around the teamID column; max will select a value that is not null, and the case statement allows only one record through to max. To simplify the max/case combination I used the struct function, which creates a composite column info made up of the payload that we later want to lift into the flat schema.
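
For reference, a rough PySpark sketch of the same max/when/struct technique applied directly to the df_player_dtls DataFrame from the question (untested; it assumes the column names produced by the question's select):

from pyspark.sql import functions as F

# Build the composite "info" struct, then keep only the matching type per aggregate.
df_wide = (df_player_dtls
    .withColumn("info", F.struct("name", "state"))
    .groupBy("teamID")
    .agg(F.max(F.when(F.col("type") == "Batsman", F.col("info"))).alias("Batsman"),
         F.max(F.when(F.col("type") == "Bowler", F.col("info"))).alias("Bowler"),
         F.max(F.when(F.col("type") == "Fielder", F.col("info"))).alias("Fielder"))
    .select("teamID",
            F.col("Batsman.name").alias("Batsman.Name"),
            F.col("Batsman.state").alias("Batsman.State"),
            F.col("Bowler.name").alias("Bowler.Name"),
            F.col("Bowler.state").alias("Bowler.State"),
            F.col("Fielder.name").alias("Fielder.Name"),
            F.col("Fielder.state").alias("Fielder.State")))

df_wide.show(truncate=False)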

A UDF would be more efficient, but I am not familiar with Python.
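
(For Python readers: the UDF idea could look roughly like the untested PySpark sketch below, mirroring the Scala version further down; underscores are used in the result column names instead of dots to avoid backtick escaping.)

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType

player_types = ["Batsman", "Bowler", "Fielder"]

# One Name/State pair per player type.
return_type = StructType([StructField(f"{t}_{f}", StringType())
                          for t in player_types for f in ("Name", "State")])

def unpack_players(players):
    # players arrives as a list of Rows with fields: type, name, address(state)
    by_type = {p["type"]: p for p in (players or [])}
    out = []
    for t in player_types:
        p = by_type.get(t)
        out.append(p["name"] if p else None)
        out.append(p["address"]["state"] if p else None)
    return tuple(out)

unpack_players_udf = udf(unpack_players, return_type)

df_udf = (df_record
          .withColumn("xplayers", unpack_players_udf(col("payload.players")))
          .select("payload.teamID", "xplayers.*"))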

UPDATE Both solutions (SQL and pivot) use a combination of explode and groupBy. @Anshuman's is much easier to code. They have the following execution plans:

SQL

== Physical Plan ==
SortAggregate(key=[teamID#10], functions=[max(CASE WHEN (type#16 = Batsman) THEN info#31 END), max(CASE WHEN (type#16 = Bowler) THEN info#31 END), max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(teamID#10, 200)
      +- SortAggregate(key=[teamID#10], functions=[partial_max(CASE WHEN (type#16 = Batsman) THEN info#31 END), partial_max(CASE WHEN (type#16 = Bowler) THEN info#31 END), partial_max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
         +- *Sort [teamID#10 ASC NULLS FIRST], false, 0
            +- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, named_struct(name, xplayers#12.name, state, xplayers#12.address.state) AS info#31]
               +- Generate explode(payload#4.players), true, false, [xplayers#12]
                  +- *Project [payload#4]
                     +- Scan ExistingRDD[payload#4,someID#5]

Pivot

== Physical Plan ==
SortAggregate(key=[TeamID#10], functions=[first(if ((Type#16 <=> Batsman)) Name#17 else null, true), first(if ((Type#16 <=> Batsman)) State#18 else null, true), first(if ((Type#16 <=> Bowler)) Name#17 else null, true), first(if ((Type#16 <=> Bowler)) State#18 else null, true), first(if ((Type#16 <=> Fielder)) Name#17 else null, true), first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(TeamID#10, 200)
      +- SortAggregate(key=[TeamID#10], functions=[partial_first(if ((Type#16 <=> Batsman)) Name#17 else null, true), partial_first(if ((Type#16 <=> Batsman)) State#18 else null, true), partial_first(if ((Type#16 <=> Bowler)) Name#17 else null, true), partial_first(if ((Type#16 <=> Bowler)) State#18 else null, true), partial_first(if ((Type#16 <=> Fielder)) Name#17 else null, true), partial_first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
         +- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
            +- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, xplayers#12.name AS name#17, xplayers#12.address.state AS state#18]
               +- Generate explode(payload#4.players), true, false, [xplayers#12]
                  +- *Project [payload#4]
                     +- Scan ExistingRDD[payload#4,someID#5]

Both result in a shuffle (Exchange hashpartitioning(TeamID#10, 200)).

If performance is your goal, then you can use this Scala approach (I do not know Python):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._  // needed for .toDS on the Seq of JSON strings

  // row_1 and row_2 are the sample JSON documents as strings
  val df_record = spark.read.json(Seq(row_1, row_2).toDS)

  //Define your custom player types, as many as needed
  val playerTypes = Seq("Batsman", "Bowler", "Fielder")

  //Return type for the UDF
  val returnType = StructType(playerTypes.flatMap(t => Seq(StructField(s"$t.Name", StringType), StructField(s"$t.State", StringType))))

  val unpackPlayersUDF = udf( (players: Seq[Row]) => {
    val playerValues: Map[String, Row] = players.map(p => (p.getAs[String]("type"), p)).toMap
    val arrangedValues = playerTypes.flatMap { t =>
      val playerRow = playerValues.get(t) // if the type does not exist, the value will be None, which becomes null
      Seq(
        playerRow.map(_.getAs[String]("name"))
        , playerRow.map(_.getAs[Row]("address").getAs[String]("state"))
      )
    }
    Row(arrangedValues: _*)
  }
  , returnType)

  val udfRes = df_record
    .withColumn("xplayers", unpackPlayersUDF($"payload.players"))
    .select("payload.teamID", "xplayers.*")

  udfRes.show(false)
  udfRes.explain()

Output:

+------+------------+-------------+-----------+------------+------------+-------------+
|teamID|Batsman.Name|Batsman.State|Bowler.Name|Bowler.State|Fielder.Name|Fielder.State|
+------+------------+-------------+-----------+------------+------------+-------------+
|1     |Amar        |Gujarat      |Akbar      |Telangana   |Antony      |Kerala       |
|1     |John        |Queensland   |Smith      |Perth       |null        |null         |
+------+------------+-------------+-----------+------------+------------+-------------+

The execution plan is as follows:

== Physical Plan ==
*Project [payload#4.teamID AS teamID#46, UDF(payload#4.players).Batsman.Name AS Batsman.Name#40, UDF(payload#4.players).Batsman.State AS Batsman.State#41, UDF(payload#4.players).Bowler.Name AS Bowler.Name#42, UDF(payload#4.players).Bowler.State AS Bowler.State#43, UDF(payload#4.players).Fielder.Name AS Fielder.Name#44, UDF(payload#4.players).Fielder.State AS Fielder.State#45]
+- Scan ExistingRDD[payload#4,someID#5]

No shuffle is involved. If you want to improve performance further, adding an explicit read schema via spark.read.schema(SCHEMA).json(...) will also help, since the reader will not have to infer the schema, saving time.
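
For example, a minimal PySpark sketch of such an explicit schema for the sample JSON in the question (field types are assumptions; adjust as needed):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Explicit schema matching the sample JSON, so the reader can skip inference.
schema = StructType([
    StructField("someID", StringType()),
    StructField("payload", StructType([
        StructField("teamID", StringType()),
        StructField("players", ArrayType(StructType([
            StructField("type", StringType()),
            StructField("name", StringType()),
            StructField("address", StructType([
                StructField("state", StringType())
            ]))
        ])))
    ]))
])

df_record = spark.read.schema(schema).json("path-to-file.json", multiLine=True)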

We can use pyspark's pivot function:

from pyspark.sql.functions import first

df = df_player_dtls.groupBy("TeamID").pivot("Type").agg(
                            first('Name').alias('Name'),
                            first("State").alias("State"))
df.show(10,False)
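
Since each team has at most one player of each type (as stated in the question), first simply picks up that single non-null record for every pivoted column; with multiple aggregations the pivoted columns typically come out named with the player type as a prefix (for example Batsman_Name, Batsman_State).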