需要对 s3 中的文件执行验证并将其复制到两个不同的表

need to perform validations on file in s3 and copy it to two different tables

我想验证 s3 中的文件并将所有有效和无效数据发送到 redshift 中的两个不同表。谁能帮忙举个例子?

您可以使用PERMISSIVE 模式从S3 读取文件。在这种模式下,Spark 将创建一个额外的列 _corrupt_record,其中包含有关为特定行发现的问题的信息。然后您可以按该列进行过滤,以将数据框区分为有效数据和无效数据。

data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

# Valid data 
validDF = (spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(sc.parallelize(data))
  .filter(col("_corrupt_record").isNull())
)

display(validDF)

# Invalid data 
invalidDF = (spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(sc.parallelize(data))
  .filter(col("_corrupt_record").isNotNull())
)

display(invalidDF)

如果你想将 DataFrame 保存到 Redshift table 中,你可以执行以下操作:

preactions = "TRUNCATE schema.table_name"
# Load the data into Redshift    
validDF.write\
            .format("com.databricks.spark.redshift")\
            .option("url", db_redshift_url)\
            .option("user", user)\
            .option("password", password)\
            .option("dbtable", "schema.table_name")\
            .option("aws_iam_role", redshift_copy_role)\
            .option("tempdir", args["TempDir"])\
            .option("preactions", preactions)\
            .mode("append")\
            .save()

以上代码应将 DataFrame 写入 Redshift,您可以在 AWS Glue Spark 作业中使用它。无需使用 psycopg2。