生成 2 个 Pyspark 数据帧之间不匹配列的报告

Generate a report of mismatch Columns between 2 Pyspark dataframes

团队,我们需要根据结构完全相同的 2 个 Pyspark 数据帧之间的关键字段生成不匹配列的报告。

这是第一个数据帧-

>>> df.show()
+--------+----+----+----+----+----+----+----+----+
|     key|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+
|    abcd| 123| xyz|   a|  ab| abc| def| qew| uvw|
|   abcd1| 123| xyz|   a|  ab| abc| def| qew| uvw|
|  abcd12| 123| xyz|   a|  ab| abc| def| qew| uvw|
| abcd123| 123| xyz|   a|  ab| abc| def| qew| uvw|
|abcd1234| 123| xyz|   a|  ab| abc| def| qew| uvw|
+--------+----+----+----+----+----+----+----+----+

这是第二个数据帧-

>>> df1.show()
+--------+----+----+----+----+----+----+----+----+
|     key|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+
|    abcd| 123| xyz|   a|  ab| abc| def| qew| uvw|
|   abcdx| 123| xyz|   a|  ab| abc| def| qew| uvw|
|  abcd12| 123| xyz|   a| abx| abc|defg| qew| uvw|
| abcd123| 123| xyz|   a|  ab| abc|defg| qew| uvw|
|abcd1234| 123| xyz|   a|  ab|abcd|defg| qew| uvw|
+--------+----+----+----+----+----+----+----+----+

Full Outer Join 给了我这个-

>>> dfFull=df.join(df1,'key','outer')
>>> dfFull.show()
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|     key|col1|col2|col3|col4|col5|col6|col7|col8|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  abcd12| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a| abx| abc|defg| qew| uvw|
|   abcd1| 123| xyz|   a|  ab| abc| def| qew| uvw|null|null|null|null|null|null|null|null|
|abcd1234| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a|  ab|abcd|defg| qew| uvw|
| abcd123| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a|  ab| abc|defg| qew| uvw|
|   abcdx|null|null|null|null|null|null|null|null| 123| xyz|   a|  ab| abc| def| qew| uvw|
|    abcd| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a|  ab| abc| def| qew| uvw|
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+

如果我只看 col6,"key" 字段有 5 个值不匹配(只有最后一条记录的值匹配)。

>>> dfFull.select('key',df['col6'],df1['col6']).show()
+--------+----+----+
|     key|col6|col6|
+--------+----+----+
|  abcd12| def|defg|
|   abcd1| def|null|
|abcd1234| def|defg|
| abcd123| def|defg|
|   abcdx|null| def|
|    abcd| def| def|
+--------+----+----+

我需要为所有列生成类似这样的报告。不匹配样本可以是数据帧中任何记录的值。

colName,NumofMismatch,mismatchSampleFromDf,misMatchSamplefromDf1
col6,5,def,defg
col7,2,null,qew
col8,2,null,uvw
col5,3,null,abc

这是一个基于键的列式摘要,表示有多少值在 2 个数据帧之间不匹配。

席德

假设两个dataframes分别是df1df2,你可以试试下面的方法:

from pyspark.sql.functions import when, array, count, first

# list of columns to be compared
cols = df1.columns[1:]

df_new = (df1.join(df2, "key", "outer")
    .select([ when(~df1[c].eqNullSafe(df2[c]), array(df1[c], df2[c])).alias(c) for c in cols ])
    .selectExpr('stack({},{}) as (colName, mismatch)'.format(len(cols), ','.join('"{0}",`{0}`'.format(c) for c in cols)))
    .filter('mismatch is not NULL'))

df_new.show(10)
+-------+-----------+                                                           
|colName|   mismatch|
+-------+-----------+
|   col4|  [ab, abx]|
|   col6|[def, defg]|
|   col6|[def, defg]|
|   col5|[abc, abcd]|
|   col6|[def, defg]|
|   col1|    [, 123]|
|   col2|    [, xyz]|
|   col3|      [, a]|
|   col4|     [, ab]|
|   col5|    [, abc]|
+-------+-----------+

注: (1) 用于查找不匹配的条件~df1[c].eqNullSafe(df2[c])满足以下任一条件:

+ df1[c] != df2[c]
+ df1[c] is NULL or df2[c] is NULL but not both

(2) 如果存在不匹配项,则保存为 ArrayType 列,第一项来自 df1,第二项来自 [=25] =]df2。如果没有匹配则返回 NULL,然后过滤掉。

(3)stack()函数由Python格式函数动态生成如下:

stack(8,"col1",`col1`,"col2",`col2`,"col3",`col3`,"col4",`col4`,"col5",`col5`,"col6",`col6`,"col7",`col7`,"col8",`col8`) as (colName, mismatch)

有了df_new之后,我们就可以做groupby+聚合了:

df_new.groupby('colName') \
    .agg(count('mismatch').alias('NumOfMismatch'), first('mismatch').alias('mismatch')) \
    .selectExpr('colName', 'NumOfMismatch', 'mismatch[0] as misMatchFromdf1', 'mismatch[1] as misMatchFromdf2')
    .show()
+-------+-------------+---------------+---------------+
|colName|NumOfMismatch|misMatchFromdf1|misMatchFromdf2|
+-------+-------------+---------------+---------------+
|   col8|            2|           null|            uvw|
|   col3|            2|           null|              a|
|   col4|            3|             ab|            abx|
|   col1|            2|           null|            123|
|   col6|            5|            def|           defg|
|   col5|            3|            abc|           abcd|
|   col2|            2|           null|            xyz|
|   col7|            2|           null|            qew|
+-------+-------------+---------------+---------------+