Apache Airflow 1.9: Dataflow exception at the job's end
I am using Airflow 1.9 to launch Dataflow jobs on Google Cloud Platform (GCP) with the DataFlowJavaOperator.
Below is the code used to launch the Dataflow job from the Airflow DAG:
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

df_dispatch_data = DataFlowJavaOperator(
    task_id='df-dispatch-data',  # Equivalent to JobName
    jar="/path/of/my/dataflow/jar",
    gcp_conn_id="my_connection_id",
    dataflow_default_options={
        'project': my_project_id,
        'zone': 'europe-west1-b',
        'region': 'europe-west1',
        'stagingLocation': 'gs://my-bucket/staging',
        'tempLocation': 'gs://my-bucket/temp'
    },
    options={
        'workerMachineType': 'n1-standard-1',
        'diskSizeGb': '50',
        'numWorkers': '1',
        'maxNumWorkers': '50',
        'schemaBucket': 'schemas_needed_to_dispatch',
        'autoscalingAlgorithm': 'THROUGHPUT_BASED',
        'readQuery': 'my_query'
    }
)
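However, even though everything works on GCP and the job succeeds, an exception is raised on the machine running Airflow when the Dataflow job ends. It is thrown by gcp_dataflow_hook.py: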
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 121, in execute
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_java_dataflow
    task_id, variables, dataflow, name, ["java", "-jar"])
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in _start_dataflow
    self.get_conn(), variables['project'], name).wait_for_done()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 31, in __init__
    self._job = self._get_job()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 48, in _get_job
    job = self._get_job_id_from_name()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 40, in _get_job_id_from_name
    for job in jobs['jobs']:
KeyError: 'jobs'
Do you have any idea what is going on?
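This issue is caused by the options used to launch the Dataflow job. If --zone or --region is passed, the Google API call that retrieves the job status does not work. It only works with the default zone and region, US/us-central1.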
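To illustrate the failure mode, here is a minimal sketch of a job lookup that tolerates a list response without a 'jobs' key, which is the situation the KeyError in the traceback points to. The function name find_job_by_name and the response shapes are assumptions made for illustration; this is not the hook's actual code, and it does not make the status check work for a non-default region, it only avoids the crash.

```python
def find_job_by_name(list_response, name):
    """Return the first job dict whose 'name' matches, or None.

    list_response is assumed to be the dict returned by a Dataflow
    jobs.list call. If the call sees no jobs (for example because the
    job runs in a region the call does not cover), the 'jobs' key may
    be missing entirely, so it is read with a default instead of
    indexing it directly.
    """
    for job in list_response.get('jobs', []):
        if job.get('name') == name:
            return job
    return None


# Hypothetical responses: one empty, one containing the job.
empty_response = {}
populated_response = {'jobs': [{'id': '1', 'name': 'df-dispatch-data'}]}

missing = find_job_by_name(empty_response, 'df-dispatch-data')
found = find_job_by_name(populated_response, 'df-dispatch-data')
```

With the empty response the lookup returns None instead of raising, so the caller can decide whether to retry, fail with a clearer message, or query the job's region explicitly.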