django + celery:禁用一名工人的预取,有错误吗?
django + celery: disable prefetch for one worker, Is there a bug?
我有一个带芹菜的 Django 项目
由于 RAM 限制,我只能 运行 两个工作进程。
我混合了 'slow' 和 'fast' 任务。
快速任务应尽快执行。在短时间范围内 (0.1s - 3s) 可以有许多快速任务,因此理想情况下两个 CPU 都应该处理它们。
慢速任务可能 运行 几分钟,但结果可能会延迟。
慢任务出现的频率较低,但可能会出现 2 个或 3 个同时排队的情况。
我的想法是拥有一个:
- 1 个 celery worker W1,并发度为 1,仅处理快速任务
- 1 个 celery worker W2,并发 1,可以处理快速和慢速任务。
celery 的任务预取乘数 (https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-prefetch-multiplier) 默认为 4,这意味着 4 个快速任务可以排在一个慢速任务之后,并可能延迟几分钟。因此,我想禁用 worker W2 的预取。文档指出:
To disable prefetching, set worker_prefetch_multiplier to 1. Changing
that setting to 0 will allow the worker to keep consuming as many
messages as it wants.
但是我观察到的是,prefetch_multiplier 为 1 时,一个任务被预取并且仍然会被慢速任务延迟。
这是文档错误吗?这是一个实现错误吗?还是我误解了文档?
有什么办法可以实现我想要的吗?
我执行的启动 worker 的命令是:
celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 0
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast
我的芹菜设置是默认设置,除了:
CELERY_BROKER_URL = "pyamqp://*****@localhost:5672/mini"
CELERY_TASK_ROUTES = {
'app1.tasks.task_fast': {"queue": "fast"},
'app1.tasks.task_slow': {"queue": "slow"},
}
我的 django 项目的 celery.py 文件是:
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'miniclry.settings')
app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
我的django项目的__init__.py
是
from .celery import app as celery_app
__all__ = ('celery_app',)
我工人的代码
import time, logging
from celery import shared_task
from miniclry.celery import app as celery_app
logger = logging.getLogger(__name__)
@shared_task
def task_fast(delay=0.1):
logger.warning("fast in")
time.sleep(delay)
logger.warning("fast out")
@shared_task
def task_slow(delay=30):
logger.warning("slow in")
time.sleep(delay)
logger.warning("slow out")
如果我执行下面的管理shell我明白了,只有在慢任务完成后才执行一个快速任务。
from app1.tasks import task_fast, task_slow
task_slow.delay()
for i in range(30):
task_fast.delay()
有人可以帮忙吗?
如果觉得有帮助,我可以 post 整个测试项目。只是建议交换此类项目的推荐SO方式
版本信息:
- 芹菜==4.3.0
- Django==1.11.25
- Python 2.7.12
我有一个带芹菜的 Django 项目
由于 RAM 限制,我只能 运行 两个工作进程。
我混合了 'slow' 和 'fast' 任务。 快速任务应尽快执行。在短时间范围内 (0.1s - 3s) 可以有许多快速任务,因此理想情况下两个 CPU 都应该处理它们。
慢速任务可能 运行 几分钟,但结果可能会延迟。
慢任务出现的频率较低,但可能会出现 2 个或 3 个同时排队的情况。
我的想法是拥有一个:
- 1 个 celery worker W1,并发度为 1,仅处理快速任务
- 1 个 celery worker W2,并发 1,可以处理快速和慢速任务。
celery 的任务预取乘数 (https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-prefetch-multiplier) 默认为 4,这意味着 4 个快速任务可以排在一个慢速任务之后,并可能延迟几分钟。因此,我想禁用 worker W2 的预取。文档指出:
To disable prefetching, set worker_prefetch_multiplier to 1. Changing that setting to 0 will allow the worker to keep consuming as many messages as it wants.
但是我观察到的是,prefetch_multiplier 为 1 时,一个任务被预取并且仍然会被慢速任务延迟。
这是文档错误吗?这是一个实现错误吗?还是我误解了文档? 有什么办法可以实现我想要的吗?
我执行的启动 worker 的命令是:
celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 0
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast
我的芹菜设置是默认设置,除了:
CELERY_BROKER_URL = "pyamqp://*****@localhost:5672/mini"
CELERY_TASK_ROUTES = {
'app1.tasks.task_fast': {"queue": "fast"},
'app1.tasks.task_slow': {"queue": "slow"},
}
我的 django 项目的 celery.py 文件是:
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'miniclry.settings')
app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
我的django项目的__init__.py
是
from .celery import app as celery_app
__all__ = ('celery_app',)
我工人的代码
import time, logging
from celery import shared_task
from miniclry.celery import app as celery_app
logger = logging.getLogger(__name__)
@shared_task
def task_fast(delay=0.1):
logger.warning("fast in")
time.sleep(delay)
logger.warning("fast out")
@shared_task
def task_slow(delay=30):
logger.warning("slow in")
time.sleep(delay)
logger.warning("slow out")
如果我执行下面的管理shell我明白了,只有在慢任务完成后才执行一个快速任务。
from app1.tasks import task_fast, task_slow
task_slow.delay()
for i in range(30):
task_fast.delay()
有人可以帮忙吗?
如果觉得有帮助,我可以 post 整个测试项目。只是建议交换此类项目的推荐SO方式
版本信息:
- 芹菜==4.3.0
- Django==1.11.25
- Python 2.7.12