在 Django 中聚合窗口查询集

Aggregating a windowed queryset in Django

背景

假设我们有一组问题,以及一组学生回答了这些问题。 答案已经过审核,分数 已分配,范围未知。

现在,我们需要根据每个问题中的极值对分数进行归一化。 例如,如果 问题 1 的最小 分数 为 4,最大 分数 为 12,则那些分数将分别归一化为 0 和 1。中间的分数是线性插值的(如 Normalization to bring in the range of [0,1] 中所述)。

然后,对于每个学生,我们想知道所有问题的归一化分数的平均值 合并。

最小示例

这是一个非常幼稚的最小实现,只是为了说明我们想要实现的目标:

class Question(models.Model):
    pass


class Student(models.Model):
    def mean_normalized_score(self):
        normalized_scores = []
        for score in self.score_set.all():
            normalized_scores.append(score.normalized_value())
        return mean(normalized_scores) if normalized_scores else None


class Score(models.Model):
    student = models.ForeignKey(to=Student, on_delete=models.CASCADE)
    question = models.ForeignKey(to=Question, on_delete=models.CASCADE)
    value = models.FloatField()

    def normalized_value(self):
        limits = Score.objects.filter(question=self.question).aggregate(
            min=models.Min('value'), max=models.Max('value'))
        return (self.value - limits['min']) / (limits['max'] - limits['min'])

这个效果很好,但是在数据库查询等方面效率很低

目标

我宁愿将数字运算卸载到数据库,而不是上面的实现。

我试过的

例如,考虑这两个用例:

  1. 列出所有 Score 个对象的 normalized_value
  2. 列出所有 Student 个对象的 mean_normalized_score

可以在查询中使用 window functions 来涵盖第一个用例,如下所示:

w_min = Window(expression=Min('value'), partition_by=[F('question')])
w_max = Window(expression=Max('value'), partition_by=[F('question')])
annotated_scores = Score.objects.annotate(
    normalized_value=(F('value') - w_min) / (w_max - w_min))

效果很好,因此不再需要示例中的 Score.normalized_value() 方法。

现在,我想对第二个用例做类似的事情,用单个数据库查询替换 Student.mean_normalized_score() 方法。

原始 SQL 可能看起来像这样(对于 sqlite):

SELECT id, student_id, AVG(normalized_value) AS mean_normalized_score
FROM (
    SELECT
        myapp_score.*,
        ((myapp_score.value - MIN(myapp_score.value) OVER (PARTITION BY myapp_score.question_id)) / (MAX(myapp_score.value) OVER (PARTITION BY myapp_score.question_id) - MIN(myapp_score.value) OVER (PARTITION BY myapp_score.question_id)))
        AS normalized_value
    FROM myapp_score
    ) 
GROUP BY student_id

我可以将此工作作为 raw Django query,但我还没有能够使用 Django 的 ORM 重现此查询。

我尝试在上述 annotated_scores 查询集上构建,使用 Django 的 Subqueryannotate()aggregate()Prefetch 和组合那些,但我一定是在某个地方犯了错误。

可能我得到的最接近的是这个:

subquery = Subquery(annotated_scores.values('normalized_value'))
Score.objects.values('student_id').annotate(mean=Avg(subquery))

但这是不正确的。

有人可以在不求助于原始查询的情况下为我指明正确的方向吗?

我可能已经找到一种使用子查询来执行此操作的方法。最主要的是至少来自 django,我们不能在聚合上使用 window 函数,所以这就是阻止计算标准化值的平均值的原因。我在这些行上添加了评论来解释我要做什么:

# Get the minimum score per question
min_subquery = Score.objects.filter(question=OuterRef('question')).values('question').annotate(min=Min('value'))

# Get the maximum score per question
max_subquery = Score.objects.filter(question=OuterRef('question')).values('question').annotate(max=Max('value'))

# Calculate the normalized value per score, then get the average by grouping by students
mean_subquery = Score.objects.filter(student=OuterRef('pk')).annotate(
    min=Subquery(min_subquery.values('min')[:1]), 
    max=Subquery(max_subquery.values('max')[:1]), 
    normalized=ExpressionWrapper((F('value') - F('min'))/(F('max') - F('min')), output_field=FloatField())
).values('student').annotate(mean=Avg('normalized'))

# Get the calculated mean per student
Student.objects.annotate(mean=Subquery(mean_subquery.values('mean')[:1]))

结果SQL是:

SELECT 
  "student"."id", 
  "student"."name", 
  (
    SELECT 
      AVG(
        (
          (
            V0."value" - (
              SELECT 
                MIN(U0."value") AS "min" 
              FROM 
                "score" U0 
              WHERE 
                U0."question_id" = (V0."question_id") 
              GROUP BY 
                U0."question_id" 
              LIMIT 
                1
            )
          ) / (
            (
              SELECT 
                MAX(U0."value") AS "max" 
              FROM 
                "score" U0 
              WHERE 
                U0."question_id" = (V0."question_id") 
              GROUP BY 
                U0."question_id" 
              LIMIT 
                1
            ) - (
              SELECT 
                MIN(U0."value") AS "min" 
              FROM 
                "score" U0 
              WHERE 
                U0."question_id" = (V0."question_id") 
              GROUP BY 
                U0."question_id" 
              LIMIT 
                1
            )
          )
        )
      ) AS "mean" 
    FROM 
      "score" V0 
    WHERE 
      V0."student_id" = ("student"."id") 
    GROUP BY 
      V0."student_id" 
    LIMIT 
      1
  ) AS "mean" 
FROM 
  "student"

正如@bdbd 所提到的,并且从 this Django issue 来看,似乎还不可能注释窗口查询集(使用 Django 3.2)。

作为临时解决方法,我重构了 如下。

class ScoreQuerySet(models.QuerySet):
    def annotate_normalized(self):
        w_min = Subquery(self.filter(
            question=OuterRef('question')).values('question').annotate(
            min=Min('value')).values('min')[:1])
        w_max = Subquery(self.filter(
            question=OuterRef('question')).values('question').annotate(
            max=Max('value')).values('max')[:1])
        return self.annotate(normalized=(F('value') - w_min) / (w_max - w_min))

    def aggregate_student_mean(self):
        return self.annotate_normalized().values('student_id').annotate(
            mean=Avg('normalized'))


class Score(models.Model):
    objects = ScoreQuerySet.as_manager()
    ...

注意:如有必要,我们可以向 aggregate_student_mean() 中的 values() 添加更多 Student 查找,例如student__name。只要注意别把分组搞乱就好了。

现在,如果过滤和注释窗口查询集成为可能,我们可以简单地用更简单的 Window 实现替换 Subquery 行:

w_min = Window(expression=Min('value'), partition_by=[F('question')])
w_max = Window(expression=Max('value'), partition_by=[F('question')])