您可以为特定任务保留一定数量的 celery worker,或者在延迟时将任务设置为更高的优先级吗?

Can you reserve a set amount of celery workers for specific tasks or set a task to higher priority when you delay it?

我的 django 应用程序当前接收一个文件,并逐行读取文件,对于每一行,都有一个 celery 任务委托处理该行。

这是它的样子

File -> For each line in file -> celery_task.delay(line)

那么,我还有其他可以由用户触发的 celery 任务,例如:

User input line -> celery_task.delay(line)

这当然不是严格意义上的同一个任务,用户实际上可以根据他们的操作调用任何 celery 任务(信号也会调用一些任务)

现在我面临的问题是,当用户上传一个比较大的文件时,我的redis队列在处理文件时陷入困境,当用户做任何事情时,他们的任务只会在之后被委派和执行文件的 celery_task.delay() 任务已完成执行。我的问题是,是否可以保留一定数量的工人或延迟具有 "higher" 优先级的芹菜任务并覆盖队列?

代码大致如下:

@app.task(name='process_line')
def process_line(line):
    some_stuff_with_line(line)
    do_heavy_logic_stuff_with_line(line)
    more_stuff_here(line)
    obj = Data.objects.make_data_from_line(line)
    serialize_object.delay(obj.id)
    return obj.id

@app.task(name='serialize_object')
def serialize_object(important_id):
    obj = Data.objects.get(id=important_id)
    obj.pre_serialized_json = DataSerializer(obj).data
    obj.save()

@app.task(name='process_file')
def process_file(file_id):
    ingested_file = IngestedFile.objects.get(id=file_id)
    for line in ingested_file.get_all_lines():
        process_line.delay(line)

是的,您可以创建多个队列,然后您可以决定将您的任务路由到多个工作人员或单个工作人员上的这些队列 运行。默认情况下,所有任务都进入名为 celery 的默认队列。查看 Routing Tasks 上的 Celery 文档以获取更多信息和一些示例。