Generate a report of mismatched columns between 2 PySpark dataframes
Team, we need to generate a report of mismatched columns, based on the key field, between 2 PySpark dataframes that have exactly the same structure.
Here is the first dataframe -
>>> df.show()
+--------+----+----+----+----+----+----+----+----+
| key|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+
| abcd| 123| xyz| a| ab| abc| def| qew| uvw|
| abcd1| 123| xyz| a| ab| abc| def| qew| uvw|
| abcd12| 123| xyz| a| ab| abc| def| qew| uvw|
| abcd123| 123| xyz| a| ab| abc| def| qew| uvw|
|abcd1234| 123| xyz| a| ab| abc| def| qew| uvw|
+--------+----+----+----+----+----+----+----+----+
Here is the second dataframe -
>>> df1.show()
+--------+----+----+----+----+----+----+----+----+
| key|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+
| abcd| 123| xyz| a| ab| abc| def| qew| uvw|
| abcdx| 123| xyz| a| ab| abc| def| qew| uvw|
| abcd12| 123| xyz| a| abx| abc|defg| qew| uvw|
| abcd123| 123| xyz| a| ab| abc|defg| qew| uvw|
|abcd1234| 123| xyz| a| ab|abcd|defg| qew| uvw|
+--------+----+----+----+----+----+----+----+----+
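Here is a minimal sketch to recreate the two dataframes above (assuming an active SparkSession named spark; all values are typed as strings just for the example):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

columns = ['key', 'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8']

df = spark.createDataFrame([
    ('abcd',     '123', 'xyz', 'a', 'ab', 'abc', 'def', 'qew', 'uvw'),
    ('abcd1',    '123', 'xyz', 'a', 'ab', 'abc', 'def', 'qew', 'uvw'),
    ('abcd12',   '123', 'xyz', 'a', 'ab', 'abc', 'def', 'qew', 'uvw'),
    ('abcd123',  '123', 'xyz', 'a', 'ab', 'abc', 'def', 'qew', 'uvw'),
    ('abcd1234', '123', 'xyz', 'a', 'ab', 'abc', 'def', 'qew', 'uvw'),
], columns)

df1 = spark.createDataFrame([
    ('abcd',     '123', 'xyz', 'a', 'ab',  'abc',  'def',  'qew', 'uvw'),
    ('abcdx',    '123', 'xyz', 'a', 'ab',  'abc',  'def',  'qew', 'uvw'),
    ('abcd12',   '123', 'xyz', 'a', 'abx', 'abc',  'defg', 'qew', 'uvw'),
    ('abcd123',  '123', 'xyz', 'a', 'ab',  'abc',  'defg', 'qew', 'uvw'),
    ('abcd1234', '123', 'xyz', 'a', 'ab',  'abcd', 'defg', 'qew', 'uvw'),
], columns)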
A full outer join gives me this -
>>> dfFull=df.join(df1,'key','outer')
>>> dfFull.show()
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| key|col1|col2|col3|col4|col5|col6|col7|col8|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| abcd12| 123| xyz| a| ab| abc| def| qew| uvw| 123| xyz| a| abx| abc|defg| qew| uvw|
| abcd1| 123| xyz| a| ab| abc| def| qew| uvw|null|null|null|null|null|null|null|null|
|abcd1234| 123| xyz| a| ab| abc| def| qew| uvw| 123| xyz| a| ab|abcd|defg| qew| uvw|
| abcd123| 123| xyz| a| ab| abc| def| qew| uvw| 123| xyz| a| ab| abc|defg| qew| uvw|
| abcdx|null|null|null|null|null|null|null|null| 123| xyz| a| ab| abc| def| qew| uvw|
| abcd| 123| xyz| a| ab| abc| def| qew| uvw| 123| xyz| a| ab| abc| def| qew| uvw|
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
If I look at just col6, 5 of the "key" values have mismatching values (only the last record's values match).
>>> dfFull.select('key',df['col6'],df1['col6']).show()
+--------+----+----+
| key|col6|col6|
+--------+----+----+
| abcd12| def|defg|
| abcd1| def|null|
|abcd1234| def|defg|
| abcd123| def|defg|
| abcdx|null| def|
| abcd| def| def|
+--------+----+----+
I need to generate a report like this for all the columns. The mismatch sample can be the value from any record in the dataframes.
colName,NumofMismatch,mismatchSampleFromDf,misMatchSamplefromDf1
col6,5,def,defg
col7,2,null,qew
col8,2,null,uvw
col5,3,null,abc
It is a column-wise summary, based on the key, of how many values do not match between the 2 dataframes.
Sid
Assuming the two dataframes are df1 and df2, you can try the following:
from pyspark.sql.functions import when, array, count, first
# list of columns to be compared
cols = df1.columns[1:]
# for each column, keep an array [df1-value, df2-value] only where the two values differ
df_new = (df1.join(df2, "key", "outer")
    .select([ when(~df1[c].eqNullSafe(df2[c]), array(df1[c], df2[c])).alias(c) for c in cols ])
    # unpivot into (colName, mismatch) rows and keep only the actual mismatches
    .selectExpr('stack({},{}) as (colName, mismatch)'.format(len(cols), ','.join('"{0}",`{0}`'.format(c) for c in cols)))
    .filter('mismatch is not NULL'))
df_new.show(10)
+-------+-----------+
|colName| mismatch|
+-------+-----------+
| col4| [ab, abx]|
| col6|[def, defg]|
| col6|[def, defg]|
| col5|[abc, abcd]|
| col6|[def, defg]|
| col1| [, 123]|
| col2| [, xyz]|
| col3| [, a]|
| col4| [, ab]|
| col5| [, abc]|
+-------+-----------+
Notes:
(1) The condition ~df1[c].eqNullSafe(df2[c]) used to find the mismatches is satisfied when either of the following holds:
+ df1[c] != df2[c]
+ df1[c] is NULL or df2[c] is NULL, but not both
(2) When there is a mismatch, it is saved as an ArrayType column whose first item comes from df1 and whose second item comes from df2. When the two values match, NULL is returned, and those rows are then filtered out.
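As a small illustration of that null-safe condition (just a sketch, assuming the same SparkSession named spark is available):
from pyspark.sql.functions import col

# a tiny pair-wise comparison covering the possible situations
pairs = spark.createDataFrame(
    [('def', 'defg'), ('def', None), (None, 'def'), ('def', 'def')],
    ['a', 'b'])

# ~eqNullSafe flags differing values and one-sided NULLs as mismatches,
# and returns false when both sides are equal (including both NULL)
pairs.select('a', 'b', (~col('a').eqNullSafe(col('b'))).alias('is_mismatch')).show()
# expected: true, true, true, false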
(3) The stack() expression is generated dynamically by Python's format function, as shown below:
stack(8,"col1",`col1`,"col2",`col2`,"col3",`col3`,"col4",`col4`,"col5",`col5`,"col6",`col6`,"col7",`col7`,"col8",`col8`) as (colName, mismatch)
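If you want to inspect the generated expression yourself (just a small sketch, not part of the solution), you can print it before handing it to selectExpr:
cols = ['col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8']

# the exact string that selectExpr() receives above
stack_expr = 'stack({},{}) as (colName, mismatch)'.format(
    len(cols), ','.join('"{0}",`{0}`'.format(c) for c in cols))
print(stack_expr)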
Once we have df_new, we can do groupby + aggregate:
df_new.groupby('colName') \
      .agg(count('mismatch').alias('NumOfMismatch'), first('mismatch').alias('mismatch')) \
      .selectExpr('colName', 'NumOfMismatch', 'mismatch[0] as misMatchFromdf1', 'mismatch[1] as misMatchFromdf2') \
      .show()
+-------+-------------+---------------+---------------+
|colName|NumOfMismatch|misMatchFromdf1|misMatchFromdf2|
+-------+-------------+---------------+---------------+
| col8| 2| null| uvw|
| col3| 2| null| a|
| col4| 3| ab| abx|
| col1| 2| null| 123|
| col6| 5| def| defg|
| col5| 3| abc| abcd|
| col2| 2| null| xyz|
| col7| 2| null| qew|
+-------+-------------+---------------+---------------+
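If you need the exact header from the question (colName,NumofMismatch,mismatchSampleFromDf,misMatchSamplefromDf1), a possible follow-up, reusing df_new and the imports from above, is to rename the columns and write the summary out as CSV; the output path 'mismatch_report' is just a placeholder:
report = (df_new.groupby('colName')
          .agg(count('mismatch').alias('NumofMismatch'),
               first('mismatch').alias('mismatch'))
          .selectExpr('colName',
                      'NumofMismatch',
                      'mismatch[0] as mismatchSampleFromDf',
                      'mismatch[1] as misMatchSamplefromDf1'))

# coalesce(1) keeps the report in a single part file
report.coalesce(1).write.csv('mismatch_report', header=True)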