Limit the number of tasks queued on a Celery worker
I have a minimal example of a Celery task (case.py):
import time
import celery

app = celery.Celery('case', broker='redis://localhost')

@app.task
def do_sth():
    # Simulate five seconds of work.
    time.sleep(5)
Then I created several task instances:
>>> import case
>>> tasks_list = [case.do_sth.delay() for i in range(4)]
Then I want to have a dynamic number of workers, each started with --concurrency=1. This is crucial for my case: every worker should run only one task in parallel, and workers should be added while some tasks are still queued on the server. If some tasks have no worker yet and a new worker is added, the new worker should take over the queued (pending, but not in-progress) tasks. However, when I run

$ celery -A case worker --loglevel=info --concurrency=1

I get the following log (Celery banner excluded):
[tasks]
. case.do_sth
[2017-02-21 10:03:18,454: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:18,459: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:19,471: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:19,480: INFO/MainProcess] celery@pt0 ready.
[2017-02-21 10:03:19,658: INFO/MainProcess] Received task: case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c]
[2017-02-21 10:03:19,660: INFO/MainProcess] Received task: case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b]
[2017-02-21 10:03:19,662: INFO/MainProcess] Received task: case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803]
[2017-02-21 10:03:19,664: INFO/MainProcess] Received task: case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d]
[2017-02-21 10:03:24,667: INFO/PoolWorker-1] Task case.do_sth[0c7b0f8c-d1f8-4cd8-a100-21ef6654e04c] succeeded in 5.006301835s: None
[2017-02-21 10:03:29,675: INFO/PoolWorker-1] Task case.do_sth[f97ad614-017b-4a6c-90df-89dbed63e39b] succeeded in 5.005384011s: None
[2017-02-21 10:03:34,683: INFO/PoolWorker-1] Task case.do_sth[b0166022-196f-451b-bcb6-78cdf0558803] succeeded in 5.005373027s: None
[2017-02-21 10:03:39,690: INFO/PoolWorker-1] Task case.do_sth[b097e191-5bc4-44d9-bdcd-8aa74501e95d] succeeded in 5.00531687s: None
Meanwhile I started another worker (with the same command), but it did nothing:
[tasks]
. case.do_sth
[2017-02-21 10:03:20,321: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-02-21 10:03:20,326: INFO/MainProcess] mingle: searching for neighbors
[2017-02-21 10:03:21,339: INFO/MainProcess] mingle: all alone
[2017-02-21 10:03:21,352: INFO/MainProcess] celery@pt0 ready.
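To double-check which worker is holding the pending messages, Celery's inspect API can be queried from another shell; this is just an illustrative sketch, not part of case.py:

import case

# Ask every online worker what it is executing right now and what it has
# already prefetched (reserved) from the broker but not started yet.
insp = case.app.control.inspect()
print(insp.active())    # currently executing tasks, per worker
print(insp.reserved())  # prefetched-but-waiting tasks, per worker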
If you look at the timestamps, you can see that the second worker was started less than five seconds after the first one.
Is there a way (some worker or Celery option) to limit the number of tasks queued on a single worker to one?
I think what you are looking for is called the prefetch limit. It effectively limits the number of tasks a worker can reserve for itself at any given time:
The prefetch limit is a limit for the number of tasks (messages) a worker can reserve for itself
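In practice this means setting the prefetch multiplier to 1 and acknowledging tasks late, so a worker holds at most the single message it is currently executing and leaves everything else in the broker for other workers. Below is a minimal sketch of how case.py could be adapted; worker_prefetch_multiplier and task_acks_late are the Celery 4.x setting names, and treating them as the right combination for this use case is an assumption, not the only possible configuration.

import time
import celery

app = celery.Celery('case', broker='redis://localhost')

# Reserve only one message at a time per worker. With the default
# multiplier (4), a single --concurrency=1 worker prefetches several
# messages at once, which is why the second worker in the logs above
# had nothing left to consume.
app.conf.worker_prefetch_multiplier = 1

# Acknowledge the message only after the task has finished. Without this,
# the worker acks the running task immediately and prefetches the next
# message, so it still holds one queued task that other workers cannot take.
app.conf.task_acks_late = True

@app.task
def do_sth():
    time.sleep(5)

With this configuration, starting a second worker with the same celery -A case worker --loglevel=info --concurrency=1 command while tasks are still pending should let it pick them up instead of sitting idle.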