完美的任务调度

Prefect Task Scheduling

我是 Prefect 的新手,主要使用 Airflow 工作。我整理了一个工作流程,执行良好,但任务不按我期望的顺序执行。流向这里:

with Flow(name='4chan_extract') as flow:
    board_param = Parameter(name='board_name', required = True, default='pol')
    getData(board= board_param)
    checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
    upload_raw(url="postgresql://postgres:user@localhost:5434/postgres", 
    board=board_param)
    remove_dupes(board=board_param)

然而,当我使用flow.visualise()这个流程时,DAG看起来真的odd.

我的理解是上下文运算符with设置顺序?在每个任务中使用 up_stream 没有帮助。

感谢任何帮助。

如果您希望您的任务按顺序调用,一个接一个,您可以将 upstream_tasks 添加到您的每个任务。此外,为了轻松传递状态依赖项,您可以在调用任务时为任务分配一个名称 (data = get_data(board=board_param)),这允许将此命名引用传递给下游依赖项。

我只能猜测你希望这个流程看起来像什么,但假设你希望它按顺序 运行,这里有一个完整的例子和一个 DAG 可视化:

from prefect import task, Flow, Parameter


@task
def get_data(board):
    pass


@task
def check_db(url):
    pass


@task
def upload_raw(url, board):
    pass


@task
def remove_duplicates(board):
    pass


with Flow(name="4chan_extract") as flow:
    board_param = Parameter(name="board_name", required=True, default="pol")
    data = get_data(board=board_param)
    check = check_db(
        url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
    )
    upload = upload_raw(
        url="postgresql://postgres:user@localhost:5434/postgres",
        board=board_param,
        upstream_tasks=[check],
    )
    remove_duplicates(board=board_param, upstream_tasks=[upload])

if __name__ == "__main__":
    flow.visualize()