Spark Scala Dataframe - replace/join 列值具有来自另一个数据帧的值(但已转置)

Spark Scala Dataframe - replace/join column values with values from another dataframe (but is transposed)

我有一个 table 约 300 列填充字符(存储为字符串):

valuesDF:

| FavouriteBeer | FavouriteCheese | ...
|---------------|-----------------|--------
| U             | C               | ...
| U             | E               | ...
| I             | B               | ...
| C             | U               | ...
| ...           | ...             | ...

我有一个数据摘要,它将字符映射到它们的实际含义上。它是这样的形式:

summaryDF:

| Field            | Value | ValueDesc     |
|------------------|-------|---------------|
|  FavouriteBeer   |   U   |  Unknown      |
|  FavouriteBeer   |   C   |  Carlsberg    |
|  FavouriteBeer   |   I   |  InnisAndGunn |
|  FavouriteBeer   |   D   |  DoomBar      |
|  FavouriteCheese |   C   |  Cheddar      |
|  FavouriteCheese |   E   |  Emmental     |
|  FavouriteCheese |   B   |  Brie         |
|  FavouriteCheese |   U   |  Unknown      |
|  ...             |  ...  |    ...        |

我想以编程方式将 valuesDF 中每一列的字符值替换为 summaryDF 中的值描述。这是我正在寻找的结果:

finalDF:

| FavouriteBeer | FavouriteCheese | ...
|---------------|-----------------|--------
| Unknown       | Cheddar         | ...
| Unknown       | Emmental        | ...
| InnisAndGunn  | Brie            | ...
| Carlsberg     | Unknown         | ...
| ...           | ...             | ...

因为有 ~300 列,我不想为每一列输入 withColumn 方法。

不幸的是,在 Spark 编程方面我还是个新手,尽管在过去的 2 个月里我已经掌握了足够多的知识。


我很确定我需要做的是:

  1. valuesDF.columns.foreach { col => ...... } 遍历每一列
  2. 使用 col 字符串值
  3. Field 上过滤 summaryDF
  4. 根据当前列
  5. 左联接summaryDFvaluesDF
  6. withColumnvaluesDF 中的原始字符代码列替换为新的描述列
  7. 将新的 DF 指定为 var
  8. 继续循环

但是,尝试这样做给了我笛卡尔积错误(我确保将连接定义为 "left")。

我试过但失败了summaryDF(因为没有聚合可做??)然后将两个数据帧连接在一起。

这是我尝试过的事情,并且总是得到 NullPointerException。我知道这 确实 不是正确的方法,并且可以理解为什么我得到空指针......但我真的被卡住了并恢复到旧的,愚蠢的&绝望中的坏 Python 习惯。

var valuesDF = sourceDF
// I converted summaryDF to a broadcasted RDD 
// because its small and a "constant" lookup table
summaryBroadcast
 .value
 .foreach{ x =>

   // searchValue = Value (e.g. `U`), 
   // replaceValue = ValueDescription (e.g. `Unknown`), 

   val field = x(0).toString
   val searchValue = x(1).toString
   val replaceValue = x(2).toString

   // error catching as summary data does not exactly mapping onto field names
   // the joys of business people working in Excel...
   try {
     // I'm using regexp_replace because I'm lazy
     valuesDF = valuesDF
       .withColumn( attribute, regexp_replace(col(attribute), searchValue, replaceValue ))
   }
   catch {case _: Exception =>
     null
   }
}

有什么想法吗?建议?谢谢。

首先,我们需要一个函数来执行 valuesDfsummaryDf 通过 Value 的连接以及 Favourite* 和 [=17= 的相应对]:

private def joinByColumn(colName: String, sourceDf: DataFrame): DataFrame = {
  sourceDf.as("src") // alias it to help selecting appropriate columns in the result
          // the join 
          .join(summaryDf, $"Value" === col(colName) && $"Field" === colName, "left")
          // we do not need the original `Favourite*` column, so drop it
          .drop(colName)
          // select all previous columns, plus the one that contains the match
          .select("src.*", "ValueDesc")
          // rename the resulting column to have the name of the source one
          .withColumnRenamed("ValueDesc", colName)
}

现在,为了生成目标结果,我们可以迭代要匹配的列的名称:

val result = Seq("FavouriteBeer", 
                 "FavouriteCheese").foldLeft(valuesDF) { 
                    case(df, colName) => joinByColumn(colName, df) 
                 }

result.show()
+-------------+---------------+
|FavouriteBeer|FavouriteCheese|
+-------------+---------------+
|      Unknown|        Cheddar|
|      Unknown|       Emmental|
| InnisAndGunn|           Brie|
|    Carlsberg|        Unknown|
+-------------+---------------+

如果 valuesDf 中的值与 summaryDf 中的任何值都不匹配,此解决方案中的结果单元格将包含 null。如果您只想将其替换为 Unknown 值,而不是上面的 .select.withColumnRenamed 行,请使用:

.withColumn(colName, when($"ValueDesc".isNotNull, $"ValueDesc").otherwise(lit("Unknown")))
.select("src.*", colName)