从芹菜后端获取所有任务的列表
Getting list of all tasks from celery backend
我有一个 celery 应用程序,它使用后端来存储已完成任务的结果。
虽然任务是queued/running,但我可以获取有关它们的信息,但是在它们完成后,如何从结果后端获取所有任务ID的列表?
我想在 python 应用程序中以不依赖于特定后端的方式执行此操作(例如,我可能想在文件系统和 MySQL 之间切换作为结果存储在未来)。
我认为最好的解决方案是将有关已完成任务的信息存储在数据库中。首先,它很容易加工。
这里只是一个 SQLite
的例子。 Table 我们的任务:
# You can add specific columns for args, kwargs etc.
# it is just an example
CREATE TABLE celery_tasks (
"id" INTEGER PRIMARY KEY,
"task_id" TEXT NOT NULL,
"task_name" TEXT NOT NULL,
"state" TEXT NOT NULL,
"created" TEXT NOT NULL
)
我们的芹菜应用程序tasks.py
:
import celery
from celery.signals import task_postrun
from celery.task import Task
import sqlite3
from datetime import datetime
@task_postrun.connect()
def task_postrun(signal=None, sender=None, task_id=None, task=None,
args=None, kwargs=None, retval=None, state=None):
# For example we don't want to store info about specific tasks
ignored_tasks = ('tasks.ignore_task', )
if task.name not in ignored_tasks:
# write info about a finished task into SQLite
conn = sqlite3.connect('db')
conn.execute(
'INSERT INTO celery_tasks (task_id, task_name, state, created) VALUES (?,?,?,?)',
(task_id, task.name, state, datetime.now())
)
conn.commit()
conn.close()
app = celery.Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
)
@app.task
def success_task():
pass
@app.task
def failure_task():
raise Exception('something wrong')
@app.task
def ignore_task():
"""
Example of the task that we want to ignore for SQLite.
"""
pass
run_tasks.py
:
from tasks import success_task, failure_task, ignore_task
success_task.delay()
failure_task.delay()
ignore_task.delay()
因此,在此之后您可以使用常规 SQL 查询在代码的任何位置获取有关已完成任务的任何信息。(SELECT * FROM celery_tasks WHERE created ... AND ...
)
您也可以不时清除table。
我认为使用 db 是一个很好的解决方案。
还有一种方法。
您可以配置 CELERY_RESULT_BACKEND 设置。在这种情况下,celery
将创建 celery_tasksetmeta
、celery_taskmeta
table。任务数据将自动实现:
app = Celery(
'app_name',
broker='CELERY_BROKER_URL...',
backend='db+mysql://credentials...',
)
希望这对您有所帮助。
我有一个 celery 应用程序,它使用后端来存储已完成任务的结果。
虽然任务是queued/running,但我可以获取有关它们的信息,但是在它们完成后,如何从结果后端获取所有任务ID的列表?
我想在 python 应用程序中以不依赖于特定后端的方式执行此操作(例如,我可能想在文件系统和 MySQL 之间切换作为结果存储在未来)。
我认为最好的解决方案是将有关已完成任务的信息存储在数据库中。首先,它很容易加工。
这里只是一个 SQLite
的例子。 Table 我们的任务:
# You can add specific columns for args, kwargs etc.
# it is just an example
CREATE TABLE celery_tasks (
"id" INTEGER PRIMARY KEY,
"task_id" TEXT NOT NULL,
"task_name" TEXT NOT NULL,
"state" TEXT NOT NULL,
"created" TEXT NOT NULL
)
我们的芹菜应用程序tasks.py
:
import celery
from celery.signals import task_postrun
from celery.task import Task
import sqlite3
from datetime import datetime
@task_postrun.connect()
def task_postrun(signal=None, sender=None, task_id=None, task=None,
args=None, kwargs=None, retval=None, state=None):
# For example we don't want to store info about specific tasks
ignored_tasks = ('tasks.ignore_task', )
if task.name not in ignored_tasks:
# write info about a finished task into SQLite
conn = sqlite3.connect('db')
conn.execute(
'INSERT INTO celery_tasks (task_id, task_name, state, created) VALUES (?,?,?,?)',
(task_id, task.name, state, datetime.now())
)
conn.commit()
conn.close()
app = celery.Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0',
)
@app.task
def success_task():
pass
@app.task
def failure_task():
raise Exception('something wrong')
@app.task
def ignore_task():
"""
Example of the task that we want to ignore for SQLite.
"""
pass
run_tasks.py
:
from tasks import success_task, failure_task, ignore_task
success_task.delay()
failure_task.delay()
ignore_task.delay()
因此,在此之后您可以使用常规 SQL 查询在代码的任何位置获取有关已完成任务的任何信息。(SELECT * FROM celery_tasks WHERE created ... AND ...
)
您也可以不时清除table。 我认为使用 db 是一个很好的解决方案。
还有一种方法。
您可以配置 CELERY_RESULT_BACKEND 设置。在这种情况下,celery
将创建 celery_tasksetmeta
、celery_taskmeta
table。任务数据将自动实现:
app = Celery(
'app_name',
broker='CELERY_BROKER_URL...',
backend='db+mysql://credentials...',
)
希望这对您有所帮助。