如何基于第二个 DataFrame (Java) 在 Spark DataFrame 中创建新列?

How to create a new column in a Spark DataFrame based on a second DataFrame (Java)?

我有两个 Spark DataFrame,其中一个有两个 cols,id 和 Tag。第二个 DataFrame 有一个 id col,但缺少标签。第一个Dataframe本质上是一个字典,每个id出现一次,而在第二个DataFrame中id可能出现多次。我需要的是在第二个 DataFrame 中创建一个新的 col,它的 Tag 作为每行中 id 的函数(在第二个 DataFrame 中)。我认为这可以通过首先转换为 RDD 等来完成,但我认为必须有一种使用 DataFrames 的更优雅的方式(在 Java 中)。示例:给定一个 df1 Row-> id: 0, Tag: "A", a df2 Row1-> id: 0, Tag: null, a df2 Row2-> id: 0, Tag: "B", 我需要在生成的 DataFrame df3 中创建一个 Tag col 等于 df1(id=0) = "A" IF df2 标签为空,但如果不为空则保留原始标签 => 导致 df3 Row1-> id: 0, Tag: "A", df3 Row2->id:0,标签:"B"。希望例子清楚。

|   ID  |   No.   |  Tag  | new Tag Col |
|    1  |  10002  |   A   |      A      |
|    2  |  10003  |   B   |      B      | 
|    1  |  10004  | null  |      A      |
|    2  |  10005  | null  |      B      |

这里你只需要左外连接和coalesce:

import org.apache.spark.sql.functions.coalesce

val df = sc.parallelize(Seq(
  (1, 10002, Some("A")), (2, 10003, Some("B")),
  (1, 10004, None), (2, 10005, None)
)).toDF("id", "no", "tag")

val lookup = sc.parallelize(Seq(
  (1, "A"), (2, "B")
)).toDF("id", "tag")


df.join(lookup, df.col("id").equalTo(lookup.col("id")), "leftouter")
  .withColumn("new_tag", coalesce(df.col("tag"), lookup.col("tag")))

这应该与 Java 版本几乎相同。