如何将 Django Rest Framework API 视图转换为异步视图?

How to turn a Django Rest Framework API View into an async one?

我正在尝试构建一个 REST API 来管理一些机器学习分类任务。我写了一个 API 视图,当它被点击时,将触发分类任务的开始(例如:使用用户先前提供的数据训练 SVM 分类器)。但是,这是一项很长的 运行 任务,所以我最好不要让用户在向该视图发出请求后等待。相反,我想在后台启动此任务并立即给他们答复。他们稍后可以在单独的视图中查看分类结果(尚未实现。)

我在 settings.py 中使用 ASGI_APPLICATION = 'mlxplorebackend.asgi.application'

这是我在 views.py

中的 API 视图
import asyncio
from concurrent.futures import ProcessPoolExecutor
from django import setup as SetupDjango

# ... other imports

loop = asyncio.get_event_loop()

def DummyClassification():
  result = sum(i * i for i in range(10 ** 7))
  print(result)
  return result

# ... other API views

class TaskExecuteView(APIView):
  """
  Once an API call is made to this view, the classification algorithm will start being processed.
  Depends on:
  1. Parser for the classification algorithm type and parameters
  2. Classification algorithm implementation
  """

  def get(self, request, taskId, *args, **kwargs):
    try:
      task = TaskModel.objects.get(taskId = taskId)
    except TaskModel.DoesNotExist:
      raise Http404
    else:
      # this is basically the classification task for now
      # need to turn this to an async view      
      with ProcessPoolExecutor(initializer = SetupDjango) as pool:
        loop.run_in_executor(pool, DummyClassification)

      return Response({ "message": "The task with id: {} has been started".format(task.taskId) }, status = status.HTTP_200_OK)

我面临的问题如下:

  1. 当我不使用 with ProcessPoolExecutor(initializer = SetupDjango) as pool: 时,即没有初始化器,我得到 django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.(完整回溯:https://paste.ubuntu.com/p/ctjmFNYMXW/

  2. 当我使用初始化器时,视图不再保持异步,它被阻塞了。任务完成后的响应returns,在我的机器上是5秒左右。我确实意识到我并没有真正在我的 DummyClassification() 函数中使用 asyncio.sleep(),但我想不出这样做的方法。

我猜这不是解决问题的方法,因此我们将不胜感激。如果可以的话,我想避免使用芹菜,因为这对我来说似乎有点太复杂了。

编辑: 如果我摆脱 ProcessPoolExecutor() 并简单地执行 loop.run_in_executor(None, DummyClassification),它会按预期工作,但是只有一个工作线程正在处理该任务,这对于分类任务来说似乎并不理想。

这是一次骑行。起初,我经历了设置 celery 的痛苦,结果发现使用一个 CPU 核的分类任务的原始问题仍然存在。然后我用 redis 切换到 django-rq,它目前按预期工作。

from .tasks import Pipeline

class TaskExecuteView(APIView):
  """
  Once an API call is made to this view, the classification algorithm will start being processed.
  Depends on:
  1. Parser for the classification algorithm type
  2. Classification algorithm implementation
  """

  def get(self, request, taskId, *args, **kwargs):
    try:
      task = TaskModel.objects.get(taskId = taskId)
    except TaskModel.DoesNotExist:
      raise Http404
    else:
      Pipeline.delay(taskId) # this is async now ✔

      # mark this as an in-progress task
      TaskModel.objects.filter(taskId = taskId).update(inProgress = True)

      return Response({ "message": "The task with id: {}, title: {} has been started".format(task.taskId, task.taskTitle) }, status = status.HTTP_200_OK)

tasks.py

from django_rq import job

@job('default', timeout=3600)
def Pipeline(taskId):
  # classification task