如何使用气流检查 long 运行 http 任务的状态?

How to check status of long running http task with airflow?

我的用例是使用气流控制跨微服务的大量预定作业。我正在尝试的解决方案是使用气流作为集中式作业调度程序并通过进行 http 调用来触发作业。其中一些工作会 运行 很长时间,例如。超过 10 分钟或最多 1 小时。

如何定期从airflow查看这些作业的状态?如果远程任务已经完成但气流不知道工作成功怎么办?我可以将作业完成事件发布到 kafka 并让 airflow 监听 kafka 以获取作业状态吗?

您可以通过多种方式使用 Airflow 和您的微服务来实现这一点。通常,您会想要使用传感器,这是适合此类情况的 Airflow 对象。首先查看 BaseSensorOperator and about operators。在 Airflow 中,传感器的使用就像操作符一样(传感器是操作符)。所以你可以创建一个这样的工作:

http_post_task -> http_sensor_task -> success_task

其中 http_post_task 将触发作业,http_sensor_task 将定期检查作业是否完成(例如 GET 请求微服务并检查 200,也许吧?),success_task 将在 http_sensor_task 成功后执行。

您的 http_sensor_task 需要是您自己的自定义传感器。下面是一些可以帮助您创建此传感器的 sudo 代码(请记住传感器的使用就像运算符一样)。考虑您向微服务发出请求然后发出另一个请求以检查作业状态的情况(GET 请求并检查 200),您将像这样扩展 BaseSensorOperator:

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from time import sleep
import requests

class HTTPSensorOperator(BaseSensorOperator): 
    """
    Pokes a URL until it returns 200
    """
    ui_color = '#000000'
    @apply_defaults
    def __init__( self, url, *args, **kwargs):
        super(HTTPSensorOperator, self).__init__(*args, **kwargs)
        self.url = url


    def poke(self, context):
        """
        GET request url and return True if response is 200, False otherwise
        """
        r = requests.post(self.url)
        if r.status_code == 200:
            return True
        else:
            return False

    def execute(self, context):
        """
        Check the url and wait for it to return 200.
        """
        started_at = datetime.utcnow()
        while not self.poke(context):
            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
                if self.soft_fail:
                    raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
                else:
                    raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
            sleep(self.poke_interval)
        self.log.info("Success criteria met. Exiting.")

然后使用如下运算符:

http_sensor_task = HTTPSensorOperator(
      task_id="http_sensor_task",
      url="http://localhost/check_job?job_id=1",
      timeout=3600, # 1 hour
      dag=dag
   )

所以您必须决定您的微服务如何与 Airflow 通信。就在我的脑海中,我想你会发出 1 个请求来触发一项工作,然后发出后续请求(可能每隔 10 秒)来检查一项工作。祝你好运!