使用 PySpark 将数据上传到 Redshift

Upload data to Redshift with PySpark

我有一个用pyspark写的脚本。我尝试做的是使用 pyspark 从 AWS 中的 S3 存储桶中读取 *.csv 文件。

我创建了一个包含所有数据的 DataFrame,select 我需要的所有列并将它们转换成我的 Redshift table 期望的类型:

    mapping = [('id', StringType), ('session', StringType), ('ip', StringType)]

    df = spark.read.\
        format("csv").\
        option("header", True).\
        load(f"...")
    
    rows_to_map = [field[0] for field in columns_mapping]
    # We need to select only specific columns
    mapped_df = df.select(*rows_to_map)
    # Now need to cast types
    for mapping in columns_mapping:
        mapped_df = mapped_df.withColumn(mapping[0], mapped_df[mapping[0]].cast(mapping[1]()))
    
    mapped_df.printSchema()
    
    mapped_df.write.format("com.databricks.spark.redshift").\
        option("url", "...").\
        option("dbtable", "...").\
        option("tempdir", "...").\
        option("user", "...").\
        option("password", "...").\
        option("aws_iam_role", "...").\
        mode("append").\
        save()

我在将数据插入 redshift 期间收到错误消息:检查 'stl_load_errors' 系统 table 了解详细信息。

我看到它试图随机(几乎)读取 csv 中的列。

我的数据框的架构:

|-- id: string (nullable = true) 
|-- session: string (nullable = true) 
|-- ip: string (nullable = true)
...

正如你在第一行看到的那样,id -> session -> ip ... 但是我的 Redshift table 显示具有相同字段但顺序不同的架构。前 3 行:

|-- id: string (nullable = true) 
|-- created_at: long (nullable = true) 
|-- session: string (nullable = true)

结果在第二列他哭着说我正在尝试将 STRING 写入 LONG 列。而不是 created_at 他从文件会话中读取。

问题:我的 DataFrame(tmp_file) 中的列顺序是否很重要? 有什么解决办法吗?处理每个文件会花费太多时间。

感谢您的帮助。

在您的 redshift table 中提供列名列表,并在写入前重新排列 Spark 数据框中的列:

# redshift table columns, in correct order
colnames = ['id', 'created_at', 'session', ...]   

mapped_df = mapped_df.select(colnames)
mapped_df.write(...)