运行 任务在气流中以随机顺序排列
Running tasks in a random order in airflow
目前我有一份任务列表,每天都需要在同一时间运行完成,但是它们都是相互独立的。我知道我可以按特定顺序将它们设置为 运行,即 t1 >> t2 >> t3
,但我希望顺序是随机的,因此它们完成的顺序并不总是相同。我怎样才能 运行 一个随机顺序的 airflow 任务列表?
你刚才说它们是相互独立的,你为什么不同时运行它们呢?
这可以通过简单地不使用任何移位运算符来实现,例如:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'Airflow',
'start_date': days_ago(0)
}
dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)
first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)
但是如果你真的想要随机顺序的任务并使它们在某种随机队列中可执行,你可以将所有任务添加到一个列表中,然后它们随机排列。然后迭代任务并使当前依赖于下一个任务,例如:
为此,请使用 random.shuffle()
就地随机播放列表:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
import random
args = {
'owner': 'Airflow',
'start_date': days_ago(0)
}
dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)
first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)
tasks_list = [first_operator, second_operator, third_operator]
random.shuffle(tasks_list)
i = 0
while i < len(tasks_list) - 1:
tasks_list[i] << tasks_list[i + 1]
i += 1
玩得开心!
目前我有一份任务列表,每天都需要在同一时间运行完成,但是它们都是相互独立的。我知道我可以按特定顺序将它们设置为 运行,即 t1 >> t2 >> t3
,但我希望顺序是随机的,因此它们完成的顺序并不总是相同。我怎样才能 运行 一个随机顺序的 airflow 任务列表?
你刚才说它们是相互独立的,你为什么不同时运行它们呢?
这可以通过简单地不使用任何移位运算符来实现,例如:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'Airflow',
'start_date': days_ago(0)
}
dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)
first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)
但是如果你真的想要随机顺序的任务并使它们在某种随机队列中可执行,你可以将所有任务添加到一个列表中,然后它们随机排列。然后迭代任务并使当前依赖于下一个任务,例如:
为此,请使用 random.shuffle()
就地随机播放列表:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
import random
args = {
'owner': 'Airflow',
'start_date': days_ago(0)
}
dag = DAG(dag_id='example_random_task', default_args=args, max_active_runs=0, catchup=False)
first_operator = DummyOperator(task_id='{}_operator'.format("first"), dag=dag)
second_operator = DummyOperator(task_id='{}_operator'.format("second"), dag=dag)
third_operator = DummyOperator(task_id='{}_operator'.format("third"), dag=dag)
tasks_list = [first_operator, second_operator, third_operator]
random.shuffle(tasks_list)
i = 0
while i < len(tasks_list) - 1:
tasks_list[i] << tasks_list[i + 1]
i += 1
玩得开心!