完美的任务调度
Prefect Task Scheduling
我是 Prefect 的新手,主要使用 Airflow 工作。我整理了一个工作流程,执行良好,但任务不按我期望的顺序执行。流向这里:
with Flow(name='4chan_extract') as flow:
board_param = Parameter(name='board_name', required = True, default='pol')
getData(board= board_param)
checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
upload_raw(url="postgresql://postgres:user@localhost:5434/postgres",
board=board_param)
remove_dupes(board=board_param)
然而,当我使用flow.visualise()
这个流程时,DAG看起来真的odd.
我的理解是上下文运算符with
设置顺序?在每个任务中使用 up_stream
没有帮助。
感谢任何帮助。
如果您希望您的任务按顺序调用,一个接一个,您可以将 upstream_tasks
添加到您的每个任务。此外,为了轻松传递状态依赖项,您可以在调用任务时为任务分配一个名称 (data = get_data(board=board_param)
),这允许将此命名引用传递给下游依赖项。
我只能猜测你希望这个流程看起来像什么,但假设你希望它按顺序 运行,这里有一个完整的例子和一个 DAG 可视化:
from prefect import task, Flow, Parameter
@task
def get_data(board):
pass
@task
def check_db(url):
pass
@task
def upload_raw(url, board):
pass
@task
def remove_duplicates(board):
pass
with Flow(name="4chan_extract") as flow:
board_param = Parameter(name="board_name", required=True, default="pol")
data = get_data(board=board_param)
check = check_db(
url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
)
upload = upload_raw(
url="postgresql://postgres:user@localhost:5434/postgres",
board=board_param,
upstream_tasks=[check],
)
remove_duplicates(board=board_param, upstream_tasks=[upload])
if __name__ == "__main__":
flow.visualize()
我是 Prefect 的新手,主要使用 Airflow 工作。我整理了一个工作流程,执行良好,但任务不按我期望的顺序执行。流向这里:
with Flow(name='4chan_extract') as flow:
board_param = Parameter(name='board_name', required = True, default='pol')
getData(board= board_param)
checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
upload_raw(url="postgresql://postgres:user@localhost:5434/postgres",
board=board_param)
remove_dupes(board=board_param)
然而,当我使用flow.visualise()
这个流程时,DAG看起来真的odd.
我的理解是上下文运算符with
设置顺序?在每个任务中使用 up_stream
没有帮助。
感谢任何帮助。
如果您希望您的任务按顺序调用,一个接一个,您可以将 upstream_tasks
添加到您的每个任务。此外,为了轻松传递状态依赖项,您可以在调用任务时为任务分配一个名称 (data = get_data(board=board_param)
),这允许将此命名引用传递给下游依赖项。
我只能猜测你希望这个流程看起来像什么,但假设你希望它按顺序 运行,这里有一个完整的例子和一个 DAG 可视化:
from prefect import task, Flow, Parameter
@task
def get_data(board):
pass
@task
def check_db(url):
pass
@task
def upload_raw(url, board):
pass
@task
def remove_duplicates(board):
pass
with Flow(name="4chan_extract") as flow:
board_param = Parameter(name="board_name", required=True, default="pol")
data = get_data(board=board_param)
check = check_db(
url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
)
upload = upload_raw(
url="postgresql://postgres:user@localhost:5434/postgres",
board=board_param,
upstream_tasks=[check],
)
remove_duplicates(board=board_param, upstream_tasks=[upload])
if __name__ == "__main__":
flow.visualize()