Google Cloud Dataflow Python,正在检索作业 ID

Google Cloud Dataflow Python, Retrieving Job ID

我目前正在 Python 中处理 Dataflow Template,我想访问作业 ID 并使用它保存到特定的 Firestore 文档。

是否可以访问作业 ID?

我在文档中找不到与此相关的任何内容。

通过浏览文档,您应该从启动作业中获得的响应应该包含一个 json 正文,其中 属性 "job" 是 [=11= 的一个实例].

您应该可以使用它来获取您需要的 ID。

如果您使用 google 云 SDK 进行数据流,您可能会在 templates() 上调用创建方法时得到 different object

您可以使用 Google Dataflow API. Use the projects.jobs.list 方法检索数据流作业 ID。

您可以通过从管道中调用 dataflow.projects().locations().jobs().list 来实现(请参阅下面的完整代码)。一种可能性是始终使用相同的作业名称调用模板,这是有道理的,否则作业前缀可以作为运行时参数传递。应用正则表达式解析作业列表以查看作业是否包含名称前缀,如果包含,则 returns 作业 ID。如果有多个,它只会 return 最新的(当前 运行)。

模板在定义 PROJECTBUCKET 变量后暂存,其中:

python script.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --staging_location gs://$BUCKET/staging \
    --temp_location gs://$BUCKET/temp \
    --template_location gs://$BUCKET/templates/retrieve_job_id

然后,在执行模板作业时指定所需的作业名称(myjobprefix 在我的例子中):

gcloud dataflow jobs run myjobprefix \
   --gcs-location gs://$BUCKET/templates/retrieve_job_id

retrieve_job_id 函数将 return 作业中的作业 ID,更改 job_prefix 以匹配给定的名称。

import argparse, logging, re
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def retrieve_job_id(element):
  project = 'PROJECT_ID'
  job_prefix = "myjobprefix"
  location = 'us-central1'

  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))

  try:
    credentials = GoogleCredentials.get_application_default()
    dataflow = build('dataflow', 'v1b3', credentials=credentials)

    result = dataflow.projects().locations().jobs().list(
      projectId=project,
      location=location,
    ).execute()

    job_id = "none"

    for job in result['jobs']:
      if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
        job_id = job['id']
        break

    logging.info("Job ID: {}".format(job_id))
    return job_id

  except Exception as e:
    logging.info("Error retrieving Job ID")
    raise KeyError(e)


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  p = beam.Pipeline(options=pipeline_options)

  init_data = (p
               | 'Start' >> beam.Create(["Init pipeline"])
               | 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id))

  p.run()


if __name__ == '__main__':
  run()

以下代码片段启动存储在 GCS 存储桶中的 Dataflow 模板,从启动模板的响应正文中获取作业 ID API,最后轮询 例如,数据流作业的最终作业状态每 10 秒一次。

Cloud 的 Google 响应正文的官方文档是 here

到目前为止,我只看到了 Dataflow 作业的六个作业状态,如果我错过了其他的,请告诉我。

def launch_dataflow_template(project_id, location, credentials, template_path):
    dataflow = googleapiclient.discovery.build('dataflow', 'v1b3', credentials=credentials)
    logger.info(f"Template path: {template_path}")
    result = dataflow.projects().locations().templates().launch(
            projectId=project_id,
            location=location,
            body={
                ...
            },
            gcsPath=template_path  # dataflow template path
    ).execute()
    return result.get('job', {}).get('id')

def poll_dataflow_job_status(project_id, location, credentials, job_id):
    dataflow = googleapiclient.discovery.build('dataflow', 'v1b3', credentials=credentials)
    # executing states are not the final states of a Dataflow job, they show that the Job is transitioning into another upcoming state
    executing_states = ['JOB_STATE_PENDING', 'JOB_STATE_RUNNING', 'JOB_STATE_CANCELLING']
    # final states do not change further
    final_states = ['JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED']
    while True:
        job_desc =_get_dataflow_job_status(dataflow, project_id, location, job_id)
        if job_desc['currentState'] in executing_states:
            pass
        elif job_desc['currentState'] in final_states:
            break
        sleep(10)
    return job_id, job_desc['currentState']

您可以在 2.35.0 中使用这些波束函数获取 gcp 元数据。 您可以访问文档 https://beam.apache.org/releases/pydoc/2.35.0/_modules/apache_beam/io/gcp/gce_metadata_util.html#fetch_dataflow_job_id

beam.io.gcp.gce_metadata_util._fetch_custom_gce_metadata("job_name")
beam.io.gcp.gce_metadata_util._fetch_custom_gce_metadata("job_id")