气流未从 DAG 获取 "start_date"

Airflow not picking up "start_date" from DAG

我正在像这样注册一个新的 DAG:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone
import pendulum

local_tz = pendulum.timezone("UTC")

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 19, 9, 37, 35, tzinfo=local_tz),
    'email': ["blah@blah.com"],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=15)
}
dag = DAG(
    dag_id="some_id",
    default_args=default_args,
    description= "Some description",
    schedule_interval="@once"
)

def to_be_executed_py():
    print("I did it, ma!")

with dag:
    t1 = PythonOperator(
        task_id="some_id",
        python_callable=to_be_executed_py)

我希望 运行 在 start_date 中给定的时间只发送一次。上传 DAG(使用 S3)后,我没有看到 "start_date" 的详细信息。相反,我看到了详细信息(在 default_args 下):

{'owner': 'me', 
'depends_on_past': False, 
'start_date': datetime.datetime(2020, 6, 19, 9, 37, 35, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 
'email': ['bleh@blah.com'], 
'email_on_failure': False, 
'email_on_retry': False, 
'retries': 1, 
'retry_delay': datetime.timedelta(0, 900)}

我是不是做错了什么?我是否正确假设这应该在给定的 start_time 执行?我到处寻找类似的用例,但没有多少人将 start_date 设置为包含时间。

更新

目前,DAG 运行正在立即 取消暂停。绝对不会选择开始时间。我在网上找到的所有资源都没有适用于此处的答案。

结合使用 schedule_interval='@daily'ShortCircuitOperator 和气流变量,您可以解决这个问题; DAG 每天运行并检查今天是否在您作为气流变量输入的日期列表中。如果是,则继续运行下游任务,如果不是,则跳过下游任务,等待明天的后续执行。 这是 DAG 定义:

import airflow.utils.helpers
from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='run_on_release_day',
    default_args=args,
    schedule_interval='@daily'
)

def check_release_date(**context):
    release_dates = Variable.get('release_dates')
    print(context, release_dates)
    return context['ds'] in release_dates

cond = ShortCircuitOperator(
    task_id='condition',
    python_callable=check_release_date,
    dag=dag,
    provide_context=True,
)

tasks = [DummyOperator(task_id='task_' + str(i), dag=dag) for i in [1, 2]]

airflow.utils.helpers.chain(cond, *tasks)

解决了问题。这是双重的。第一,我们使用的翻译器是 12 小时制。由于这是在晚上,因此将其设置为过去(导致 Airflow 追赶)。

其次,我们不需要时区。另外,我们没有在任务中设置 dag。所以代码应该如下所示:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 19, 21, 37, 35),
    'email': ["blah@blah.com"],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=15)
}
dag = DAG(
    dag_id="some_id",
    default_args=default_args,
    description= "Some description",
    schedule_interval="@once"
)

def to_be_executed_py(ds, **kwargs):
    print("I did it, ma!")

with dag:
    t1 = PythonOperator(
        dag=dag,
        provide_context=True,
        task_id="some_id",
        python_callable=to_be_executed_py)

通过这些更改,一切都在给定的时间运行,一次且仅一次。