How to set an equal priority_weight for tasks that depend on another task

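I have 8 sets of tasks. Each set is a series of tasks: task1 >> task2 >> task3. task3 depends on task2, and task2 in turn depends on task1.

My problem is that task2 does not start until every task1 has finished. So before set1.task2 can start, set8.task1 has to run first.

My initial research turned up priority_weight, which can be included in the DAG's default_args. I learned that task1 gets a higher priority_weight than its downstream tasks.

Is there a way to make all the priority weights the same, so that set1.task2 can start regardless of set2, set3, and so on, since it only depends on set1.task1?

Thanks!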

Setting weight_rule to 'upstream' or 'absolute' should help. This is from the BaseOperator docstring:

:param weight_rule: weighting method used for the effective total
    priority weight of the task. Options are:
    ``{ downstream | upstream | absolute }`` default is ``downstream``
    When set to ``downstream`` the effective weight of the task is the
    aggregate sum of all downstream descendants. As a result, upstream
    tasks will have higher weight and will be scheduled more aggressively
    when using positive weight values. This is useful when you have
    multiple dag run instances and desire to have all upstream tasks to
    complete for all runs before each dag can continue processing
    downstream tasks. When set to ``upstream`` the effective weight is the
    aggregate sum of all upstream ancestors. This is the opposite where
    downstream tasks have higher weight and will be scheduled more
    aggressively when using positive weight values. This is useful when you
    have multiple dag run instances and prefer to have each dag complete
    before starting upstream tasks of other dags.  When set to
    ``absolute``, the effective weight is the exact ``priority_weight``
    specified without additional weighting. You may want to do this when
    you know exactly what priority weight each task should have.
    Additionally, when set to ``absolute``, there is bonus effect of
    significantly speeding up the task creation process as for very large
    DAGS. Options can be set as string or using the constants defined in
    the static class ``airflow.utils.WeightRule``

Link: https://github.com/apache/airflow/blob/master/airflow/models/baseoperator.py#L129-L150
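
To make the three rules concrete for the chain in the question, here is a minimal sketch in plain Python. It does not use Airflow; `effective_weights` is a hypothetical helper that mirrors the docstring's description (a task's own weight plus the weights of its downstream or upstream relatives) for a linear chain in which every task has the same priority_weight.

```python
from typing import Dict, List


def effective_weights(chain: List[str], priority_weight: int, weight_rule: str) -> Dict[str, int]:
    """Effective weight of each task in a linear chain (first item is the most upstream).

    Hypothetical helper for illustration only; it is not part of Airflow.
    """
    weights = {}
    for i, task in enumerate(chain):
        if weight_rule == "downstream":
            relatives = len(chain) - i - 1  # tasks that come after this one
        elif weight_rule == "upstream":
            relatives = i  # tasks that come before this one
        elif weight_rule == "absolute":
            relatives = 0  # no aggregation at all
        else:
            raise ValueError(f"unknown weight_rule: {weight_rule}")
        # Own weight plus the weight of every counted relative
        weights[task] = priority_weight * (1 + relatives)
    return weights


chain = ["task1", "task2", "task3"]
for rule in ("downstream", "upstream", "absolute"):
    print(rule, effective_weights(chain, 1, rule))
```

With the default ``downstream`` rule, task1 ends up with the highest weight in every set, which matches the behaviour described in the question. With ``absolute``, all three tasks carry the same weight.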

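Any argument passed in a DAG's default_args is applied to all tasks defined under that DAG. You can therefore pass weight_rule in default_args when instantiating the DAG.

For example: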

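from datetime import datetime

from airflow import DAG

with DAG(
    "dag_1",
    schedule_interval="@daily",
    catchup=False,
    start_date=datetime(2021, 9, 10),
    default_args={
        "priority_weight": 5,
        "pool": "testing_pool",
        # Use each task's own priority_weight, with no upstream/downstream aggregation
        "weight_rule": "absolute",
    },
) as dag:
    ...  # define the tasks here; each one inherits default_args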
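
To see how the rule changes the run order for the sets in the question, here is a toy simulation in plain Python. It is only a sketch under strong assumptions: one task runs at a time, every task has priority_weight 1, the ready task with the highest effective weight is picked, and ties go to the lowest set number. The real Airflow scheduler also takes pool slots, parallelism limits and other factors into account, and `weight` and `run_order` are hypothetical names.

```python
from typing import List

CHAIN = ["task1", "task2", "task3"]


def weight(step: int, rule: str) -> int:
    """Effective weight of CHAIN[step] when every task has priority_weight=1."""
    if rule == "downstream":
        return len(CHAIN) - step  # own weight + tasks after it
    if rule == "upstream":
        return step + 1  # own weight + tasks before it
    return 1  # "absolute": own weight only


def run_order(num_sets: int, rule: str) -> List[str]:
    """Run one task at a time, always picking the ready task with the highest weight."""
    next_step = [0] * num_sets  # index of the next unfinished task in each set
    order = []
    while any(step < len(CHAIN) for step in next_step):
        ready = [(s, step) for s, step in enumerate(next_step) if step < len(CHAIN)]
        # Highest weight first; ties go to the lowest set number (toy assumption)
        s, step = max(ready, key=lambda r: (weight(r[1], rule), -r[0]))
        order.append(f"set{s + 1}.{CHAIN[step]}")
        next_step[s] += 1
    return order


print(run_order(2, "downstream"))
print(run_order(2, "upstream"))
```

In this toy model, ``downstream`` runs every task1 before any task2, while ``upstream`` finishes set1 completely before set2 starts. With ``absolute`` and equal weights, the order is decided entirely by the tie-break, so set1.task2 is no longer forced to wait behind the other sets' task1.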