用于任务划分的多线程celery worker

Multithread celery worker for task division

我目前正在构建一个基于某些输入运行某些扫描的应用程序。

我遇到的问题是某些扫描正在产生瓶颈,我想知道是否有办法为这些任务实施不同的 thread/worker。

我再详细说明一下。

我用命令启动我的工作器

pipenv run celery -A proj worker -B -l info

### Tasks.py ###

@shared_task
def short_task_1():
    return

@shared_task
def short_task_2():
    return

@shared_task
def long_task_1():
    return
### handler.py ###
def handle_scan():
    short_task_1.delay()
    short_task_2.delay()
    long_task_1.delay()

我找到的一个可能的解决方案是将较短的任务分配给一名工人,将较长的任务分配给另一名工人。但是我无法在文档中找到如何使用 delay() 命令定义将任务分配给哪个工人。

让另一个工人来处理这个任务会有帮助吗?如果另一个线程是解决方案,那么最好的方法是什么?

我最后做了以下事情

如果您尝试使用多个任务队列,

delay() 将不起作用。主要是因为 delay() 只有在使用 "default" 队列的情况下才会使用。要使用多个队列,必须使用 apply_async()

例如,如果使用 .delay(arg1, arg2) 调用任务 现在(考虑到多个队列)需要用 .apply_async(args=[arg1,arg2], queue='queue_name')

调用它

所以,这就是我最终做到的,感谢@DejanLekic tasks.py

@shared_task
def short_task_1():
    return

@shared_task
def short_task_2():
    return

@shared_task
def long_task_1():
    return

和以前一样。但这是新的处理程序

def handle_scan():
    # Fast queue with args if required
    short_task_1.apply_async(args=[arg1, arg2], queue='fast_queue')
    short_task_2.apply_async(args=[arg1, arg2], queue='fast_queue')
    # slow queue
    long_task_1.apply_async(args=[arg1, arg2], queue='slow_queue')

我通过执行以下操作启动工作人员(注意 pipenv):

pipenv run celery -A proj worker -B --loglevel=info -Q slow_queue,fast_queue