在异步 django 通道 websocket 中使用同步 celery 回调
Using a synchronous celery callback in an async django channels websocket
在我们的 django-channels 异步消费者中,我正在尝试检索组结果。这个结果可能完整也可能不完整,因此我想在管道结果准备好时添加一个回调。问题是,websocket 是异步的,回调必须是同步的,并且 self.send
方法必须是异步的。最初我假设能够将异步发送包装在 async_to_sync
中,尽管它给出了错误 You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
,因此似乎不允许进行 async->sync->async。
class ImageConsumer(AsyncWebsocketConsumer):
async def celery_retrieve_group_result(self, batch_id):
group = group = celery.GroupResult().restore(batch_id)
res = group.get(callback=self.sync_callback)
def sync_callback(self, task_id, value):
async_to_sync(self.send_group_result)(task_id, value)
async def send_group_result(self, task_id , value):
await self.send(json.dumps({"type": "result.redraw_border", "result": value}))
我尝试了不同的组合,但我的限制因素是回调必须是同步的,其他一切都是异步的。
有没有人有以这种方式将芹菜与异步 django 混合的经验?
如有任何帮助,我们将不胜感激!
决定采用一种不同的方法迭代封闭的 AsyncResults。
class ImageConsumer(AsyncWebsocketConsumer):
name = "Image Consumer"
timeout = 20
sleep_duration = 5e-2
async def celery_retrieve_group_result(self, batch_id):
group = celery.GroupResult().restore(batch_id)
if group is not None:
ordered_list_of_async_results = group.results
for result in ordered_list_of_async_results:
start = time.time()
try:
while not result.ready():
await asyncio.sleep(self.sleep_duration)
if self.timeout and time.time() - start > self.timeout:
logger.exception(str(TimeoutError))
raise TimeoutError
value = result.get()
await self.send(json.dumps({'type': 'result.processing', 'result': value}))
except:
await self.send(json.dumps({'type': 'result.processing', 'error': str(TimeoutError)}))
group.forget()
else:
await self.send(json.dumps({'type': 'response.nonexistant_group', 'error': 'No result batch exists. re-upload'}))
这真的很好用。超时是因为 Celery 4.3 当前存在 Redis 后端问题,如果删除该问题将导致死机。所以在这个问题被修复之前,这是完美的。
在我们的 django-channels 异步消费者中,我正在尝试检索组结果。这个结果可能完整也可能不完整,因此我想在管道结果准备好时添加一个回调。问题是,websocket 是异步的,回调必须是同步的,并且 self.send
方法必须是异步的。最初我假设能够将异步发送包装在 async_to_sync
中,尽管它给出了错误 You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
,因此似乎不允许进行 async->sync->async。
class ImageConsumer(AsyncWebsocketConsumer):
async def celery_retrieve_group_result(self, batch_id):
group = group = celery.GroupResult().restore(batch_id)
res = group.get(callback=self.sync_callback)
def sync_callback(self, task_id, value):
async_to_sync(self.send_group_result)(task_id, value)
async def send_group_result(self, task_id , value):
await self.send(json.dumps({"type": "result.redraw_border", "result": value}))
我尝试了不同的组合,但我的限制因素是回调必须是同步的,其他一切都是异步的。 有没有人有以这种方式将芹菜与异步 django 混合的经验? 如有任何帮助,我们将不胜感激!
决定采用一种不同的方法迭代封闭的 AsyncResults。
class ImageConsumer(AsyncWebsocketConsumer):
name = "Image Consumer"
timeout = 20
sleep_duration = 5e-2
async def celery_retrieve_group_result(self, batch_id):
group = celery.GroupResult().restore(batch_id)
if group is not None:
ordered_list_of_async_results = group.results
for result in ordered_list_of_async_results:
start = time.time()
try:
while not result.ready():
await asyncio.sleep(self.sleep_duration)
if self.timeout and time.time() - start > self.timeout:
logger.exception(str(TimeoutError))
raise TimeoutError
value = result.get()
await self.send(json.dumps({'type': 'result.processing', 'result': value}))
except:
await self.send(json.dumps({'type': 'result.processing', 'error': str(TimeoutError)}))
group.forget()
else:
await self.send(json.dumps({'type': 'response.nonexistant_group', 'error': 'No result batch exists. re-upload'}))
这真的很好用。超时是因为 Celery 4.3 当前存在 Redis 后端问题,如果删除该问题将导致死机。所以在这个问题被修复之前,这是完美的。