用于任务划分的多线程celery worker
Multithread celery worker for task division
我目前正在构建一个基于某些输入运行某些扫描的应用程序。
我遇到的问题是某些扫描正在产生瓶颈,我想知道是否有办法为这些任务实施不同的 thread/worker。
我再详细说明一下。
我用命令启动我的工作器
pipenv run celery -A proj worker -B -l info
### Tasks.py ###
@shared_task
def short_task_1():
return
@shared_task
def short_task_2():
return
@shared_task
def long_task_1():
return
### handler.py ###
def handle_scan():
short_task_1.delay()
short_task_2.delay()
long_task_1.delay()
我找到的一个可能的解决方案是将较短的任务分配给一名工人,将较长的任务分配给另一名工人。但是我无法在文档中找到如何使用 delay()
命令定义将任务分配给哪个工人。
让另一个工人来处理这个任务会有帮助吗?如果另一个线程是解决方案,那么最好的方法是什么?
我最后做了以下事情
如果您尝试使用多个任务队列,delay()
将不起作用。主要是因为 delay()
只有在使用 "default" 队列的情况下才会使用。要使用多个队列,必须使用 apply_async()
。
例如,如果使用 .delay(arg1, arg2)
调用任务
现在(考虑到多个队列)需要用 .apply_async(args=[arg1,arg2], queue='queue_name')
调用它
所以,这就是我最终做到的,感谢@DejanLekic
tasks.py
@shared_task
def short_task_1():
return
@shared_task
def short_task_2():
return
@shared_task
def long_task_1():
return
和以前一样。但这是新的处理程序
def handle_scan():
# Fast queue with args if required
short_task_1.apply_async(args=[arg1, arg2], queue='fast_queue')
short_task_2.apply_async(args=[arg1, arg2], queue='fast_queue')
# slow queue
long_task_1.apply_async(args=[arg1, arg2], queue='slow_queue')
我通过执行以下操作启动工作人员(注意 pipenv):
pipenv run celery -A proj worker -B --loglevel=info -Q slow_queue,fast_queue
我目前正在构建一个基于某些输入运行某些扫描的应用程序。
我遇到的问题是某些扫描正在产生瓶颈,我想知道是否有办法为这些任务实施不同的 thread/worker。
我再详细说明一下。
我用命令启动我的工作器
pipenv run celery -A proj worker -B -l info
### Tasks.py ###
@shared_task
def short_task_1():
return
@shared_task
def short_task_2():
return
@shared_task
def long_task_1():
return
### handler.py ###
def handle_scan():
short_task_1.delay()
short_task_2.delay()
long_task_1.delay()
我找到的一个可能的解决方案是将较短的任务分配给一名工人,将较长的任务分配给另一名工人。但是我无法在文档中找到如何使用 delay()
命令定义将任务分配给哪个工人。
让另一个工人来处理这个任务会有帮助吗?如果另一个线程是解决方案,那么最好的方法是什么?
我最后做了以下事情
如果您尝试使用多个任务队列,delay()
将不起作用。主要是因为 delay()
只有在使用 "default" 队列的情况下才会使用。要使用多个队列,必须使用 apply_async()
。
例如,如果使用 .delay(arg1, arg2)
调用任务
现在(考虑到多个队列)需要用 .apply_async(args=[arg1,arg2], queue='queue_name')
所以,这就是我最终做到的,感谢@DejanLekic
tasks.py
@shared_task
def short_task_1():
return
@shared_task
def short_task_2():
return
@shared_task
def long_task_1():
return
和以前一样。但这是新的处理程序
def handle_scan():
# Fast queue with args if required
short_task_1.apply_async(args=[arg1, arg2], queue='fast_queue')
short_task_2.apply_async(args=[arg1, arg2], queue='fast_queue')
# slow queue
long_task_1.apply_async(args=[arg1, arg2], queue='slow_queue')
我通过执行以下操作启动工作人员(注意 pipenv):
pipenv run celery -A proj worker -B --loglevel=info -Q slow_queue,fast_queue