Pyspark DataFrame:找到两个 DataFrame 之间的差异(值和列名)

Pyspark DataFrame: find difference between two DataFrames (values and column names)

我在数据框中总共有 100 多列。 我正在尝试比较两个数据框并找到具有列名的不匹配记录。 我得到了下面的输出代码,但是当我 运行 100 多列作业的代码被中止时。

我这样做是为了发现 SCD 类型 2 增量过程错误。

from pyspark.sql.types import *
from pyspark.sql.functions import *

d2 = sc.parallelize([("A1", 500,1005) ,("A2", 700,10007)])
dataFrame1 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])

d2 = sc.parallelize([("A1", 600,1005),("A2", 700,10007)])
dataFrame2 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])

key_id_col_name="ID"
key_id_value="A1"
dataFrame1.select("ID","VALUE1").subtract(dataFrame2.select("ID",col("VALUE1").alias("value"))).show()

def unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value):
    chk_fst=True
    dataFrame1 = dataFrame1.where(dataFrame1[key_id_col_name] == key_id_value)
    dataFrame2 = dataFrame2.where(dataFrame2[key_id_col_name] == key_id_value)
    col_names = list(set(dataFrame1.columns).intersection(dataFrame2.columns))
    col_names.remove(key_id_col_name)
    for col_name in col_names:
        if chk_fst == True:
            df_tmp = dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name))
            chk_fst = False
        else:
            df_tmp = df_tmp.unionAll(dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name)))
    return df_tmp

res_df = unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value)

res_df.show() 

   >>> dataFrame1.show()
    +---+------+------+
    | ID|VALUE1|VALUE2|
    +---+------+------+
    | A1|   500|  1005|
    | A2|   700| 10007|
    +---+------+------+

    >>> dataFrame2.show()
    +---+------+------+
    | ID|VALUE1|VALUE2|
    +---+------+------+
    | A1|   600|  1005|
    | A2|   700| 10007|
    +---+------+------+

    >>> res_df.show()
    +------+-----+--------+
    |KEY_ID|VALUE|COL_NAME|
    +------+-----+--------+
    |    A1|  500|  VALUE1|
    +------+-----+--------+

请推荐其他方式。

这是另一种方法:

  • 使用 ID 列连接两个 DataFrame。
  • 然后为每一行创建一个新列,其中包含有差异的列。
    • 使用 pyspark.sql.functions.create_map() 将此新列创建为 key-value 对映射。1
    • 映射的键将是列名称。
    • 使用 pyspark.sql.functions.when(),将值设置为 dataFrame1 中的相应值(因为看起来这就是您想要的示例)if 两个DataFrame是有区别的。否则,我们将值设置为 None.
  • 使用pyspark.sql.functions.explode() on the map column, and filter out any rows where the difference is not null using pyspark.sql.functions.isnull()
  • Select 您想要的列并使用 alias() 重命名。

示例:

import pyspark.sql.functions as f
columns = [c for c in dataFrame1.columns if c != 'ID']
dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
    .withColumn(
        'diffs',
        f.create_map(
            *reduce(
                list.__add__,
                [
                    [
                        f.lit(c),
                        f.when(
                            f.col('r.'+c) != f.col('l.'+c),
                            f.col('r.'+c)
                        ).otherwise(None)
                    ] 
                 for c in columns
                ]
            )
        )
    )\
    .select([f.col('ID'), f.explode('diffs')])\
    .where(~f.isnull(f.col('value')))\
    .select(
        f.col('ID').alias('KEY_ID'),
        f.col('value').alias('VALUE'),
        f.col('key').alias('COL_NAME')
    )\
    .show(truncate=False)
#+------+-----+--------+
#|KEY_ID|VALUE|COL_NAME|
#+------+-----+--------+
#|A1    |500  |VALUE1  |
#+------+-----+--------+

备注

1 语法 *reduce(list.__add__, [[f.lit(c), ...] for c in columns]) 作为 create_map() 的参数是一些 python-fu 有助于动态创建地图。

create_map() 需要偶数个参数 - 它假定每对中的第一个参数是键,第二个是值。为了按该顺序放置参数,列表理解为每次迭代生成一个列表。我们使用 list.__add__.

将这个列表列表缩减为平面列表

最后使用 * 运算符解压列表。

这里是中间输出,可以让逻辑更清晰:

dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
    .withColumn(
        'diffs',
        f.create_map(
            *reduce(
                list.__add__,
                [
                    [
                        f.lit(c),
                        f.when(
                            f.col('r.'+c) != f.col('l.'+c),
                            f.col('r.'+c)
                        ).otherwise(None)
                     ] 
                     for c in columns
                ]
            )
        )
    )\
    .select('ID', 'diffs').show(truncate=False)
#+---+-----------------------------------+
#|ID |diffs                              |
#+---+-----------------------------------+
#|A2 |Map(VALUE1 -> null, VALUE2 -> null)|
#|A1 |Map(VALUE1 -> 500, VALUE2 -> null) |
#+---+-----------------------------------+