如果列数不同,如何处理从源 spark df 到 hive table 的插入
How to handle inserts from a source spark df to hive table if the number of columns differ
我正在尝试将 pyspark 数据帧写入配置单元 table 但由于我的源 df 只有 5 列而目标有 9 列,因此导致错误。
此外,由于有多种情况,我不想构建可以解决此问题的手动插入查询。我正在寻找一种更好的自动化方法来处理这个问题,而无需为每个案例手动编写查询。
我想通过获取源 df 和目标 table 中存在但不在源 df 中的额外列在 spark 中创建一个新的 df,但它没有按照我的想法工作。
这是我正在处理的代码
#extract cols from src df and tgt df(hive table)
src_cols = df1.columns
tgt_cols = df2.columns
#get the extra cols (diff)
extra_cols = list(set(tgt_cols) - set(src_cols))
#extra_cols = ['state', 'datetime', 'zipcode', 'type']
#formulate the string to add extra cols
string = ""
for item in extra_cols:
string += str(".withColumn(\""+item+"\", lit(\"NULL\"))")
这将打印出我可以用于新 df 的所需字符串
#'.withColumn("state", lit(NULL)).withColumn("datetime", lit(NULL)).withColumn("zipcode", lit(NULL)).withColumn("type", lit(NULL))'
new_df = "df1" + string
#'df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL"))'
现在的问题是我无法执行代码 df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL"))
,因为它是一个字符串
任何人都可以帮助我以更好的方式处理这种情况。
谢谢。
如果您已将列名差异列表识别为
#extra_cols = ['state', 'datetime', 'zipcode', 'type']
那么你不需要制定字符串来添加额外的cols,你可以简单地使用reduce
函数在列表上应用.withColumn
列名称为
import pyspark.sql.functions as f
to_be_written_df = reduce(lambda temp_df, col_name: temp_df.withColumn(col_name, f.lit('NULL')), extra_cols, df1)
这应该可以解决您的问题
我正在尝试将 pyspark 数据帧写入配置单元 table 但由于我的源 df 只有 5 列而目标有 9 列,因此导致错误。
此外,由于有多种情况,我不想构建可以解决此问题的手动插入查询。我正在寻找一种更好的自动化方法来处理这个问题,而无需为每个案例手动编写查询。
我想通过获取源 df 和目标 table 中存在但不在源 df 中的额外列在 spark 中创建一个新的 df,但它没有按照我的想法工作。
这是我正在处理的代码
#extract cols from src df and tgt df(hive table)
src_cols = df1.columns
tgt_cols = df2.columns
#get the extra cols (diff)
extra_cols = list(set(tgt_cols) - set(src_cols))
#extra_cols = ['state', 'datetime', 'zipcode', 'type']
#formulate the string to add extra cols
string = ""
for item in extra_cols:
string += str(".withColumn(\""+item+"\", lit(\"NULL\"))")
这将打印出我可以用于新 df 的所需字符串
#'.withColumn("state", lit(NULL)).withColumn("datetime", lit(NULL)).withColumn("zipcode", lit(NULL)).withColumn("type", lit(NULL))'
new_df = "df1" + string
#'df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL"))'
现在的问题是我无法执行代码 df1.withColumn("state", lit("NULL")).withColumn("datetime", lit("NULL")).withColumn("zipcode", lit("NULL")).withColumn("type", lit("NULL"))
,因为它是一个字符串
任何人都可以帮助我以更好的方式处理这种情况。
谢谢。
如果您已将列名差异列表识别为
#extra_cols = ['state', 'datetime', 'zipcode', 'type']
那么你不需要制定字符串来添加额外的cols,你可以简单地使用reduce
函数在列表上应用.withColumn
列名称为
import pyspark.sql.functions as f
to_be_written_df = reduce(lambda temp_df, col_name: temp_df.withColumn(col_name, f.lit('NULL')), extra_cols, df1)
这应该可以解决您的问题