Prefect 如何避免重新运行任务
Prefect how to avoid rerunning a task
在 Prefect 中,假设我有一些管道,其中 运行s f(date) 用于列表中的每个日期,并将其保存到文件中。这是一个非常常见的 ETL 操作。在气流中,如果我 运行 这一次,它将回填所有历史日期。如果我再次 运行 它,它会知道该任务已经 运行,并且只有 运行 任何新出现的任务(即最新日期)。
据我所知,在 Prefect 中,它每天都会 运行 整个管道,即使前一天完成了 99% 的任务。在不切换到 Prefect Cloud 的情况下,有哪些解决方案可以解决这个问题?你是否只是做一些事情,比如让每个任务在退出之前在 redis 中缓存它的完成?
Prefect 有许多 first-class 处理缓存的方法,具体取决于您想要的控制程度。对于每个任务,您可以指定是否应缓存结果、应缓存多长时间以及应如何使缓存失效(年龄、任务的不同输入、流参数值等)。
缓存任务的最简单方法是使用 targets,它允许您指定任务具有可模板化的副作用(通常是本地或云存储中的文件,但也可以是例如数据库条目、redis 密钥或其他任何东西)。在任务 运行 之前,它会检查副作用是否存在,如果存在,则跳过 运行.
例如,此任务会将其结果写入一个自动以任务名称和当前日期为模板的本地文件:
@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
return [1, 2, 3, 4, 5]
只要匹配文件存在,任务就不会re-run。因为 {today}
是目标名称的一部分,所以这将隐式缓存任务值一天。您还可以在模板中使用参数(例如回填日期)来复制 Airflow 的行为。
要获得更多控制,您可以通过在任何任务上设置 cache_for
、cache_validator
和 cache_key
来使用 Prefect 的 full cache mechanism。如果设置,任务将以 Cached
状态而不是 Success
状态完成。当与适当的编排后端(如 Prefect Server 或 Prefect Cloud)配对时,Cached
状态可以被同一任务(或具有相同 cache_key
的任何任务)的未来 运行 查询。未来的任务将 return Cached
状态作为它自己的结果。
在 Prefect 中,假设我有一些管道,其中 运行s f(date) 用于列表中的每个日期,并将其保存到文件中。这是一个非常常见的 ETL 操作。在气流中,如果我 运行 这一次,它将回填所有历史日期。如果我再次 运行 它,它会知道该任务已经 运行,并且只有 运行 任何新出现的任务(即最新日期)。
据我所知,在 Prefect 中,它每天都会 运行 整个管道,即使前一天完成了 99% 的任务。在不切换到 Prefect Cloud 的情况下,有哪些解决方案可以解决这个问题?你是否只是做一些事情,比如让每个任务在退出之前在 redis 中缓存它的完成?
Prefect 有许多 first-class 处理缓存的方法,具体取决于您想要的控制程度。对于每个任务,您可以指定是否应缓存结果、应缓存多长时间以及应如何使缓存失效(年龄、任务的不同输入、流参数值等)。
缓存任务的最简单方法是使用 targets,它允许您指定任务具有可模板化的副作用(通常是本地或云存储中的文件,但也可以是例如数据库条目、redis 密钥或其他任何东西)。在任务 运行 之前,它会检查副作用是否存在,如果存在,则跳过 运行.
例如,此任务会将其结果写入一个自动以任务名称和当前日期为模板的本地文件:
@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
return [1, 2, 3, 4, 5]
只要匹配文件存在,任务就不会re-run。因为 {today}
是目标名称的一部分,所以这将隐式缓存任务值一天。您还可以在模板中使用参数(例如回填日期)来复制 Airflow 的行为。
要获得更多控制,您可以通过在任何任务上设置 cache_for
、cache_validator
和 cache_key
来使用 Prefect 的 full cache mechanism。如果设置,任务将以 Cached
状态而不是 Success
状态完成。当与适当的编排后端(如 Prefect Server 或 Prefect Cloud)配对时,Cached
状态可以被同一任务(或具有相同 cache_key
的任何任务)的未来 运行 查询。未来的任务将 return Cached
状态作为它自己的结果。