Airflow 的 DAG 在一分钟内 运行 多次,尽管它被安排为每 5 分钟 运行
Airflow's DAG runs multiple times in one minute, although it was scheduled to run every 5 minutes
我使用 cron 语法创建了一个计划每 5 分钟执行一次的 DAG。
此外,池是为此 dag 创建的,只有一个插槽。
我已尝试重新启动 server/scheduler 并重置数据库。目前,DAG 在 UTC 时间是 运行。此外,我尝试设置我的本地时区,即 'Europe/Minsk' (UTC+3) - 但它没有任何效果。
import random
import time
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'pool': 'download',
# 'priority_weight': 10,
# 'queue': 'bash_queue',
}
params = {
'table': 'api_avitoimage',
}
dag = DAG(
dag_id='test_download_avitoimage',
default_args=default_args,
schedule_interval='*/5 * * * *',
)
def sleep_for_a_bit(random_base):
time.sleep(random_base)
with dag:
download = BashOperator(
task_id='download',
bash_command='/usr/bin/python3 /home/artur/downloader.py --table {{ params.table }}',
params=params,
dag=dag)
sleep = PythonOperator(
task_id='sleep_for_a_bit',
python_callable=sleep_for_a_bit,
op_kwargs={'random_base': random.uniform(0, 1)},
dag=dag,
)
download >> sleep
问题:DAG每分钟运行~2-3次,完全是不正确的执行。
已编辑: 碰巧有 16/16 个同时活动的 DAG runs.But 我不明白这个 "magic number 16" 来自哪里。
默认情况下,Airflow 尝试完成自 start_date
以来的所有 "missed" 个 DAG。由于您的 start_date
设置为 airflow.utils.dates.days_ago(2)
,Airflow 在开始按计划启动 DAG 之前将 运行 DAG 576 次。您可以通过将 catchup = False
添加到 DAG 定义(而不是 default_args)来关闭它。
幻数16来自参数max_active_runs_per_dag = 16
,默认设置。
我使用 cron 语法创建了一个计划每 5 分钟执行一次的 DAG。 此外,池是为此 dag 创建的,只有一个插槽。
我已尝试重新启动 server/scheduler 并重置数据库。目前,DAG 在 UTC 时间是 运行。此外,我尝试设置我的本地时区,即 'Europe/Minsk' (UTC+3) - 但它没有任何效果。
import random
import time
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'pool': 'download',
# 'priority_weight': 10,
# 'queue': 'bash_queue',
}
params = {
'table': 'api_avitoimage',
}
dag = DAG(
dag_id='test_download_avitoimage',
default_args=default_args,
schedule_interval='*/5 * * * *',
)
def sleep_for_a_bit(random_base):
time.sleep(random_base)
with dag:
download = BashOperator(
task_id='download',
bash_command='/usr/bin/python3 /home/artur/downloader.py --table {{ params.table }}',
params=params,
dag=dag)
sleep = PythonOperator(
task_id='sleep_for_a_bit',
python_callable=sleep_for_a_bit,
op_kwargs={'random_base': random.uniform(0, 1)},
dag=dag,
)
download >> sleep
问题:DAG每分钟运行~2-3次,完全是不正确的执行。 已编辑: 碰巧有 16/16 个同时活动的 DAG runs.But 我不明白这个 "magic number 16" 来自哪里。
默认情况下,Airflow 尝试完成自 start_date
以来的所有 "missed" 个 DAG。由于您的 start_date
设置为 airflow.utils.dates.days_ago(2)
,Airflow 在开始按计划启动 DAG 之前将 运行 DAG 576 次。您可以通过将 catchup = False
添加到 DAG 定义(而不是 default_args)来关闭它。
幻数16来自参数max_active_runs_per_dag = 16
,默认设置。