使用 PySpark 将数据上传到 Redshift
Upload data to Redshift with PySpark
我有一个用pyspark写的脚本。我尝试做的是使用 pyspark 从 AWS 中的 S3 存储桶中读取 *.csv 文件。
我创建了一个包含所有数据的 DataFrame,select 我需要的所有列并将它们转换成我的 Redshift table 期望的类型:
mapping = [('id', StringType), ('session', StringType), ('ip', StringType)]
df = spark.read.\
format("csv").\
option("header", True).\
load(f"...")
rows_to_map = [field[0] for field in columns_mapping]
# We need to select only specific columns
mapped_df = df.select(*rows_to_map)
# Now need to cast types
for mapping in columns_mapping:
mapped_df = mapped_df.withColumn(mapping[0], mapped_df[mapping[0]].cast(mapping[1]()))
mapped_df.printSchema()
mapped_df.write.format("com.databricks.spark.redshift").\
option("url", "...").\
option("dbtable", "...").\
option("tempdir", "...").\
option("user", "...").\
option("password", "...").\
option("aws_iam_role", "...").\
mode("append").\
save()
我在将数据插入 redshift 期间收到错误消息:检查 'stl_load_errors' 系统 table 了解详细信息。
我看到它试图随机(几乎)读取 csv 中的列。
我的数据框的架构:
|-- id: string (nullable = true)
|-- session: string (nullable = true)
|-- ip: string (nullable = true)
...
正如你在第一行看到的那样,id -> session -> ip ...
但是我的 Redshift table 显示具有相同字段但顺序不同的架构。前 3 行:
|-- id: string (nullable = true)
|-- created_at: long (nullable = true)
|-- session: string (nullable = true)
结果在第二列他哭着说我正在尝试将 STRING 写入 LONG 列。而不是 created_at 他从文件会话中读取。
问题:我的 DataFrame(tmp_file) 中的列顺序是否很重要?
有什么解决办法吗?处理每个文件会花费太多时间。
感谢您的帮助。
在您的 redshift table 中提供列名列表,并在写入前重新排列 Spark 数据框中的列:
# redshift table columns, in correct order
colnames = ['id', 'created_at', 'session', ...]
mapped_df = mapped_df.select(colnames)
mapped_df.write(...)
我有一个用pyspark写的脚本。我尝试做的是使用 pyspark 从 AWS 中的 S3 存储桶中读取 *.csv 文件。
我创建了一个包含所有数据的 DataFrame,select 我需要的所有列并将它们转换成我的 Redshift table 期望的类型:
mapping = [('id', StringType), ('session', StringType), ('ip', StringType)]
df = spark.read.\
format("csv").\
option("header", True).\
load(f"...")
rows_to_map = [field[0] for field in columns_mapping]
# We need to select only specific columns
mapped_df = df.select(*rows_to_map)
# Now need to cast types
for mapping in columns_mapping:
mapped_df = mapped_df.withColumn(mapping[0], mapped_df[mapping[0]].cast(mapping[1]()))
mapped_df.printSchema()
mapped_df.write.format("com.databricks.spark.redshift").\
option("url", "...").\
option("dbtable", "...").\
option("tempdir", "...").\
option("user", "...").\
option("password", "...").\
option("aws_iam_role", "...").\
mode("append").\
save()
我在将数据插入 redshift 期间收到错误消息:检查 'stl_load_errors' 系统 table 了解详细信息。
我看到它试图随机(几乎)读取 csv 中的列。
我的数据框的架构:
|-- id: string (nullable = true)
|-- session: string (nullable = true)
|-- ip: string (nullable = true)
...
正如你在第一行看到的那样,id -> session -> ip ... 但是我的 Redshift table 显示具有相同字段但顺序不同的架构。前 3 行:
|-- id: string (nullable = true)
|-- created_at: long (nullable = true)
|-- session: string (nullable = true)
结果在第二列他哭着说我正在尝试将 STRING 写入 LONG 列。而不是 created_at 他从文件会话中读取。
问题:我的 DataFrame(tmp_file) 中的列顺序是否很重要? 有什么解决办法吗?处理每个文件会花费太多时间。
感谢您的帮助。
在您的 redshift table 中提供列名列表,并在写入前重新排列 Spark 数据框中的列:
# redshift table columns, in correct order
colnames = ['id', 'created_at', 'session', ...]
mapped_df = mapped_df.select(colnames)
mapped_df.write(...)