Not able to GET the state of Dataflow job in Airflow
I recently upgraded Apache Beam to 2.20.0, and since then the Airflow tasks for my Dataflow jobs have started failing even though the Dataflow jobs themselves succeed.
I noticed that after the upgrade, the Dataflow GET API uses the location instead of the job ID in the URL:
GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/us-central1?alt=json
Ideally the URL should look like this:
GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/{job_id}?alt=json
Can someone explain why this is happening?
This is a known Airflow issue. The fix has been merged into the Airflow master branch but has not been released yet.
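What goes wrong is that, with Beam 2.20.0, the hook extracts the region rather than the job ID from the job's log output and then polls the projects.locations.jobs.get method with that value. A minimal sketch of the equivalent discovery-client call, just to show where the URL from the question comes from (the client setup is simplified and assumes application-default credentials are available):

from googleapiclient.discovery import build

# Simplified stand-in for DataFlowHook.get_conn(); real auth setup is omitted
# and application-default credentials are assumed to be configured.
dataflow = build('dataflow', 'v1b3', cache_discovery=False)

# What the job monitor effectively requests after the upgrade: jobId ends up
# being "us-central1" because the region was extracted instead of the job ID.
request = dataflow.projects().locations().jobs().get(
    projectId='umg-de',
    location='us-central1',
    jobId='us-central1',  # should be the actual Dataflow job ID
)
print(request.uri)
# https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/us-central1?alt=json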
As a workaround, you can either use Apache Beam SDK 2.19.0, or implement the fix in a custom hook (identical to dataflow_hook.py but with the suggested change applied) and then implement a custom operator that uses this hook.
Here is how I did it:
First, I created a file called my_dataflow_hook.py:
import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _Dataflow, _DataflowJob
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class _myDataflow(_Dataflow):
    @staticmethod
    def _extract_job(line):
        # Same pattern as upstream, except for the extra "/.*/" that skips the
        # region segment of the console URL so the capture group matches the
        # job ID rather than the location.
        job_id_pattern = re.compile(
            br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")
        matched_job = job_id_pattern.search(line or b'')
        if matched_job:
            return matched_job.group(1).decode()


class MyDataFlowHook(DataFlowHook):
    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        # Launch the job with the patched _Dataflow so the real job ID is
        # captured, then poll that job until it reaches a terminal state.
        job_id = _myDataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'],
                     self.poll_sleep, job_id,
                     self.num_retries).wait_for_done()
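The substantive change here is the regular expression in _extract_job: with Beam 2.20.0 the monitoring-console URL printed in the job logs carries the region as an extra path segment before the job ID, so the original pattern captured us-central1 instead of the job ID. A quick standalone check of the patched pattern against a hypothetical log line (the exact wording and job ID are made up for illustration):

import re

# Patched pattern from _myDataflow._extract_job: the added "/.*/" skips the
# region segment so the capture group lands on the job ID.
job_id_pattern = re.compile(
    br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")

# Hypothetical log line emitted when the job is submitted.
sample_line = (
    b"INFO:root:To access the Dataflow monitoring console, please navigate to "
    b"https://console.cloud.google.com/dataflow/jobs/us-central1/"
    b"2020-05-01_12_34_56-1234567890123456789?project=umg-de"
)

match = job_id_pattern.search(sample_line)
print(match.group(1).decode())  # -> 2020-05-01_12_34_56-1234567890123456789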
Then, I created a file called my_dataflow_python.py:
import re

from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, GoogleCloudBucketHelper
from hooks.my_dataflow_hook import MyDataFlowHook
from airflow.plugins_manager import AirflowPlugin


class MyDataFlowPythonOperator(DataFlowPythonOperator):

    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                              delegate_to=self.delegate_to,
                              poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options)


class MyDataFlowPlugin(AirflowPlugin):
    """Expose Airflow operators."""
    name = 'dataflow_fix_plugin'
    operators = [MyDataFlowPythonOperator]
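Because the plugin is registered under the name dataflow_fix_plugin, Airflow exposes the operator for import as airflow.operators.dataflow_fix_plugin.MyDataFlowPythonOperator, which is what the DAG further down relies on.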
Finally, I uploaded these files to the Composer environment's bucket with the following structure:
├── dags
│   └── my_dag.py
└── plugins
    ├── hooks
    │   └── my_dataflow_hook.py
    └── my_dataflow_python.py
Now I can create tasks in my DAG using MyDataFlowPythonOperator:
from airflow import DAG
from airflow.operators.dataflow_fix_plugin import MyDataFlowPythonOperator
...
with DAG("df-python-test", default_args=default_args) as dag:
    test_task = MyDataFlowPythonOperator(dag=dag, task_id="df-python", py_file=PY_FILE, job_name=JOB_NAME)
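For completeness, the pieces elided above (default_args, PY_FILE and JOB_NAME) could look roughly like this; all values are illustrative placeholders rather than part of the original setup:

from datetime import datetime

# Hypothetical values; adjust project, bucket and paths to your environment.
PY_FILE = "gs://my-bucket/pipelines/my_beam_pipeline.py"
JOB_NAME = "df-python-test-job"

default_args = {
    "start_date": datetime(2020, 5, 1),
    # Merged into the operator's kwargs and forwarded to the Dataflow
    # pipeline options by the hook.
    "dataflow_default_options": {
        "project": "umg-de",
        "region": "us-central1",
        "temp_location": "gs://my-bucket/tmp/",
    },
}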