Celery 可以将状态更新传递给非阻塞调用者吗?

Can Celery pass a Status Update to a non-Blocking Caller?

我正在使用 Celery 异步执行一组操作。有很多这样的操作,每个操作都可能需要很长时间,所以我不想将结果发送回 Celery worker 函数的 return 值,而是一次按自定义发送一个状态更新。这样,调用者就可以实现带有更改状态回调的进度条,并且 worker 函数的 return 值可以是恒定大小,而不是与操作数成线性关系。

这是一个简单的示例,其中我使用 Celery worker 函数 add_pairs_of_numbers 添加数字对列表,为每个添加的数字对发回自定义状态更新。

#!/usr/bin/env python

"""
Run worker with:

    celery -A tasks worker --loglevel=info
"""
from celery import Celery

app = Celery("tasks", broker="pyamqp://guest@localhost//", backend="rpc://")

@app.task(bind=True)
def add_pairs_of_numbers(self, pairs):
    for x, y in pairs:
        self.update_state(state="SUM", meta={"x":x, "y":y, "x+y":x+y})
    return len(pairs)

def handle_message(message):
    if message["status"] == "SUM":
        x = message["result"]["x"]
        y = message["result"]["y"]
        print(f"Message: {x} + {y} = {x+y}")

def non_looping(*pairs):
    task = add_pairs_of_numbers.delay(pairs)
    result = task.get(on_message=handle_message)
    print(result)

def looping(*pairs):
    task = add_pairs_of_numbers.delay(pairs)
    print(task)
    while True:
        pass

if __name__ == "__main__":
    import sys

    if sys.argv[1:] and sys.argv[1] == "looping":
        looping((3,4), (2,7), (5,5))
    else:
        non_looping((3,4), (2,7), (5,5))

如果你 运行 只是 ./tasks 它会执行 non_looping 函数。这是标准的 Celery 操作:延迟调用 worker 函数,然后使用 get 等待结果。 handle_message 回调函数打印每条消息,添加的对数作为结果 returned。这就是我想要的。

$ ./task.py
Message: 3 + 4 = 7
Message: 2 + 7 = 9
Message: 5 + 5 = 10
3

虽然非循环场景对于这个简单的示例来说已经足够了,但我要完成的真实世界任务是处理一批文件,而不是添加数字对。此外,客户端是 Flask REST API,因此不能包含任何阻塞 get 调用。在上面的脚本中,我使用 looping 函数模拟了这个约束。此函数启动异步 Celery 任务,但不等待响应。 (随后的无限 while 循环模拟 Web 服务器继续 运行 并处理其他请求。)

如果您 运行 带有参数 "looping" 的脚本,它 运行 就是这个代码路径。在这里它立即打印 Celery 任务 ID,然后进入无限循环。

$ ./tasks.py looping
a39c54d3-2946-4f4e-a465-4cc3adc6cbe5

Celery worker 日志显示执行了添加操作,但调用者未定义回调函数,因此它永远不会得到结果。

(我意识到这个特定的例子是令人尴尬的并行,所以我可以使用 chunks 将它分成多个任务。但是,在我未简化的真实世界案例中,我有一些任务不能并行化。)

我想要的是能够在looping场景下指定一个回调。像这样。

def looping(*pairs):
    task = add_pairs_of_numbers.delay(pairs, callback=handle_message) # There is no such callback.
    print(task)
    while True:
        pass

在 Celery 文档和我可以在网上找到的所有示例(例如 this)中,没有办法将回调函数定义为 delay 调用或其 apply_async等价。您只能指定一个作为 get 回调的一部分。这让我觉得这是一个有意的设计决定。

在我的 REST API 场景中,我可以通过让 Celery 工作进程以 HTTP post 的形式将 "status update" 发送回 Flask 服务器来解决这个问题,但是这看起来很奇怪,因为我开始在 Celery 中已经存在的 HTTP 中复制消息传递逻辑。

有没有什么方法可以编写我的 looping 场景,以便调用者在不进行阻塞调用的情况下接收回调,或者在 Celery 中明确禁止这样做?

这是一种 celery 不支持的模式,尽管您可以(在某种程度上)通过 post 对您的任务 as described here 进行自定义状态更新来欺骗它。

Use update_state() to update a task’s state:.

def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})```

celery 不支持这种模式的原因是任务生产者(调用者)与任务消费者(工作者)强烈分离,两者之间的唯一通信是代理,以支持从生产者到消费者的通信和结果后端支持从消费者到生产者的通信。您目前可以获得的最接近的方法是轮询任务状态或编写自定义结果后端,这将允许您通过 AMP RPC 或 redis 订阅 post 事件。