大多数 "Airflownic" 获取生成任务列表的方法

Most "Airflownic" way of getting list for generating tasks

我在 DAG 文件中有一个用于生成动态任务的名称列表,如下所示:

ops = []
for table in list_of_names:
    dw = SnowflakeOperator(task_id=table)
    ops.append(dw)

我的问题是:获得以下列表的最“Airflownic”方式是什么? (考虑到DAG文件中不应该有任何top代码)

list_of_names = ['a', 'b', 'c', 'd', 'e' ... 'z']

list_of_names = list(Variable.get('list_of_names'))

with open('list_of_names.txt', 'r') as list_file:
    list_of_names = list_file.read()
    

list_of_names = ['a', 'b', 'c', 'd', 'e' ... 'z']

我投那个。 Pythonic,很好,很快,不需要添加额外的“实体”。

list_of_names = list(Variable.get('list_of_names'))

非常糟糕。如果您在数据库中有变量,它将在每次调度程序解析文件时查询数据库(可能每 60 秒左右)。

with open('list_of_names.txt', 'r') as list_file: list_of_names = list_file.read()

第二好(或者说第二差)。实际上,我根本不认为它是 Pythonic。将“任务列表”和创建它的 DAG 分开是没有意义的。它们在概念上属于 DAG 的同一定义。除非您想在其他 places/DAGs 中使用该列表,否则没有必要将它们分开。

即使您有这样的需求,也没有必要使用单独的“.txt”或其他“声明性”方式来定义它们。我们在 Python 的世界里。我们已经需要导入和解析 Python 代码。从另一个 python 模块导入 ARRAY 比从 .txt 文件读取它要好得多。在这种情况下,我会投票支持另一种解决方案:

from my_org.common_definitions import MY_TASKS

ops = []
for table in MY_TASKS:
    dw = SnowflakeOperator(task_id=table)
    ops.append(dw)

在 txt 中有一个单独的文件,或者(比如)json-array 如果您需要更多属性,可能只有在您有一些进程时才合理,例如将这些文件定期转储到文件中。然而即便如此,生成一个可导入的 Python 文件,并将数组定义为要导入的 python 常量(如果您可以轻松控制此文件的格式)。