在重新启动的 Airflow TaskInstance 中恢复状态

Recovering state in a restarted Airflow TaskInstance

我有一个 Airflow 操作员,它在第 3 方服务上启动一项工作,然后监控该工作的进度。在代码中,执行看起来像

def execute(self, context):
    external_id = start_external_job()
    wait_until_external_job_completes(external_id)

如果当此任务的实例为 运行 时重新启动 Airflow worker(通常是由于代码部署),我希望该任务的重新启动实例能够在前一个停止(监控第 3 方服务的工作)。有没有办法在同一任务实例的后续运行中共享该第 3 方作业 ID?

增强执行方法的示例如下所示:

def execute(self, context):
    external_id = load_external_id_for_task_instance()
    if external_id is None:
        external_id = start_external_job(args)
        persist_external_id_for_task_instance(external_id)

    wait_until_external_job_completes(external_id)

我需要实施 load_external_id_for_task_instancepersist_external_id_for_task_instance

我建议使用 XComs and Sensors.

将其分成两个任务

您可以让一个操作员提交作业并将 id 保存到 XCom:

class SubmitJobOperator(BaseOperator):

    def execute(self, context):
        external_id = start_external_job()
        return external_id  # return value will be stored in XCom

然后是一个从 XCom 获取 id 并轮询直到完成的传感器:

class JobCompleteSensor(BaseSensor):

    @apply_defaults
    def __init__(self, submit_task_id, *args, **kwargs):
        self.submit_task_id = submit_task_id  # so we know where to fetch XCom value from
        super(JobCompleteSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        external_id = context['task_instance'].xcom_pull(task_ids=self.submit_task_id)
        return check_if_external_job_is_complete(external_id):

所以你的 DAG 看起来像这样:

submit_job = SubmitJobOperator(
    dag=dag,
    task_id='submit_job',
)

wait_for_job_to_complete = JobCompleteSensor(
    dag=dag,
    task_id='wait_for_job_to_complete',
    submit_task_id=submit_job.task_id,
)

submit_job >> wait_for_job_to_complete

XComs 保存在数据库中,因此传感器将始终能够找到之前提交的 external_id