气流未从 DAG 获取 "start_date"
Airflow not picking up "start_date" from DAG
我正在像这样注册一个新的 DAG:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone
import pendulum
local_tz = pendulum.timezone("UTC")
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2020, 6, 19, 9, 37, 35, tzinfo=local_tz),
'email': ["blah@blah.com"],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15)
}
dag = DAG(
dag_id="some_id",
default_args=default_args,
description= "Some description",
schedule_interval="@once"
)
def to_be_executed_py():
print("I did it, ma!")
with dag:
t1 = PythonOperator(
task_id="some_id",
python_callable=to_be_executed_py)
我希望 运行 在 start_date
中给定的时间只发送一次。上传 DAG(使用 S3)后,我没有看到 "start_date" 的详细信息。相反,我看到了详细信息(在 default_args
下):
{'owner': 'me',
'depends_on_past': False,
'start_date': datetime.datetime(2020, 6, 19, 9, 37, 35, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
'email': ['bleh@blah.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(0, 900)}
我是不是做错了什么?我是否正确假设这应该在给定的 start_time
执行?我到处寻找类似的用例,但没有多少人将 start_date
设置为包含时间。
更新
目前,DAG 运行正在立即 取消暂停。绝对不会选择开始时间。我在网上找到的所有资源都没有适用于此处的答案。
结合使用 schedule_interval='@daily'
、ShortCircuitOperator
和气流变量,您可以解决这个问题; DAG 每天运行并检查今天是否在您作为气流变量输入的日期列表中。如果是,则继续运行下游任务,如果不是,则跳过下游任务,等待明天的后续执行。
这是 DAG 定义:
import airflow.utils.helpers
from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='run_on_release_day',
default_args=args,
schedule_interval='@daily'
)
def check_release_date(**context):
release_dates = Variable.get('release_dates')
print(context, release_dates)
return context['ds'] in release_dates
cond = ShortCircuitOperator(
task_id='condition',
python_callable=check_release_date,
dag=dag,
provide_context=True,
)
tasks = [DummyOperator(task_id='task_' + str(i), dag=dag) for i in [1, 2]]
airflow.utils.helpers.chain(cond, *tasks)
解决了问题。这是双重的。第一,我们使用的翻译器是 12 小时制。由于这是在晚上,因此将其设置为过去(导致 Airflow 追赶)。
其次,我们不需要时区。另外,我们没有在任务中设置 dag
。所以代码应该如下所示:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2020, 6, 19, 21, 37, 35),
'email': ["blah@blah.com"],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15)
}
dag = DAG(
dag_id="some_id",
default_args=default_args,
description= "Some description",
schedule_interval="@once"
)
def to_be_executed_py(ds, **kwargs):
print("I did it, ma!")
with dag:
t1 = PythonOperator(
dag=dag,
provide_context=True,
task_id="some_id",
python_callable=to_be_executed_py)
通过这些更改,一切都在给定的时间运行,一次且仅一次。
我正在像这样注册一个新的 DAG:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone
import pendulum
local_tz = pendulum.timezone("UTC")
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2020, 6, 19, 9, 37, 35, tzinfo=local_tz),
'email': ["blah@blah.com"],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15)
}
dag = DAG(
dag_id="some_id",
default_args=default_args,
description= "Some description",
schedule_interval="@once"
)
def to_be_executed_py():
print("I did it, ma!")
with dag:
t1 = PythonOperator(
task_id="some_id",
python_callable=to_be_executed_py)
我希望 运行 在 start_date
中给定的时间只发送一次。上传 DAG(使用 S3)后,我没有看到 "start_date" 的详细信息。相反,我看到了详细信息(在 default_args
下):
{'owner': 'me',
'depends_on_past': False,
'start_date': datetime.datetime(2020, 6, 19, 9, 37, 35, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
'email': ['bleh@blah.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(0, 900)}
我是不是做错了什么?我是否正确假设这应该在给定的 start_time
执行?我到处寻找类似的用例,但没有多少人将 start_date
设置为包含时间。
更新
目前,DAG 运行正在立即 取消暂停。绝对不会选择开始时间。我在网上找到的所有资源都没有适用于此处的答案。
结合使用 schedule_interval='@daily'
、ShortCircuitOperator
和气流变量,您可以解决这个问题; DAG 每天运行并检查今天是否在您作为气流变量输入的日期列表中。如果是,则继续运行下游任务,如果不是,则跳过下游任务,等待明天的后续执行。
这是 DAG 定义:
import airflow.utils.helpers
from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='run_on_release_day',
default_args=args,
schedule_interval='@daily'
)
def check_release_date(**context):
release_dates = Variable.get('release_dates')
print(context, release_dates)
return context['ds'] in release_dates
cond = ShortCircuitOperator(
task_id='condition',
python_callable=check_release_date,
dag=dag,
provide_context=True,
)
tasks = [DummyOperator(task_id='task_' + str(i), dag=dag) for i in [1, 2]]
airflow.utils.helpers.chain(cond, *tasks)
解决了问题。这是双重的。第一,我们使用的翻译器是 12 小时制。由于这是在晚上,因此将其设置为过去(导致 Airflow 追赶)。
其次,我们不需要时区。另外,我们没有在任务中设置 dag
。所以代码应该如下所示:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2020, 6, 19, 21, 37, 35),
'email': ["blah@blah.com"],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15)
}
dag = DAG(
dag_id="some_id",
default_args=default_args,
description= "Some description",
schedule_interval="@once"
)
def to_be_executed_py(ds, **kwargs):
print("I did it, ma!")
with dag:
t1 = PythonOperator(
dag=dag,
provide_context=True,
task_id="some_id",
python_callable=to_be_executed_py)
通过这些更改,一切都在给定的时间运行,一次且仅一次。