使用 Django REST Framework 进行批量插入的最佳设计模式是什么?

What is the best design pattern for batch insertion using the Django REST Framework?

背景

我有一个允许通过 Django REST Framework 插入记录的 Django 应用程序。

查询电子表格和其他数据库的客户端应用程序将定期逐行批量插入记录。 REST API 允许处理数据转换等的其他应用程序从 Django 中抽象出来。

问题

我想将实际的记录插入与 API 分离,以提高容错能力和可伸缩性的潜力。

建议的方法

我正在考虑用 Celery 做这个,但我以前没有用过它。我正在考虑在我现有的 DRF ModelViewSet 中覆盖 perform_create()perform_create() 已添加到 DRF 3.0)以创建工作人员将在后台获取和处理的 Celery 任务。

DRF 文档说 perform_create() 应该 "should save the object instance by calling serializer.save()"。我想知道,就我而言,我是否可以忽略此建议,而是让我的 Celery 任务调用适当的序列化程序来执行对象保存。

例子

例如,如果我有几个模型:

class Book(models.Model):
    name = models.CharField(max_length=32)

class Author(models.Model):
    surname = models.CharField(max_length=32)

我已经为这些模型提供了 DRF 视图和序列化程序:

class BookSerializer(serializers.ModelSerializer):
    class Meta:
        model = Book

class AuthorSerializer(serializers.ModelSerializer):
    class Meta:
        model = Author

class BookViewSet(viewsets.ModelViewSet):
    queryset = Book.objects.all()
    serializer_class = Book

class AuthorViewSet(viewsets.ModelViewSet):
    queryset = Author.objects.all()
    serializer_class = Author

覆盖 perform_create() 是个好主意吗? BookViewSet:

def perform_create(self, serializer):
    create_book_task(serializer.data)

其中 create_book_task 分别类似于:

@shared_task
def create_book_task(data):
    serializer = BookSerializer(data=data)
    serializer.save()

我真的没能找到任何其他开发人员做类似事情或试图解决相同问题的例子。我是不是太复杂了?当涉及到物理插入时,我的数据库仍然是限制因素,但至少它不会阻止 API 客户端排队他们的数据。如果它不合适,我不会致力于芹菜。这是最好的解决方案吗?它是否存在明显的问题,或者是否有更好的替代方案?

我发现你的方法是合理的,Celery 很棒,除了一些边界情况,根据我的经验可能会有点讨厌(但我不希望 运行 在你概述的用例中在问题中)。

但是,考虑使用 Redis 的简化方法如下。它有利也有弊。

在 BookViewSet 中:

from redis import StrictRedis
from rest_framework import viewsets, renderers

redis_client = StrictRedis()

class BookViewSet(viewsets.ModelViewSet):
    queryset = Book.objects.all()
    serializer_class = Book

    def perform_create(self, serializer):
        json = renderers.JSONRenderer().render(serializer.data)
        redis_client.lpush('create_book_task', json)

在单独的工作脚本中:

from django.utils.six import BytesIO
from redis import StrictRedis
from rest_framework.parsers import JSONParser
from myproject import BookSerializer, Book

MAX_BATCH_SIZE = 1000

def create_book_task():
    bookset = []
    for json in redis_client.brpop(('create_book_task',)):
       stream = BytesIO(json)
       data = JSONParser().parse(stream)
       serializer = BookSerializer(data=data)
       assert serializer.is_valid()
       bookset.append(serializer.instance)
       if len(bookset) >= MAX_BATCH_SIZE:
           break

    if len(bookset) > 0:
        Book.objects.bulk_create(bookset)

while True:
    create_book_task()

优点

  • 您不需要添加 Celery(再一次,喜欢它,但它使测试变得有点棘手,有时会变得有点毛茸茸,具体取决于工作负载、配置等)
  • 它处理批量创建,因此如果您在短时间内(几秒或不到一秒)提交了数千本书,则只会在数据库上执行少量插入(而不是数千次插入)

缺点

  • 您正在自己处理低级序列化,而不是 Celery 做的 "magically"
  • 您将需要自己管理 worker 脚本(对其进行守护进程,可能将其打包为管理命令,负责重启等),而不是将其交给 Celery

当然以上是第一种方法,您可能想让它更通用以重用于其他模型,将 MAX_BATCH_SIZE 移动到您的设置,使用酸洗而不是 JSON 或根据您的特定需求进行各种其他调整、改进或设计决策。

最后,我可能会同意我的回答中概述的方法,除非您预计还有其他几项任务将被卸载到异步处理中,在这种情况下使用 Celery 的情况会变得更加强烈。

PS:由于实际插入将异步完成,请考虑使用 202 Accepted response code 而不是 201 Created 进行响应(除非这会搞砸您的客户)。