How to turn a Django Rest Framework API View into an async one?
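I am trying to build a REST API that manages some machine learning classification tasks. I have written an API view which, when hit, triggers the start of a classification task (for example, training an SVM classifier on data the user provided earlier). This is a long-running task, so ideally the user should not have to wait after making a request to this view. Instead, I would like to start the task in the background and respond immediately. The user can later check the classification results in a separate view (not implemented yet).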
In settings.py I use ASGI_APPLICATION = 'mlxplorebackend.asgi.application'.
Here is my API view in views.py:
import asyncio
from concurrent.futures import ProcessPoolExecutor
from django import setup as SetupDjango
# ... other imports

loop = asyncio.get_event_loop()

def DummyClassification():
    result = sum(i * i for i in range(10 ** 7))
    print(result)
    return result

# ... other API views

class TaskExecuteView(APIView):
    """
    Once an API call is made to this view, the classification algorithm will start being processed.
    Depends on:
    1. Parser for the classification algorithm type and parameters
    2. Classification algorithm implementation
    """
    def get(self, request, taskId, *args, **kwargs):
        try:
            task = TaskModel.objects.get(taskId=taskId)
        except TaskModel.DoesNotExist:
            raise Http404
        else:
            # this is basically the classification task for now
            # need to turn this to an async view
            with ProcessPoolExecutor(initializer=SetupDjango) as pool:
                loop.run_in_executor(pool, DummyClassification)
            return Response({"message": "The task with id: {} has been started".format(task.taskId)}, status=status.HTTP_200_OK)
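The problems I am facing are as follows:
When I do not use with ProcessPoolExecutor(initializer = SetupDjango) as pool:, that is, without the initializer, I get django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (Full traceback: https://paste.ubuntu.com/p/ctjmFNYMXW/)
When I do use the initializer, the view no longer stays async; it blocks. The response only returns after the task has finished, which takes about 5 seconds on my machine. I realize that I am not actually using asyncio.sleep() in my DummyClassification() function, but I cannot think of a way to do so.
I am guessing this is not the right way to approach the problem, so any suggestions would be appreciated. I would like to avoid Celery if possible, because it seems a bit too complicated for me.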
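EDIT:
If I get rid of ProcessPoolExecutor() and simply call loop.run_in_executor(None, DummyClassification), it works as expected, but then only one worker thread handles the task, which does not seem ideal for a classification task.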
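One plausible reason the view blocks is the with block rather than the initializer itself: leaving an executor's with block calls shutdown(wait=True), which waits for every submitted task to finish. The sketch below illustrates that behaviour in isolation. It is not the code from the question: slow_task, SHARED_POOL and the two helper functions are hypothetical names, and a ThreadPoolExecutor with time.sleep stands in for the process pool and the classification job so that the example runs anywhere without Django. ProcessPoolExecutor inherits the same context-manager behaviour from concurrent.futures.Executor.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds):
    """Hypothetical stand-in for the long-running classification job."""
    time.sleep(seconds)
    return "done"


def start_inside_with(seconds):
    """Mimics the view in the question: a new pool per call, inside a with block."""
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(slow_task, seconds)
    # Leaving the with block calls pool.shutdown(wait=True),
    # so control only reaches this line after slow_task has finished.
    return time.perf_counter() - started


# Assumption: one long-lived pool created at import time and reused by every call.
SHARED_POOL = ThreadPoolExecutor(max_workers=1)


def start_with_shared_pool(seconds):
    """Submits the task and returns right away, without waiting for the result."""
    started = time.perf_counter()
    future = SHARED_POOL.submit(slow_task, seconds)
    return time.perf_counter() - started, future


if __name__ == "__main__":
    print("with block:  %.2f s before returning" % start_inside_with(0.5))
    elapsed, future = start_with_shared_pool(0.5)
    print("shared pool: %.2f s before returning" % elapsed)
    print("task result:", future.result())
```

Applied to the view, this would mean creating the ProcessPoolExecutor(initializer=SetupDjango) once at module level and submitting to it without a with block. That is a suggestion to verify, not a tested fix for the original project.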
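Well, it was quite a ride. At first I went through the pain of setting up Celery, only to find that the original problem, the classification task using a single CPU core, still remained. Then I switched to django-rq with Redis, and it is currently working as expected.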
views.py
from .tasks import Pipeline

class TaskExecuteView(APIView):
    """
    Once an API call is made to this view, the classification algorithm will start being processed.
    Depends on:
    1. Parser for the classification algorithm type
    2. Classification algorithm implementation
    """
    def get(self, request, taskId, *args, **kwargs):
        try:
            task = TaskModel.objects.get(taskId=taskId)
        except TaskModel.DoesNotExist:
            raise Http404
        else:
            Pipeline.delay(taskId)  # this is async now ✔
            # mark this as an in-progress task
            TaskModel.objects.filter(taskId=taskId).update(inProgress=True)
            return Response({"message": "The task with id: {}, title: {} has been started".format(task.taskId, task.taskTitle)}, status=status.HTTP_200_OK)
tasks.py
from django_rq import job

@job('default', timeout=3600)
def Pipeline(taskId):
    # classification task
    ...  # body elided in the original post
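For readers who want to reproduce this setup, the 'default' queue named in the @job decorator has to be declared in settings.py. The fragment below is a minimal sketch of that configuration and is not taken from the original post: the host, port and database number are assumptions (a Redis server running locally with its default settings).

```python
# settings.py (fragment)
# "django_rq" also has to be listed in INSTALLED_APPS.

RQ_QUEUES = {
    "default": {
        "HOST": "localhost",  # assumption: Redis runs on the same machine
        "PORT": 6379,         # assumption: default Redis port
        "DB": 0,              # assumption: default Redis database
    },
}
```

Jobs queued with Pipeline.delay(taskId) are only executed once a worker is running, for example one started with python manage.py rqworker default. Each worker is a separate process, so starting several of them is one way to process multiple classification tasks in parallel.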