如何使用 AWS Glue 作业覆盖陈旧的分区数据?
How do I overwrite stale partitioned data with AWS Glue job?
我有每天一次转储到 s3:///mydata/year=*/month=*/*.snappy.parquet[=26= 的数据] 作为该月的累计数据。我有一个抓取它以更新 mydata table 的爬虫和一个 CW 规则,它在爬虫成功时调用 lambda,启动 Glue 作业以转换列并输出到 s3:///mydata-transformed/year=*/month=*/*.snappy.parquet 。这个流程基本上有效。但是,我目前遇到的问题是输出数据被附加写入而不是替换那里的内容(因为它是当月的累积数据)。例如,假设在 2020 年 10 月 1 日午夜,10/1 的数据被转储到 s3:///mydata/year=2020/month=10/*.snappy.parquet 。该流将在 s3:///mydata-transformed/year=2020/month=10/*.snappy.parquet 中生成转换后的数据,一切都适合 10/ 1 数据。但是,第二天将 10/1 和 10/2 的数据转储到 s3:///mydata/year=2020/month=10/*.snappy.parquet (覆盖前一天的文件),Glue 作业将在输出文件夹中生成附加数据,即它将包含昨天 运行 的数据,加上今天的 运行(因此 10/1 数据两次,和 10/2 数据)。第二天,它将是 10/1 数据 3X、10/2 数据 2X 和 10/3 数据。等等。 2020/09 及之前的数据没有变化,所以没问题。下面是我的代码的基本结构,删除了样板代码,并用人为的代码代替了真正的转换。
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1, glueContext, "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://<bucket>/mydata-transformed/", "partitionKeys": ["year","month"]}, transformation_ctx = "DataSink0")
job.commit()
我该怎么做才能删除当月前一天的数据并替换为当前作业的数据?有没有办法知道,在我的示例中,源数据中的 month=10 分区已经更改,因此我可以在进行转换之前清除输出中的相同分区并输出新数据?
谢谢。
[编辑]
因此,似乎一种解决方案是获取作业书签,然后使用 CURR_LATEST_PARTITIONS 值来确定在处理数据之前我应该删除哪个分区。就我而言,当我处理 2020/10 时,CURR_LATEST_PARTITIONS 是 2020/09。所以我知道要删除 2020/10 的数据,因为如果 CURR_LATEST_PARTITIONS 是 2020/09,那必须是正在处理的数据。我不太喜欢这个解决方案,但我认为它会起作用,但我不确定我还能做些什么。
您有几个选择:
- DynamicFrameWriter 尚不支持覆盖S3 中的数据。相反,您可以使用 Spark 本机
write()
。然而,对于非常大的数据集,它可能会有点低效,因为将使用单个 worker 来覆盖 S3 中的现有数据。示例如下:
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
ds_df1 \
.write.mode('overwrite') \
.format('parquet') \
.partitionBy('year', 'month') \
.save('s3://<bucket>/mydata-transformed/')
job.commit()
- 在lambda函数中,您可以使用删除S3中某个前缀下的数据。使用 Python 和 boto3 的示例是:
import boto3
s3_res = boto3.resource('s3')
bucket = 'my-bucket-name'
# Add any logic to derive required prefix based on year/month/day
prefix = 'mydata/year=2020/month=10/'
s3_res.Bucket(bucket).objects.filter(Prefix=key).delete()
- 您可以使用 Glue 的
purge_s3_path
删除某个前缀的数据。 Link 是 here.
现在 glue 中有删除 S3 路径或删除 glue 目录的功能 table。
您可以使用 purge_s3_path。
请注意,它不会开箱即用,因为默认情况下 'retention period' 是 7 天,这意味着任何超过 168 小时的内容都不会被 purge_s3_path 删除。因此,如果您希望像下面这样删除它,则需要将路径的保留指定为零:
glueContext.purge_s3_path('s3://s3_path/bucket', options={"retentionPeriod":0})
我有每天一次转储到 s3://
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
Transform0 = DynamicFrame.fromDF(ds_df1, glueContext, "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://<bucket>/mydata-transformed/", "partitionKeys": ["year","month"]}, transformation_ctx = "DataSink0")
job.commit()
我该怎么做才能删除当月前一天的数据并替换为当前作业的数据?有没有办法知道,在我的示例中,源数据中的 month=10 分区已经更改,因此我可以在进行转换之前清除输出中的相同分区并输出新数据?
谢谢。
[编辑] 因此,似乎一种解决方案是获取作业书签,然后使用 CURR_LATEST_PARTITIONS 值来确定在处理数据之前我应该删除哪个分区。就我而言,当我处理 2020/10 时,CURR_LATEST_PARTITIONS 是 2020/09。所以我知道要删除 2020/10 的数据,因为如果 CURR_LATEST_PARTITIONS 是 2020/09,那必须是正在处理的数据。我不太喜欢这个解决方案,但我认为它会起作用,但我不确定我还能做些什么。
您有几个选择:
- DynamicFrameWriter 尚不支持覆盖S3 中的数据。相反,您可以使用 Spark 本机
write()
。然而,对于非常大的数据集,它可能会有点低效,因为将使用单个 worker 来覆盖 S3 中的现有数据。示例如下:
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year","month",upper(col('colA')),upper(col('colB')),upper(col('colC')),upper(col('colD')))
ds_df1 \
.write.mode('overwrite') \
.format('parquet') \
.partitionBy('year', 'month') \
.save('s3://<bucket>/mydata-transformed/')
job.commit()
- 在lambda函数中,您可以使用删除S3中某个前缀下的数据。使用 Python 和 boto3 的示例是:
import boto3
s3_res = boto3.resource('s3')
bucket = 'my-bucket-name'
# Add any logic to derive required prefix based on year/month/day
prefix = 'mydata/year=2020/month=10/'
s3_res.Bucket(bucket).objects.filter(Prefix=key).delete()
- 您可以使用 Glue 的
purge_s3_path
删除某个前缀的数据。 Link 是 here.
现在 glue 中有删除 S3 路径或删除 glue 目录的功能 table。
您可以使用 purge_s3_path。
请注意,它不会开箱即用,因为默认情况下 'retention period' 是 7 天,这意味着任何超过 168 小时的内容都不会被 purge_s3_path 删除。因此,如果您希望像下面这样删除它,则需要将路径的保留指定为零:
glueContext.purge_s3_path('s3://s3_path/bucket', options={"retentionPeriod":0})