根据列合并两个火花数据框

Merge two spark dataframes based on a column

我有 2 个数据框需要根据列(员工代码)合并。请注意,数据框大约有 75 列,因此我提供了一个样本数据集来获得一些 suggestions/sample 的解决方案。我正在使用数据块,数据集是从 S3 读取的。

以下是我的 2 个数据帧:

DATAFRAME - 1

|-----------------------------------------------------------------------------------|
|EMP_CODE   |COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6|COLUMN7|COLUMN8|COLUMN9|
|-----------------------------------------------------------------------------------|
|A10001     |   B   |       |       |       |       |       |       |       |       |
|-----------------------------------------------------------------------------------|


DATAFRAME - 2
|-----------------------------------------------------------------------------------|
|EMP_CODE   |COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6|COLUMN7|COLUMN8|COLUMN9|
|-----------------------------------------------------------------------------------|
|A10001     |       |       |       |       |   C   |       |       |       |       |   
|B10001     |       |       |       |       |       |       |       |       |T2     |
|A10001     |       |       |       |       |       |       |       |   B   |       |
|A10001     |       |       |   C   |       |       |       |       |       |       |
|C10001     |       |       |       |       |       |   C   |       |       |       |
|-----------------------------------------------------------------------------------|   

我需要合并基于 EMP_CODE 的 2 个数据帧,基本上是基于 emp_code 将 dataframe1 与 dataframe2 连接起来。我在进行联接时收到重复的列,我正在寻求帮助。

预期的最终数据帧:

|-----------------------------------------------------------------------------------|
|EMP_CODE   |COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6|COLUMN7|COLUMN8|COLUMN9|
|-----------------------------------------------------------------------------------|
|A10001     |   B   |       |   C   |       |   C   |       |       |   B   |       |   
|B10001     |       |       |       |       |       |       |       |       |T2     |
|C10001     |       |       |       |       |       |   C   |       |       |       |
|-----------------------------------------------------------------------------------|       

dataframe1 中有 3 行 emp_code A10001,dataframe2 中有 1 行。所有数据应合并为一条记录,没有任何重复列。

非常感谢

你可以使用内连接

output = df1.join(df2,['EMP_CODE'],how='inner')

您也可以在末尾应用 distinct 来删除重复项。

output = df1.join(df2,['EMP_CODE'],how='inner').distinct()

首先,您需要聚合各个数据帧。

from pyspark.sql import functions as F
df1 = df1.groupBy('EMP_CODE').agg(F.concat_ws(" ", F.collect_list(df1.COLUMN1)))

您必须为所有列和所有数据框编写此代码。 然后你必须在所有数据帧上使用联合函数。

df1.union(df2)

然后在该联合数据帧上重复相同的聚合。

如果两个数据帧具有相同的列,您可以在 Scala 中执行此操作

output = df1.union(df2)

你需要的是工会。

如果两个数据帧具有相同的列数并且要成为“union-ed”的列在位置上相同(如您的示例所示),这将起作用:

output = df1.union(df2).dropDuplicates()

如果两个数据框具有相同的列数并且需要“union-ed”的列具有相同的名称(在您的示例中也是如此),这样会更好:

output = df1.unionByName(df2).dropDuplicates()