Merging Dataframes in Spark
I have two DataFrames, say A and B. I want to join them on a key column and create a new DataFrame. When the key matches in both A and B, I need to take the row from B, not from A.

For example:

DataFrame A:
Employee1, salary100
Employee2, salary50
Employee3, salary200

DataFrame B:
Employee1, salary150
Employee2, salary100
Employee4, salary300

The resulting DataFrame should be:

DataFrame C:
Employee1, salary150
Employee2, salary100
Employee3, salary200
Employee4, salary300

How do I do this in Spark and Scala?
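For reference, a minimal sketch of how the two sample DataFrames might be built (assumptions, not part of the question: a Spark 1.x shell where sqlContext is in scope; the column names employee and salary match the SQL in the first answer, while the second answer assumes emp and sal instead):

import sqlContext.implicits._  // enables Seq(...).toDF(...)

// Sample data from the question; salaries kept as plain integers.
val dfA = Seq(
  ("Employee1", 100),
  ("Employee2", 50),
  ("Employee3", 200)
).toDF("employee", "salary")

val dfB = Seq(
  ("Employee1", 150),
  ("Employee2", 100),
  ("Employee4", 300)
).toDF("employee", "salary")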
Try:

dfA.registerTempTable("dfA")
dfB.registerTempTable("dfB")
sqlContext.sql("""
SELECT coalesce(dfA.employee, dfB.employee),
coalesce(dfB.salary, dfA.salary) FROM dfA FULL OUTER JOIN dfB
ON dfA.employee = dfB.employee""")

Or:

sqlContext.sql("""
SELECT coalesce(dfA.employee, dfB.employee),
CASE WHEN dfB.employee IS NOT NULL THEN dfB.salary
     ELSE dfA.salary
END FROM dfA FULL OUTER JOIN dfB
ON dfA.employee = dfB.employee""")
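coalesce returns its first non-null argument, so after the full outer join, rows present in dfB take dfB.salary and rows only in dfA fall back to dfA.salary. A quick usage sketch of the first query (assuming the temp tables registered above; the AS aliases are additions for readability):

val dfC = sqlContext.sql("""
  SELECT coalesce(dfA.employee, dfB.employee) AS employee,
         coalesce(dfB.salary, dfA.salary) AS salary
  FROM dfA FULL OUTER JOIN dfB
  ON dfA.employee = dfB.employee""")

dfC.show()
// Expected (row order may vary):
// Employee1, 150
// Employee2, 100
// Employee3, 200
// Employee4, 300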
Assuming dfA and dfB each have the two columns emp and sal, you can use the following:

import org.apache.spark.sql.{functions => f}
import sqlContext.implicits._  // enables the 'col symbol syntax

// Rename B's columns so they don't collide with A's after the join.
val dfB1 = dfB
  .withColumnRenamed("sal", "salB")
  .withColumnRenamed("emp", "empB")

// Full outer join, then prefer B's values and fall back to A's.
val joined = dfA
  .join(dfB1, 'emp === 'empB, "outer")
  .select(
    f.coalesce('empB, 'emp).as("emp"),
    f.coalesce('salB, 'sal).as("sal")
  )

Note: each DataFrame should have only one row for a given value of emp.
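A quick check against the sample data (a sketch assuming dfA and dfB were built as in the earlier setup, but with .toDF("emp", "sal")):

// Sort for a stable, readable output.
joined.orderBy('emp).show()
// +---------+---+
// |      emp|sal|
// +---------+---+
// |Employee1|150|
// |Employee2|100|
// |Employee3|200|
// |Employee4|300|
// +---------+---+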