PySpark mapping multiple columns

I need to be able to compare two dataframes using multiple columns.

PySpark attempt:

# Collect the distinct PrimaryLookupAttributeValue values from the reference table into a list, to compare them against df1.

primaryAttributeValue_List = [ p.PrimaryLookupAttributeValue for p in AttributeLookup.select('PrimaryLookupAttributeValue').distinct().collect() ]
primaryAttributeValue_List  # list of values; varies by filter

Out: ['Archive',
 'Pending Security Deposit',
 'Partially Abandoned',
 'Revision Contract Review',
 'Open',
 'Draft Accounting In Review',
 'Draft Returned']


# compare df1 to PrimaryLookupAttributeValue
from pyspark.sql import functions as f

output = dataset_standardFalse2.withColumn('ConformedLeaseStatusName', f.when(dataset_standardFalse2['LeaseStatus'].isin(primaryAttributeValue_List), "FOUND").otherwise("TBD"))

display(output)

As I understand it, you can create a map from the columns in reference_df (I assume this is not a very large dataframe):

map_key = concat_ws('\0', PrimaryLookupAttributeName, PrimaryLookupAttributeValue)
map_value = OutputItemNameByValue
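To illustrate the key construction above without Spark, here is a minimal plain-Python sketch. The sample rows are taken from the output shown in this answer, and `make_key` is a hypothetical helper, not part of PySpark. The NUL separator keeps the attribute name and the attribute value from running together into an ambiguous key.

```python
# Plain-Python analogue of the composite map key; not Spark code.
SEP = "\0"  # NUL separator, unlikely to appear in real attribute names or values


def make_key(name, value):
    """Join attribute name and value into one lookup key (hypothetical helper)."""
    return SEP.join([name, value])


# (PrimaryLookupAttributeName, PrimaryLookupAttributeValue, OutputItemNameByValue)
reference_rows = [
    ("LeaseStatus", "Archive", "Expired"),
    ("LeaseStatus", "Terminated", "Terminated"),
    ("LeaseRecoveryType", "Gross", "Gross"),
]

# One dictionary covers every attribute, because the attribute name is part of the key.
mapping = {make_key(name, value): out for name, value, out in reference_rows}

print(mapping[make_key("LeaseStatus", "Archive")])  # prints: Expired
```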

Then use this mapping to get the corresponding values in df1:

from itertools import chain
from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map

d = reference_df.agg(collect_set(array(concat_ws('\0','PrimaryLookupAttributeName','PrimaryLookupAttributeValue'), 'OutputItemNameByValue')).alias('m')).first().m
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseStatus\x00Draft', 'Pending'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]

mappings = create_map([lit(i) for i in chain.from_iterable(d)])

primaryLookupAttributeName_List = ['LeaseType', 'LeaseRecoveryType', 'LeaseStatus']

df1.select("*", *[ mappings[concat_ws('\0', lit(c), col(c))].alias("Matched[{}]OutputItemNameByValue".format(c)) for c in primaryLookupAttributeName_List ]).show()
+----------------+...+---------------------------------------+-----------------------------------------------+-----------------------------------------+
|SourceSystemName|...|Matched[LeaseType]OutputItemNameByValue|Matched[LeaseRecoveryType]OutputItemNameByValue|Matched[LeaseStatus]OutputItemNameByValue|
+----------------+...+---------------------------------------+-----------------------------------------------+-----------------------------------------+
|          ABC123|...|                                   null|                                          Gross|                               Terminated|
|          ABC123|...|                                   null|                                 Modified Gross|                                  Expired|
|          ABC123|...|                                   null|                                 Modified Gross|                                  Pending|
+----------------+...+---------------------------------------+-----------------------------------------------+-----------------------------------------+
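The per-column lookup above can be mimicked in plain Python to see why `LeaseType` comes back as null. This is only a sketch: the `row` values are made up for illustration, and a Python dict stands in for the Spark map column.

```python
# Plain-Python sketch of the per-column lookup; not Spark code.
d = [
    ["LeaseStatus\x00Archive", "Expired"],
    ["LeaseStatus\x00Terminated", "Terminated"],
    ["LeaseRecoveryType\x00Gross", "Gross"],
]
mappings = dict(d)  # each [key, value] pair becomes one dictionary entry

columns = ["LeaseType", "LeaseRecoveryType", "LeaseStatus"]

# Hypothetical input row; the column values are illustrative only.
row = {"LeaseType": "Office", "LeaseRecoveryType": "Gross", "LeaseStatus": "Terminated"}

# Build "<column>\0<value>" for each column and look it up; a missing key yields None.
matched = {
    "Matched[{}]OutputItemNameByValue".format(c): mappings.get("\x00".join([c, row[c]]))
    for c in columns
}
print(matched)
```

No reference entry has a key starting with `LeaseType`, so that lookup returns `None`, which corresponds to the null column in the output above.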

Update: to set the column names from information retrieved through the reference_df dataframe:

# a list of domains to retrieve
primaryLookupAttributeName_List = ['LeaseType', 'LeaseRecoveryType', 'LeaseStatus']

# mapping from domain names to column names: using `reference_df`.`TargetAttributeForName`
NEWprimaryLookupAttributeName_List = dict(reference_df.filter(reference_df['DomainName'].isin(primaryLookupAttributeName_List)).agg(collect_set(array('DomainName', 'TargetAttributeForName')).alias('m')).first().m)

test = dataset_standardFalse2.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c,c_name in NEWprimaryLookupAttributeName_List.items()])

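Note-1: it is better to loop through primaryLookupAttributeName_List, so that the order of the columns is preserved. If any entry of primaryLookupAttributeName_List is missing from the dictionary, we can then set a default column name, i.e. Unknown-<col>. With the old method, columns with missing entries are simply dropped.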

test = dataset_standardFalse2.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(NEWprimaryLookupAttributeName_List.get(c,"Unknown-{}".format(c))) for c in primaryLookupAttributeName_List])
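The difference between the two approaches comes down to ordinary dictionary behavior, which a small plain-Python sketch can show. The target names below are hypothetical (only `ConformedLeaseStatusName` appears in the question), and `LeaseType` is deliberately left out of the dictionary.

```python
# Plain-Python sketch: iterating the dict vs. iterating the list with a default.
domains = ["LeaseType", "LeaseRecoveryType", "LeaseStatus"]

# Hypothetical domain -> target column names; 'LeaseType' is missing on purpose.
target_names = {
    "LeaseStatus": "ConformedLeaseStatusName",
    "LeaseRecoveryType": "ConformedLeaseRecoveryTypeName",
}

# Old approach: iterate the dictionary, so missing domains silently disappear
# and the order follows the dictionary rather than `domains`.
old_way = [name for _, name in target_names.items()]

# Note-1 approach: iterate `domains`, falling back to a default name.
new_way = [target_names.get(c, "Unknown-{}".format(c)) for c in domains]

print(old_way)
print(new_way)
```

Here `old_way` has two names in dictionary order, while `new_way` has three names in the order of `domains`, starting with `Unknown-LeaseType`.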

Note-2: per the comments, to overwrite existing column names (untested):

(1) use select:

# keep the dataframe in `test`; calling .show() in the assignment would store None instead
test = dataset_standardFalse2.select([c for c in dataset_standardFalse2.columns if c not in NEWprimaryLookupAttributeName_List.values()] + [ mappings[concat_ws('\0', lit(c), col(c))].alias(NEWprimaryLookupAttributeName_List.get(c,"Unknown-{}".format(c))) for c in primaryLookupAttributeName_List])
test.show()

(2) use reduce (not recommended if the list is very long):

from functools import reduce

# withColumn ignores .alias() on its expression, so the target name is passed as the column name instead
df_new = reduce(lambda d, c: d.withColumn(NEWprimaryLookupAttributeName_List.get(c,"Unknown-{}".format(c)), mappings[concat_ws('\0', lit(c), col(c))]), primaryLookupAttributeName_List, dataset_standardFalse2)
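For readers unfamiliar with the reduce pattern, here is a plain-Python sketch of the same fold. A dict stands in for a dataframe row and `with_column` is a hypothetical helper that imitates what `withColumn` does, so none of this is Spark code.

```python
from functools import reduce


def with_column(row, name, value):
    """Return a copy of `row` with `name` set (hypothetical stand-in for withColumn)."""
    new_row = dict(row)
    new_row[name] = value
    return new_row


mappings = {"LeaseStatus\x00Draft": "Pending", "LeaseRecoveryType\x00Gross": "Gross"}
columns = ["LeaseRecoveryType", "LeaseStatus"]
row = {"LeaseRecoveryType": "Gross", "LeaseStatus": "Draft"}

# Start from `row` and add one looked-up column per entry in `columns`.
result = reduce(
    lambda r, c: with_column(r, "Matched[{}]".format(c), mappings.get("\x00".join([c, r[c]]))),
    columns,
    row,
)
print(result)
```

Each step wraps the previous result once more. In Spark, every `withColumn` call likewise adds a projection to the query plan, which is the usual reason to prefer a single `select` when the list is long.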
