Spark Scala Dataframe - replace/join 列值具有来自另一个数据帧的值(但已转置)
Spark Scala Dataframe - replace/join column values with values from another dataframe (but is transposed)
我有一个 table 约 300 列填充字符(存储为字符串):
valuesDF:
| FavouriteBeer | FavouriteCheese | ...
|---------------|-----------------|--------
| U | C | ...
| U | E | ...
| I | B | ...
| C | U | ...
| ... | ... | ...
我有一个数据摘要,它将字符映射到它们的实际含义上。它是这样的形式:
summaryDF:
| Field | Value | ValueDesc |
|------------------|-------|---------------|
| FavouriteBeer | U | Unknown |
| FavouriteBeer | C | Carlsberg |
| FavouriteBeer | I | InnisAndGunn |
| FavouriteBeer | D | DoomBar |
| FavouriteCheese | C | Cheddar |
| FavouriteCheese | E | Emmental |
| FavouriteCheese | B | Brie |
| FavouriteCheese | U | Unknown |
| ... | ... | ... |
我想以编程方式将 valuesDF
中每一列的字符值替换为 summaryDF
中的值描述。这是我正在寻找的结果:
finalDF:
| FavouriteBeer | FavouriteCheese | ...
|---------------|-----------------|--------
| Unknown | Cheddar | ...
| Unknown | Emmental | ...
| InnisAndGunn | Brie | ...
| Carlsberg | Unknown | ...
| ... | ... | ...
因为有 ~300 列,我不想为每一列输入 withColumn
方法。
不幸的是,在 Spark 编程方面我还是个新手,尽管在过去的 2 个月里我已经掌握了足够多的知识。
我很确定我需要做的是:
valuesDF.columns.foreach { col => ...... }
遍历每一列
- 使用
col
字符串值 在 Field
上过滤 summaryDF
- 根据当前列
左联接summaryDF
到valuesDF
withColumn
将 valuesDF
中的原始字符代码列替换为新的描述列
- 将新的 DF 指定为
var
- 继续循环
但是,尝试这样做给了我笛卡尔积错误(我确保将连接定义为 "left"
)。
我试过但失败了summaryDF
(因为没有聚合可做??)然后将两个数据帧连接在一起。
这是我尝试过的事情,并且总是得到 NullPointerException
。我知道这 确实 不是正确的方法,并且可以理解为什么我得到空指针......但我真的被卡住了并恢复到旧的,愚蠢的&绝望中的坏 Python 习惯。
var valuesDF = sourceDF
// I converted summaryDF to a broadcasted RDD
// because its small and a "constant" lookup table
summaryBroadcast
.value
.foreach{ x =>
// searchValue = Value (e.g. `U`),
// replaceValue = ValueDescription (e.g. `Unknown`),
val field = x(0).toString
val searchValue = x(1).toString
val replaceValue = x(2).toString
// error catching as summary data does not exactly mapping onto field names
// the joys of business people working in Excel...
try {
// I'm using regexp_replace because I'm lazy
valuesDF = valuesDF
.withColumn( attribute, regexp_replace(col(attribute), searchValue, replaceValue ))
}
catch {case _: Exception =>
null
}
}
有什么想法吗?建议?谢谢。
首先,我们需要一个函数来执行 valuesDf
与 summaryDf
通过 Value
的连接以及 Favourite*
和 [=17= 的相应对]:
private def joinByColumn(colName: String, sourceDf: DataFrame): DataFrame = {
sourceDf.as("src") // alias it to help selecting appropriate columns in the result
// the join
.join(summaryDf, $"Value" === col(colName) && $"Field" === colName, "left")
// we do not need the original `Favourite*` column, so drop it
.drop(colName)
// select all previous columns, plus the one that contains the match
.select("src.*", "ValueDesc")
// rename the resulting column to have the name of the source one
.withColumnRenamed("ValueDesc", colName)
}
现在,为了生成目标结果,我们可以迭代要匹配的列的名称:
val result = Seq("FavouriteBeer",
"FavouriteCheese").foldLeft(valuesDF) {
case(df, colName) => joinByColumn(colName, df)
}
result.show()
+-------------+---------------+
|FavouriteBeer|FavouriteCheese|
+-------------+---------------+
| Unknown| Cheddar|
| Unknown| Emmental|
| InnisAndGunn| Brie|
| Carlsberg| Unknown|
+-------------+---------------+
如果 valuesDf
中的值与 summaryDf
中的任何值都不匹配,此解决方案中的结果单元格将包含 null
。如果您只想将其替换为 Unknown
值,而不是上面的 .select
和 .withColumnRenamed
行,请使用:
.withColumn(colName, when($"ValueDesc".isNotNull, $"ValueDesc").otherwise(lit("Unknown")))
.select("src.*", colName)
我有一个 table 约 300 列填充字符(存储为字符串):
valuesDF:
| FavouriteBeer | FavouriteCheese | ...
|---------------|-----------------|--------
| U | C | ...
| U | E | ...
| I | B | ...
| C | U | ...
| ... | ... | ...
我有一个数据摘要,它将字符映射到它们的实际含义上。它是这样的形式:
summaryDF:
| Field | Value | ValueDesc |
|------------------|-------|---------------|
| FavouriteBeer | U | Unknown |
| FavouriteBeer | C | Carlsberg |
| FavouriteBeer | I | InnisAndGunn |
| FavouriteBeer | D | DoomBar |
| FavouriteCheese | C | Cheddar |
| FavouriteCheese | E | Emmental |
| FavouriteCheese | B | Brie |
| FavouriteCheese | U | Unknown |
| ... | ... | ... |
我想以编程方式将 valuesDF
中每一列的字符值替换为 summaryDF
中的值描述。这是我正在寻找的结果:
finalDF:
| FavouriteBeer | FavouriteCheese | ...
|---------------|-----------------|--------
| Unknown | Cheddar | ...
| Unknown | Emmental | ...
| InnisAndGunn | Brie | ...
| Carlsberg | Unknown | ...
| ... | ... | ...
因为有 ~300 列,我不想为每一列输入 withColumn
方法。
不幸的是,在 Spark 编程方面我还是个新手,尽管在过去的 2 个月里我已经掌握了足够多的知识。
我很确定我需要做的是:
valuesDF.columns.foreach { col => ...... }
遍历每一列- 使用
col
字符串值 在 - 根据当前列 左联接
withColumn
将valuesDF
中的原始字符代码列替换为新的描述列- 将新的 DF 指定为
var
- 继续循环
Field
上过滤 summaryDF
summaryDF
到valuesDF
但是,尝试这样做给了我笛卡尔积错误(我确保将连接定义为 "left"
)。
我试过但失败了summaryDF
(因为没有聚合可做??)然后将两个数据帧连接在一起。
这是我尝试过的事情,并且总是得到 NullPointerException
。我知道这 确实 不是正确的方法,并且可以理解为什么我得到空指针......但我真的被卡住了并恢复到旧的,愚蠢的&绝望中的坏 Python 习惯。
var valuesDF = sourceDF
// I converted summaryDF to a broadcasted RDD
// because its small and a "constant" lookup table
summaryBroadcast
.value
.foreach{ x =>
// searchValue = Value (e.g. `U`),
// replaceValue = ValueDescription (e.g. `Unknown`),
val field = x(0).toString
val searchValue = x(1).toString
val replaceValue = x(2).toString
// error catching as summary data does not exactly mapping onto field names
// the joys of business people working in Excel...
try {
// I'm using regexp_replace because I'm lazy
valuesDF = valuesDF
.withColumn( attribute, regexp_replace(col(attribute), searchValue, replaceValue ))
}
catch {case _: Exception =>
null
}
}
有什么想法吗?建议?谢谢。
首先,我们需要一个函数来执行 valuesDf
与 summaryDf
通过 Value
的连接以及 Favourite*
和 [=17= 的相应对]:
private def joinByColumn(colName: String, sourceDf: DataFrame): DataFrame = {
sourceDf.as("src") // alias it to help selecting appropriate columns in the result
// the join
.join(summaryDf, $"Value" === col(colName) && $"Field" === colName, "left")
// we do not need the original `Favourite*` column, so drop it
.drop(colName)
// select all previous columns, plus the one that contains the match
.select("src.*", "ValueDesc")
// rename the resulting column to have the name of the source one
.withColumnRenamed("ValueDesc", colName)
}
现在,为了生成目标结果,我们可以迭代要匹配的列的名称:
val result = Seq("FavouriteBeer",
"FavouriteCheese").foldLeft(valuesDF) {
case(df, colName) => joinByColumn(colName, df)
}
result.show()
+-------------+---------------+
|FavouriteBeer|FavouriteCheese|
+-------------+---------------+
| Unknown| Cheddar|
| Unknown| Emmental|
| InnisAndGunn| Brie|
| Carlsberg| Unknown|
+-------------+---------------+
如果 valuesDf
中的值与 summaryDf
中的任何值都不匹配,此解决方案中的结果单元格将包含 null
。如果您只想将其替换为 Unknown
值,而不是上面的 .select
和 .withColumnRenamed
行,请使用:
.withColumn(colName, when($"ValueDesc".isNotNull, $"ValueDesc").otherwise(lit("Unknown")))
.select("src.*", colName)