Apache Airflow 1.9: Dataflow exception at the job's end

I am using Airflow 1.9 to launch Dataflow jobs on Google Cloud Platform (GCP) via the DataflowJavaOperator.

Here is the code used to launch the Dataflow job from the Airflow DAG:

# Import path for the operator in Airflow 1.9 (contrib package)
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

df_dispatch_data = DataFlowJavaOperator(
    task_id='df-dispatch-data',  # Equivalent to JobName
    jar="/path/of/my/dataflow/jar",
    gcp_conn_id="my_connection_id",
    dataflow_default_options={
        'project': my_project_id,
        'zone': 'europe-west1-b',
        'region': 'europe-west1',
        'stagingLocation': 'gs://my-bucket/staging',
        'tempLocation': 'gs://my-bucket/temp'
    },
    options={
        'workerMachineType': 'n1-standard-1',
        'diskSizeGb': '50',
        'numWorkers': '1',
        'maxNumWorkers': '50',
        'schemaBucket': 'schemas_needed_to_dispatch',
        'autoscalingAlgorithm': 'THROUGHPUT_BASED',
        'readQuery': 'my_query'
    }
)

However, even though everything is fine on the GCP side because the job succeeds, an exception is raised on my Airflow machine when the Dataflow job finishes. It is thrown by gcp_dataflow_hook.py:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 121, in execute
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_java_dataflow
    task_id, variables, dataflow, name, ["java", "-jar"])
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in _start_dataflow
    self.get_conn(), variables['project'], name).wait_for_done()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 31, in __init__
    self._job = self._get_job()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 48, in _get_job
    job = self._get_job_id_from_name()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 40, in _get_job_id_from_name
    for job in jobs['jobs']:
KeyError: 'jobs'
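
For reference, the method that raises the error in Airflow 1.9's gcp_dataflow_hook.py does roughly the following (a paraphrased sketch reconstructed from the traceback, not the exact source): it lists the project's Dataflow jobs with no location filter and assumes the response always contains a 'jobs' key.

# Paraphrased sketch of _get_job_id_from_name (Airflow 1.9, gcp_dataflow_hook.py).
# self._dataflow is a googleapiclient service object for the Dataflow v1b3 API.
def _get_job_id_from_name(self):
    # projects().jobs().list() without a location only returns jobs in the
    # default region (us-central1).
    jobs = self._dataflow.projects().jobs().list(
        projectId=self._project_id
    ).execute()
    # If no job is found in that region, the response dict contains no
    # 'jobs' key at all, so this loop raises KeyError: 'jobs'.
    for job in jobs['jobs']:
        if job['name'] == self._job_name:
            self._job_id = job['id']
            return job['id']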

Do you have any ideas?

This issue was caused by the options used to launch the Dataflow job. If --zone or --region is passed, the Google API call used to fetch the job status does not work; it only works with the default zone and region, US/us-central1.
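
Below is a minimal sketch of how the job status could be checked against the regional endpoint instead, assuming the google-api-python-client package and Application Default Credentials; the project ID and region are placeholders, and this is not what the Airflow 1.9 hook does. It queries the region where the job actually runs and guards against a missing 'jobs' key.

from googleapiclient.discovery import build

# Build a Dataflow v1b3 client using Application Default Credentials.
dataflow = build('dataflow', 'v1b3')

# List jobs in the region where the job actually runs, instead of the
# default us-central1 used by projects().jobs().list().
response = dataflow.projects().locations().jobs().list(
    projectId='my_project_id',      # placeholder
    location='europe-west1'         # region passed to the Dataflow job
).execute()

# The 'jobs' key is absent when nothing matches, so use .get() with a default.
for job in response.get('jobs', []):
    print(job['name'], job['currentState'])

In the meantime, dropping the region/zone options (so the job runs in the default us-central1) avoids the KeyError on Airflow 1.9.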