在异步 django 通道 websocket 中使用同步 celery 回调

Using a synchronous celery callback in an async django channels websocket

在我们的 django-channels 异步消费者中,我正在尝试检索组结果。这个结果可能完整也可能不完整,因此我想在管道结果准备好时添加一个回调。问题是,websocket 是异步的,回调必须是同步的,并且 self.send 方法必须是异步的。最初我假设能够将异步发送包装在 async_to_sync 中,尽管它给出了错误 You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.,因此似乎不允许进行 async->sync->async。

class ImageConsumer(AsyncWebsocketConsumer):

    async def celery_retrieve_group_result(self, batch_id):
        group = group = celery.GroupResult().restore(batch_id)
        res = group.get(callback=self.sync_callback)

    def sync_callback(self, task_id, value):
        async_to_sync(self.send_group_result)(task_id, value)

    async def send_group_result(self, task_id , value):
        await self.send(json.dumps({"type": "result.redraw_border", "result": value}))

我尝试了不同的组合,但我的限制因素是回调必须是同步的,其他一切都是异步的。 有没有人有以这种方式将芹菜与异步 django 混合的经验? 如有任何帮助,我们将不胜感激!

决定采用一种不同的方法迭代封闭的 AsyncResults。

class ImageConsumer(AsyncWebsocketConsumer):
    name = "Image Consumer"
    timeout = 20
    sleep_duration = 5e-2

    async def celery_retrieve_group_result(self, batch_id):

        group = celery.GroupResult().restore(batch_id)
        if group is not None:
            ordered_list_of_async_results = group.results
            for result in ordered_list_of_async_results:
                start = time.time()
                try:
                    while not result.ready():
                        await asyncio.sleep(self.sleep_duration)
                        if self.timeout and time.time() - start > self.timeout:
                            logger.exception(str(TimeoutError))
                            raise TimeoutError
                    value = result.get()
                    await self.send(json.dumps({'type': 'result.processing', 'result': value}))
                except:
                    await self.send(json.dumps({'type': 'result.processing', 'error': str(TimeoutError)}))
            group.forget()
        else:
            await self.send(json.dumps({'type': 'response.nonexistant_group', 'error': 'No result batch exists. re-upload'}))

这真的很好用。超时是因为 Celery 4.3 当前存在 Redis 后端问题,如果删除该问题将导致死机。所以在这个问题被修复之前,这是完美的。