如果列数不同,如何处理从源 spark df 到 hive table 的插入

How to handle inserts from a source spark df to hive table if the number of columns differ

我正在尝试将 pyspark 数据帧写入配置单元 table 但由于我的源 df 只有 5 列而目标有 9 列,因此导致错误。

此外,由于有多种情况,我不想构建可以解决此问题的手动插入查询。我正在寻找一种更好的自动化方法来处理这个问题,而无需为每个案例手动编写查询。

我想通过获取源 df 和目标 table 中存在但不在源 df 中的额外列在 spark 中创建一个新的 df,但它没有按照我的想法工作。

这是我正在处理的代码

#extract cols from src df and tgt df(hive table) 
src_cols = df1.columns
tgt_cols = df2.columns

#get the extra cols (diff)
extra_cols = list(set(tgt_cols) - set(src_cols))
#extra_cols = ['state', 'datetime', 'zipcode', 'type']

#formulate the string to add extra cols
string = ""
for item in extra_cols:
    string += str(".withColumn(\""+item+"\", lit(\"NULL\"))")

这将打印出我可以用于新 df 的所需字符串

#'.withColumn("state", lit(NULL)).withColumn("datetime", lit(NULL)).withColumn("zipcode", lit(NULL)).withColumn("type", lit(NULL))'


new_df = "df1" + string
#'df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL"))'

现在的问题是我无法执行代码 df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL")),因为它是一个字符串

任何人都可以帮助我以更好的方式处理这种情况。

谢谢。

如果您已将列名差异列表识别为

#extra_cols = ['state', 'datetime', 'zipcode', 'type']

那么你不需要制定字符串来添加额外的cols,你可以简单地使用reduce函数在列表上应用.withColumn列名称为

import pyspark.sql.functions as f
to_be_written_df = reduce(lambda temp_df, col_name: temp_df.withColumn(col_name, f.lit('NULL')), extra_cols, df1)

这应该可以解决您的问题