需要对 s3 中的文件执行验证并将其复制到两个不同的表
need to perform validations on file in s3 and copy it to two different tables
我想验证 s3 中的文件并将所有有效和无效数据发送到 redshift 中的两个不同表。谁能帮忙举个例子?
您可以使用PERMISSIVE
模式从S3 读取文件。在这种模式下,Spark 将创建一个额外的列 _corrupt_record
,其中包含有关为特定行发现的问题的信息。然后您可以按该列进行过滤,以将数据框区分为有效数据和无效数据。
data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
# Valid data
validDF = (spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json(sc.parallelize(data))
.filter(col("_corrupt_record").isNull())
)
display(validDF)
# Invalid data
invalidDF = (spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json(sc.parallelize(data))
.filter(col("_corrupt_record").isNotNull())
)
display(invalidDF)
如果你想将 DataFrame 保存到 Redshift table 中,你可以执行以下操作:
preactions = "TRUNCATE schema.table_name"
# Load the data into Redshift
validDF.write\
.format("com.databricks.spark.redshift")\
.option("url", db_redshift_url)\
.option("user", user)\
.option("password", password)\
.option("dbtable", "schema.table_name")\
.option("aws_iam_role", redshift_copy_role)\
.option("tempdir", args["TempDir"])\
.option("preactions", preactions)\
.mode("append")\
.save()
以上代码应将 DataFrame 写入 Redshift,您可以在 AWS Glue Spark 作业中使用它。无需使用 psycopg2。
我想验证 s3 中的文件并将所有有效和无效数据发送到 redshift 中的两个不同表。谁能帮忙举个例子?
您可以使用PERMISSIVE
模式从S3 读取文件。在这种模式下,Spark 将创建一个额外的列 _corrupt_record
,其中包含有关为特定行发现的问题的信息。然后您可以按该列进行过滤,以将数据框区分为有效数据和无效数据。
data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
# Valid data
validDF = (spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json(sc.parallelize(data))
.filter(col("_corrupt_record").isNull())
)
display(validDF)
# Invalid data
invalidDF = (spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json(sc.parallelize(data))
.filter(col("_corrupt_record").isNotNull())
)
display(invalidDF)
如果你想将 DataFrame 保存到 Redshift table 中,你可以执行以下操作:
preactions = "TRUNCATE schema.table_name"
# Load the data into Redshift
validDF.write\
.format("com.databricks.spark.redshift")\
.option("url", db_redshift_url)\
.option("user", user)\
.option("password", password)\
.option("dbtable", "schema.table_name")\
.option("aws_iam_role", redshift_copy_role)\
.option("tempdir", args["TempDir"])\
.option("preactions", preactions)\
.mode("append")\
.save()
以上代码应将 DataFrame 写入 Redshift,您可以在 AWS Glue Spark 作业中使用它。无需使用 psycopg2。