Executing multiple Celery workers to consume unique queues
I have a use case where I need to start Celery workers so that each one consumes a unique queue. I have tried to implement it as below.
from celery import Celery

app = Celery(broker='redis://localhost:9555/0')

@app.task
def afunction(arg1=None, arg2=None, arg3=None):
    if arg1 == 'awesome_1':
        return "First type of Queue executed"
    if arg2 == "awesome_2":
        return "Second Type of Queue executed"
    if arg3 == "awesome_3":
        return "Third Type of Queue executed"

if __name__ == '__main__':
    qlist = ["awesome_1", "awesome_2", "awesome_3"]
    arglist = [None, None, None]
    for q in qlist:
        arglist[qlist.index(q)] = q
        argv = [
            'worker',
            '--detach',
            '--queue={0}'.format(q),
            '--concurrency=1',
            '-E',
            '--loglevel=INFO'
        ]
        app.worker_main(argv)
        afunction.apply_async(args=[arglist[0], arglist[1], arglist[2]], queue=q)
This code gives the following output when executed:
[2018-02-08 11:28:43,479: INFO/MainProcess] Connected to redis://localhost:9555/0
[2018-02-08 11:28:43,486: INFO/MainProcess] mingle: searching for neighbors
[2018-02-08 11:28:44,503: INFO/MainProcess] mingle: all alone
[2018-02-08 11:28:44,527: INFO/MainProcess] celery@SYSTEM ready.
[2018-02-08 11:28:44,612: INFO/MainProcess] Received task: __main__.afunction[f092f721-6523-4055-98fc-158ac316f4cc]
[2018-02-08 11:28:44,618: INFO/ForkPoolWorker-1] Task __main__.afunction[f092f721-6523-4055-98fc-158ac316f4cc] succeeded in 0.0010992150055244565s: 'First type of Queue executed'
So I can see that the worker executes as expected on the first iteration of the for loop, but then it just stops there and never continues with the loop. I believe this is because the worker is not running detached, or is running as a child process of the script: in ps aux I can see 1 + (as many processes as --concurrency) running the same script. Any pointers on what is going wrong, or on how to make the workers run detached so that the for loop keeps iterating after each worker_main call returns?
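One way to sidestep the blocking call, sketched below under the assumptions that the celery CLI is on PATH and the app lives in a module named app, is to spawn each worker with subprocess.Popen, which returns immediately instead of taking over the process the way app.worker_main does:

```python
import subprocess

def worker_argv(queue, app_module="app"):
    # Command line for one single-concurrency worker bound to `queue`.
    # `app_module` is an assumption about where the Celery app is defined.
    return ["celery", "worker", "-A", app_module, "-Q", queue,
            "--concurrency=1", "--loglevel=INFO"]

def start_workers(queues):
    # Popen returns immediately, so the loop is never blocked the way
    # app.worker_main blocks; keep the handles to terminate() them later.
    return [subprocess.Popen(worker_argv(q)) for q in queues]
```

With this, the main script keeps the Popen handles and stays in control, so it can go on to call apply_async for each queue and shut the workers down afterwards.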
I tried the following workaround. Although I am not sure about its impact on the infrastructure, the results are as expected. It would be great if someone could comment on whether there is a better way to solve the problem, but for now I am using this solution.
from celery import Celery
import os
import time

app = Celery('app', broker='redis://localhost:9555/0')

@app.task
def afunction(arg1=None, arg2=None, arg3=None):
    if arg1 == 'awesome_1':
        return "First type of Queue executed"
    if arg2 == "awesome_2":
        return "Second Type of Queue executed"
    if arg3 == "awesome_3":
        return "Third Type of Queue executed"

qlist = ["awesome_1", "awesome_2", "awesome_3"]
arglist = [None, None, None]
for q in qlist:
    os.system('nohup celery worker -A app.celery -Q {0} --loglevel=INFO --concurrency=1 &'.format(q))
    os.system('echo \'\n\'')
time.sleep(5)
for q in qlist:
    arglist = [None, None, None]
    arglist[qlist.index(q)] = q
    afunction.apply_async(args=[arglist[0], arglist[1], arglist[2]], queue=q)
It creates a nohup.out file with the following output:
[2018-02-08 17:15:53,269: INFO/MainProcess] Connected to redis://localhost:9555/0
[2018-02-08 17:15:53,272: INFO/MainProcess] Connected to redis://localhost:9555/0
[2018-02-08 17:15:53,274: INFO/MainProcess] Connected to redis://localhost:9555/0
[2018-02-08 17:15:53,277: INFO/MainProcess] mingle: searching for neighbors
[2018-02-08 17:15:53,280: INFO/MainProcess] mingle: searching for neighbors
[2018-02-08 17:15:53,280: INFO/MainProcess] mingle: searching for neighbors
[2018-02-08 17:15:54,293: INFO/MainProcess] mingle: all alone
[2018-02-08 17:15:54,295: INFO/MainProcess] mingle: all alone
[2018-02-08 17:15:54,296: INFO/MainProcess] mingle: all alone
[2018-02-08 17:15:54,304: INFO/MainProcess] celery@SYSTEM ready.
[2018-02-08 17:15:54,304: INFO/MainProcess] celery@SYSTEM ready.
[2018-02-08 17:15:54,306: INFO/MainProcess] celery@SYSTEM ready.
[2018-02-08 17:15:57,975: INFO/MainProcess] Received task: app.afunction[e825444d-e123-4f55-9365-f36f95d62734]
[2018-02-08 17:15:57,976: INFO/ForkPoolWorker-1] Task app.afunction[e825444d-e123-4f55-9365-f36f95d62734] succeeded in 0.0003634110325947404s: 'First type of Queue executed'
[2018-02-08 17:15:57,976: INFO/MainProcess] Received task: app.afunction[80816d50-5680-4373-8b5e-dac2ae2a3ff9]
[2018-02-08 17:15:57,977: INFO/MainProcess] Received task: app.afunction[0e88c758-3010-4d37-bda2-6a9a9a02bedf]
[2018-02-08 17:15:57,977: INFO/ForkPoolWorker-1] Task app.afunction[80816d50-5680-4373-8b5e-dac2ae2a3ff9] succeeded in 0.0003187900292687118s: 'Second Type of Queue executed'
[2018-02-08 17:15:57,978: INFO/ForkPoolWorker-1] Task app.afunction[0e88c758-3010-4d37-bda2-6a9a9a02bedf] succeeded in 0.00042019598186016083s: 'Third type of queue executed'
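As a possible alternative to the nohup trick, Celery ships a celery multi command for starting several detached workers in one shot. A hedged sketch that builds such an invocation, pinning each node to its own queue with the per-node -Q:&lt;name&gt; option (the node names w1..wN and the app module name are assumptions):

```python
import subprocess

def multi_start_cmd(queues, app_module="app"):
    """Build a `celery multi start` command that daemonizes one worker
    per queue. Node names (w1, w2, ...) are arbitrary labels; the
    per-node option -Q:<name> binds each node to its own queue."""
    names = ["w{0}".format(i + 1) for i in range(len(queues))]
    cmd = ["celery", "multi", "start"] + names
    cmd += ["-A", app_module, "--loglevel=INFO", "--concurrency=1"]
    for name, q in zip(names, queues):
        cmd += ["-Q:{0}".format(name), q]
    return cmd

def start_detached_workers(queues):
    # celery multi daemonizes the workers itself, so run() returns
    # once they have been spawned.
    return subprocess.run(multi_start_cmd(queues))
```

Compared with nohup ... &, celery multi also gives matching stop/restart subcommands and writes pid files, which makes cleaning up the detached workers easier.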