AWS Glue Pyspark,结束有条件的工作?

AWS Glue Pyspark, End a job with a condition?

这似乎是一项简单的任务,但我找不到文档以查看是否可行。基本上我有一个每小时运行一次的粘合作业,并搜索一个文件夹以查看是否已上传数据。在某些情况下,过去一小时内没有数据上传,所以当 Glue 函数运行并发现没有数据时,我希望它终止。那可能吗?下面是一些伪代码来说明我的意思:

def fn(input):
    *fetches list of data*
    return (list of data)

list_of_data = fn(input)
if list_of_data is None:
    Terminate Job

如果您的源是 s3,那么您甚至不需要每小时 运行 您的 Glue 作业来确定源 s3 存储桶中是否有任何 uploads/change 文件。

您可以利用 s3 lambda 触发器,如果​​有任何上传到 s3,它实际上会触发您的 Glue 作业。一旦 lambda 触发,您就可以 start your Glue job. Check out this 观看更多视频。

这样你只能 运行 你的 Glue 作业只有当有上传而不是每小时一次时。

如果您仍然希望每小时 运行 您的 Glue 作业,那么您可以使用 Glue 作业 bookmarking,它仅每隔 运行.

处理最新数据

您概述的伪代码可以像我以前 运行 类似的工作一样工作。

但是,我发现以这种方式使用 Glue 作业非常昂贵,因为即使您的作业 运行 不到一分钟(尤其是在没有文件)。

成本更低的替代方案(但更复杂,因为您将同时使用 S3 事件、SQS 和 Lambda)是执行以下操作:

  1. 在 S3 中设置事件通知以监视相关文件夹中的 PUT 事件,该事件将向 SQS(简单队列服务)发送消息。
  2. 将 SQS 队列的消息保留期设置为 1 小时(或您 运行 执行 Glue 作业的任何时间段)。这样,消息最多只会在队列中等待 1 小时。
  3. 创建一个 Lambda 作业来检查 SQS 队列中的消息(使用 boto3)。基本上你会把你拥有的伪代码放在 Lambda 而不是 Glue 中。如果有消息(这意味着至少有 1 个文件已到达该时间段),则触发 Glue 作业进行处理。如果没有,做事并退出。

以上方法将为您节省$$。

是的,正如 bdcloud 提到的那样,我们可以直接从 Lambda 触发 Glue 作业。在着陆文件夹上创建事件触发器,并在上传文件时触发粘合作业。请在此处找到 AWS Lambda 的代码片段:

from __future__ import print_function
import json
import boto3
import time
import sys
import time
from datetime import datetime

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname="<< THE GLUE JOB NAME >>"

    try:
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist '
              'and your bucket is in the same region as this '
              'function.'.format(source_bucket, source_bucket))
    raise e

我们在生产环境中进行了此设置,并且 运行 在过去的 1.5 年中取得了成功。

谢谢,

尤瓦