在重新启动的 Airflow TaskInstance 中恢复状态
Recovering state in a restarted Airflow TaskInstance
我有一个 Airflow 操作员,它在第 3 方服务上启动一项工作,然后监控该工作的进度。在代码中,执行看起来像
def execute(self, context):
external_id = start_external_job()
wait_until_external_job_completes(external_id)
如果当此任务的实例为 运行 时重新启动 Airflow worker(通常是由于代码部署),我希望该任务的重新启动实例能够在前一个停止(监控第 3 方服务的工作)。有没有办法在同一任务实例的后续运行中共享该第 3 方作业 ID?
增强执行方法的示例如下所示:
def execute(self, context):
external_id = load_external_id_for_task_instance()
if external_id is None:
external_id = start_external_job(args)
persist_external_id_for_task_instance(external_id)
wait_until_external_job_completes(external_id)
我需要实施 load_external_id_for_task_instance
和 persist_external_id_for_task_instance
。
将其分成两个任务
您可以让一个操作员提交作业并将 id 保存到 XCom:
class SubmitJobOperator(BaseOperator):
def execute(self, context):
external_id = start_external_job()
return external_id # return value will be stored in XCom
然后是一个从 XCom 获取 id 并轮询直到完成的传感器:
class JobCompleteSensor(BaseSensor):
@apply_defaults
def __init__(self, submit_task_id, *args, **kwargs):
self.submit_task_id = submit_task_id # so we know where to fetch XCom value from
super(JobCompleteSensor, self).__init__(*args, **kwargs)
def poke(self, context):
external_id = context['task_instance'].xcom_pull(task_ids=self.submit_task_id)
return check_if_external_job_is_complete(external_id):
所以你的 DAG 看起来像这样:
submit_job = SubmitJobOperator(
dag=dag,
task_id='submit_job',
)
wait_for_job_to_complete = JobCompleteSensor(
dag=dag,
task_id='wait_for_job_to_complete',
submit_task_id=submit_job.task_id,
)
submit_job >> wait_for_job_to_complete
XComs 保存在数据库中,因此传感器将始终能够找到之前提交的 external_id
。
我有一个 Airflow 操作员,它在第 3 方服务上启动一项工作,然后监控该工作的进度。在代码中,执行看起来像
def execute(self, context):
external_id = start_external_job()
wait_until_external_job_completes(external_id)
如果当此任务的实例为 运行 时重新启动 Airflow worker(通常是由于代码部署),我希望该任务的重新启动实例能够在前一个停止(监控第 3 方服务的工作)。有没有办法在同一任务实例的后续运行中共享该第 3 方作业 ID?
增强执行方法的示例如下所示:
def execute(self, context):
external_id = load_external_id_for_task_instance()
if external_id is None:
external_id = start_external_job(args)
persist_external_id_for_task_instance(external_id)
wait_until_external_job_completes(external_id)
我需要实施 load_external_id_for_task_instance
和 persist_external_id_for_task_instance
。
您可以让一个操作员提交作业并将 id 保存到 XCom:
class SubmitJobOperator(BaseOperator):
def execute(self, context):
external_id = start_external_job()
return external_id # return value will be stored in XCom
然后是一个从 XCom 获取 id 并轮询直到完成的传感器:
class JobCompleteSensor(BaseSensor):
@apply_defaults
def __init__(self, submit_task_id, *args, **kwargs):
self.submit_task_id = submit_task_id # so we know where to fetch XCom value from
super(JobCompleteSensor, self).__init__(*args, **kwargs)
def poke(self, context):
external_id = context['task_instance'].xcom_pull(task_ids=self.submit_task_id)
return check_if_external_job_is_complete(external_id):
所以你的 DAG 看起来像这样:
submit_job = SubmitJobOperator(
dag=dag,
task_id='submit_job',
)
wait_for_job_to_complete = JobCompleteSensor(
dag=dag,
task_id='wait_for_job_to_complete',
submit_task_id=submit_job.task_id,
)
submit_job >> wait_for_job_to_complete
XComs 保存在数据库中,因此传感器将始终能够找到之前提交的 external_id
。