如何按任务名称对 Celery 任务进行评级?

How to rate limit Celery tasks by task name?

我正在使用 Celery 处理来自 Django 应用程序的异步任务。大多数任务都很短,运行 只需几秒钟,但我有一项任务可能需要几个小时。

由于我的服务器上的处理限制,Celery 被配置为一次只能 运行 2 个任务。这意味着,如果有人启动了两个这样的 运行ning 任务,它会有效地阻止所有其他 Celery 处理站点范围几个小时,这是非常糟糕的。

有什么方法可以配置 Celery,让它一次只处理一种类型的任务,不超过一个?类似于:

@task(max_running_instances=1)
def my_really_long_task():
    for i in range(1000000000):
        time.sleep(6000)

请注意,我不想取消 my_really_long_task 的所有其他发布。我只是不希望他们立即开始,只有在所有其他同名任务完成后才开始。

由于 Celery 似乎不支持,我目前的 hacky 解决方案是查询任务中的其他任务,如果我们找到其他 运行ning 实例,则重新安排自己到 运行 之后,例如

from celery.task.control import inspect

def get_all_active_celery_task_names(ignore_id=None):
    """
    Returns Celery task names for all running tasks.
    """
    i = inspect()
    task_names = defaultdict(int) # {name: count}
    if i:
        active = i.active()
        if active is not None:
            for worker_name, tasks in i.active().iteritems():
                for task in tasks:
                    if ignore_id and task['id'] == ignore_id:
                        continue
                    task_names[task['name']] += 1
    return task_names

@task
def my_really_long_task():

    all_names = get_all_active_celery_task_names()
    if 'my_really_long_task' in all_names:
        my_really_long_task.retry(max_retries=100, countdown=random.randint(10, 300))
        return

    for i in range(1000000000):
        time.sleep(6000)

有更好的方法吗?

我知道其他像 this 这样的 hacky 解决方案,但是设置一个单独的内存缓存服务器来跟踪任务唯一性甚至更不可靠,并且比我上面使用的方法更复杂。

另一种解决方案是将 my_really_long_task 放入单独的队列中。

 my_really_long_task.apply_async(*args, queue='foo')

然后启动并发为1的worker来消费这些任务,这样一次只执行1个任务。

celery -A foo worker -l info -Q foo