不同远程机器上的 Celery 子任务
Celery sub tasks on different remote machines
我有两台服务器,其中 Celery 工作人员正在 运行ning。我们称它们为 R1 和 R2。
从我的其他服务器(比如 R3),我想创建链式任务,以便创建 R1.task 任务,然后创建 R2.task子任务
但我严重怀疑这是否可能。我试过了
# celery_apps.py on R3
from celery import Celery
from application.config import get_application_config
__author__ = 'hussain'
config = get_application_config()
celery_app_r1 = Celery(
'R1',
broker=config.CELERY_BROKER_URL_R1
)
celery_app_r2 = Celery(
'R2',
broker=config.CELERY_BROKER_URL_R2
)
celery_app_r1.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACKS_LATE='True',
CELERY_ACCEPT_CONTENT=['json']
)
celery_app_r2.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACKS_LATE='True',
CELERY_ACCEPT_CONTENT=['json']
)
这就是我尝试创建链式任务的方式
# client.py on R3
from celery import subtask
celery_app_r1.send_task(
'communication.tasks.send_push_notification',
(json.dumps(payload), ),
exchange=config.CELERY_COMMUNICATION_EXCHANGE,
routing_key=config.CELERY_PN_ROUTING_KEY,
link=subtask(
'application.tasks.save_pn_response',
(device.id, ),
exchange=config.CELERY_RECRUITMENT_EXCHANGE,
routing_key=config.CELERY_CALLBACKS_ROUTING_KEY
)
)
我什至无法提及 celery_app_r2。
如何在不同的远程机器上运行这样的子任务?
您不需要 2 个应用程序、2 个经纪人或 2 个交易所。代理是您的机器之间共享的 link,用于通信等。
您需要 2 个队列,每个服务器一个,并使用不同的 routing_keys 或直接执行队列名称相应地路由您的任务。
快速示例:
CELERY_QUEUES = (
Queue('notifications'),
Queue('callbacks')
)
然后在每台服务器中启动一名工作人员:
celery worker --app app -Q notifications --loglevel info
celery worker --app app -Q callbacks --loglevel info
并从通知中发送回调任务:
@app.task(queue='notifications')
def notification_task(*args, **kwargs):
# ... whatever your notification logic is
callback.s(arg1, arg2).delay()
@app.task(queue='callbacks')
def callback(*args, **kwargs)
# ...
注意我没有使用 send_task
,而是直接导入函数。除非您从具有不同代码库的服务器调用任务,否则您不需要 send_task
。 IE。如果您的项目增长并希望分离存储库等
我有两台服务器,其中 Celery 工作人员正在 运行ning。我们称它们为 R1 和 R2。
从我的其他服务器(比如 R3),我想创建链式任务,以便创建 R1.task 任务,然后创建 R2.task子任务
但我严重怀疑这是否可能。我试过了
# celery_apps.py on R3
from celery import Celery
from application.config import get_application_config
__author__ = 'hussain'
config = get_application_config()
celery_app_r1 = Celery(
'R1',
broker=config.CELERY_BROKER_URL_R1
)
celery_app_r2 = Celery(
'R2',
broker=config.CELERY_BROKER_URL_R2
)
celery_app_r1.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACKS_LATE='True',
CELERY_ACCEPT_CONTENT=['json']
)
celery_app_r2.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACKS_LATE='True',
CELERY_ACCEPT_CONTENT=['json']
)
这就是我尝试创建链式任务的方式
# client.py on R3
from celery import subtask
celery_app_r1.send_task(
'communication.tasks.send_push_notification',
(json.dumps(payload), ),
exchange=config.CELERY_COMMUNICATION_EXCHANGE,
routing_key=config.CELERY_PN_ROUTING_KEY,
link=subtask(
'application.tasks.save_pn_response',
(device.id, ),
exchange=config.CELERY_RECRUITMENT_EXCHANGE,
routing_key=config.CELERY_CALLBACKS_ROUTING_KEY
)
)
我什至无法提及 celery_app_r2。
如何在不同的远程机器上运行这样的子任务?
您不需要 2 个应用程序、2 个经纪人或 2 个交易所。代理是您的机器之间共享的 link,用于通信等。
您需要 2 个队列,每个服务器一个,并使用不同的 routing_keys 或直接执行队列名称相应地路由您的任务。
快速示例:
CELERY_QUEUES = (
Queue('notifications'),
Queue('callbacks')
)
然后在每台服务器中启动一名工作人员:
celery worker --app app -Q notifications --loglevel info
celery worker --app app -Q callbacks --loglevel info
并从通知中发送回调任务:
@app.task(queue='notifications')
def notification_task(*args, **kwargs):
# ... whatever your notification logic is
callback.s(arg1, arg2).delay()
@app.task(queue='callbacks')
def callback(*args, **kwargs)
# ...
注意我没有使用 send_task
,而是直接导入函数。除非您从具有不同代码库的服务器调用任务,否则您不需要 send_task
。 IE。如果您的项目增长并希望分离存储库等