Airflow tasks time out in one hour even though the setting is larger than 1 hour

I'm currently running DAGs on Airflow with the CeleryExecutor + Redis, and I have set execution_timeout to 12 hours on an S3 key sensor, but it times out after one hour on every retry.

I have already tried setting visibility_timeout = 64800 in airflow.cfg, but the problem persists.
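For reference, this is roughly how I set it (a sketch assuming Airflow 1.10.x, where the Redis visibility timeout lives in its own airflow.cfg section; 64800 seconds is 18 hours):

[celery_broker_transport_options]
# seconds Redis waits for a worker to acknowledge a task before redelivering it
visibility_timeout = 64800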

from datetime import timedelta

file_sensor = CorrectedS3KeySensor(
    task_id='listen_for_file_drop', dag=dag,
    aws_conn_id='aws_default',
    poke_interval=15,           # check S3 every 15 seconds
    timeout=64800,              # sensor's own limit on total poking time (18 hours)
    bucket_name=EnvironmentConfigs.S3_SFTP_BUCKET_NAME,
    bucket_key=dag_config[ConfigurationConstants.FILE_S3_PATTERN],
    wildcard_match=True,
    execution_timeout=timedelta(hours=12)  # hard per-try limit enforced by Airflow
)

From my understanding, with execution_timeout set to 12 hours the task should be allowed to run that long on each of the four tries in total (retries = 3). But the issue is that every retry fails within one hour, so the whole thing only lasts a bit over 4 hours.
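For context, this is roughly how the retries and timeout are configured (a simplified sketch, not my actual DAG file; the DAG id, start date and retry_delay below are placeholders):

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 8, 1),        # placeholder
    'retries': 3,                              # four tries in total
    'retry_delay': timedelta(minutes=5),       # placeholder
    'execution_timeout': timedelta(hours=12),  # applies to each try separately
}

dag = DAG('file_drop_pipeline', default_args=default_args, schedule_interval=None)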

[2019-08-06 13:00:08,597] {{base_task_runner.py:101}} INFO - Job 9: Subtask listen_for_file_drop [2019-08-06 13:00:08,595] {{timeout.py:41}} ERROR - Process timed out
[2019-08-06 13:00:08,612] {{models.py:1788}} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1652, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 97, in execute
    while not self.poke(context):
  File "/usr/local/airflow/dags/ProcessingStage/sensors/sensors.py", line 91, in poke
    time.sleep(30)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 42, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout
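For anyone reading the traceback: airflow/utils/timeout.py is a small signal-based guard around the task, which is why I would expect AirflowTaskTimeout only after the full 12-hour execution_timeout has elapsed. A rough sketch of that mechanism (illustrative, not Airflow's actual source):

import signal

from airflow.exceptions import AirflowTaskTimeout

class TaskTimeoutGuard:
    # Illustrative SIGALRM-based timeout, similar in spirit to airflow.utils.timeout
    def __init__(self, seconds, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        # Called when the alarm fires after `seconds`; aborts the running task
        raise AirflowTaskTimeout(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, exc_type, exc_value, traceback):
        signal.alarm(0)  # disarm the alarm if the task finished in time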

I figured this out a few days ago.

Since I deploy Airflow with the CeleryExecutor on AWS, some incorrectly configured CloudWatch alarms kept scaling the workers, webserver and scheduler up and down :(

After those alarms were updated, it works fine now!!