无法获取 Airflow 中数据流作业的状态

Not able to GET the state of Dataflow job in Airflow

最近我将 Apache Beam 升级到 2.20.0,然后我的数据流作业的 Airflow 任务开始失败,尽管数据流作业本身成功了。

我注意到升级后数据流的 GET API 在 URL

中使用位置而不是作业 ID
GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/us-central1?alt=json

理想情况下 URL 应该是这样的

GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/{job_id}?alt=json

有人可以解释为什么会这样吗?

这是一个已知的 Airflow issue. The fix 已合并到 Airflow master 但尚未发布。

作为解决方法,您可以使用 Apache Beam SDK 2.19.0 或在自定义挂钩中实施修复(与 dataflow_hook.py 相同,但应用了建议的 change),然后实施自定义使用这个钩子的运算符。 这是我的做法:

首先,我创建了一个名为 my_dataflow_hook.py:

的文件
import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _Dataflow, _DataflowJob
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class _myDataflow(_Dataflow):
    @staticmethod
    def _extract_job(line):
        job_id_pattern = re.compile(
            br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")
        matched_job = job_id_pattern.search(line or '')
        if matched_job:
            return matched_job.group(1).decode()


class MyDataFlowHook(DataFlowHook):
    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        job_id = _myDataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'],
                     self.poll_sleep, job_id,
                     self.num_retries).wait_for_done()

然后,我创建了一个名为 my_dataflow_python.py:

的文件
import re

from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, GoogleCloudBucketHelper
from hooks.my_dataflow_hook import MyDataFlowHook
from airflow.plugins_manager import AirflowPlugin


class MyDataFlowPythonOperator(DataFlowPythonOperator):
    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options)

class MyDataFlowPlugin(AirflowPlugin):
    """Expose Airflow operators."""

    name = 'dataflow_fix_plugin'
    operators = [MyDataFlowPythonOperator]

最后,我按照以下结构将这些文件上传到 Composer 环境的存储桶中:

├── dags
│   └── my_dag.py
└── plugins
    ├── hooks
    │   └── my_dataflow_hook.py
    └── my_dataflow_python.py

现在,我可以在我的 DAG 中使用 MyDataFlowPythonOperator 创建任务:

from airflow import DAG
from airflow.operators.dataflow_fix_plugin import MyDataFlowPythonOperator
...
with DAG("df-python-test", default_args=default_args) as dag:
    test_task = MyDataFlowPythonOperator(dag=dag, task_id="df-python", py_file=PY_FILE, job_name=JOB_NAME)