在 Django 中聚合窗口查询集
Aggregating a windowed queryset in Django
背景
假设我们有一组问题,以及一组学生回答了这些问题。
答案已经过审核,分数 已分配,范围未知。
现在,我们需要根据每个问题中的极值对分数进行归一化。
例如,如果 问题 1 的最小 分数 为 4,最大 分数 为 12,则那些分数将分别归一化为 0 和 1。中间的分数是线性插值的(如 Normalization to bring in the range of [0,1] 中所述)。
然后,对于每个学生,我们想知道所有问题的归一化分数的平均值 合并。
最小示例
这是一个非常幼稚的最小实现,只是为了说明我们想要实现的目标:
class Question(models.Model):
pass
class Student(models.Model):
def mean_normalized_score(self):
normalized_scores = []
for score in self.score_set.all():
normalized_scores.append(score.normalized_value())
return mean(normalized_scores) if normalized_scores else None
class Score(models.Model):
student = models.ForeignKey(to=Student, on_delete=models.CASCADE)
question = models.ForeignKey(to=Question, on_delete=models.CASCADE)
value = models.FloatField()
def normalized_value(self):
limits = Score.objects.filter(question=self.question).aggregate(
min=models.Min('value'), max=models.Max('value'))
return (self.value - limits['min']) / (limits['max'] - limits['min'])
这个效果很好,但是在数据库查询等方面效率很低
目标
我宁愿将数字运算卸载到数据库,而不是上面的实现。
我试过的
例如,考虑这两个用例:
- 列出所有
Score
个对象的 normalized_value
- 列出所有
Student
个对象的 mean_normalized_score
可以在查询中使用 window functions 来涵盖第一个用例,如下所示:
w_min = Window(expression=Min('value'), partition_by=[F('question')])
w_max = Window(expression=Max('value'), partition_by=[F('question')])
annotated_scores = Score.objects.annotate(
normalized_value=(F('value') - w_min) / (w_max - w_min))
效果很好,因此不再需要示例中的 Score.normalized_value()
方法。
现在,我想对第二个用例做类似的事情,用单个数据库查询替换 Student.mean_normalized_score()
方法。
原始 SQL 可能看起来像这样(对于 sqlite):
SELECT id, student_id, AVG(normalized_value) AS mean_normalized_score
FROM (
SELECT
myapp_score.*,
((myapp_score.value - MIN(myapp_score.value) OVER (PARTITION BY myapp_score.question_id)) / (MAX(myapp_score.value) OVER (PARTITION BY myapp_score.question_id) - MIN(myapp_score.value) OVER (PARTITION BY myapp_score.question_id)))
AS normalized_value
FROM myapp_score
)
GROUP BY student_id
我可以将此工作作为 raw Django query,但我还没有能够使用 Django 的 ORM 重现此查询。
我尝试在上述 annotated_scores
查询集上构建,使用 Django 的 Subquery、annotate()
、aggregate()
、Prefetch
和组合那些,但我一定是在某个地方犯了错误。
可能我得到的最接近的是这个:
subquery = Subquery(annotated_scores.values('normalized_value'))
Score.objects.values('student_id').annotate(mean=Avg(subquery))
但这是不正确的。
有人可以在不求助于原始查询的情况下为我指明正确的方向吗?
我可能已经找到一种使用子查询来执行此操作的方法。最主要的是至少来自 django,我们不能在聚合上使用 window 函数,所以这就是阻止计算标准化值的平均值的原因。我在这些行上添加了评论来解释我要做什么:
# Get the minimum score per question
min_subquery = Score.objects.filter(question=OuterRef('question')).values('question').annotate(min=Min('value'))
# Get the maximum score per question
max_subquery = Score.objects.filter(question=OuterRef('question')).values('question').annotate(max=Max('value'))
# Calculate the normalized value per score, then get the average by grouping by students
mean_subquery = Score.objects.filter(student=OuterRef('pk')).annotate(
min=Subquery(min_subquery.values('min')[:1]),
max=Subquery(max_subquery.values('max')[:1]),
normalized=ExpressionWrapper((F('value') - F('min'))/(F('max') - F('min')), output_field=FloatField())
).values('student').annotate(mean=Avg('normalized'))
# Get the calculated mean per student
Student.objects.annotate(mean=Subquery(mean_subquery.values('mean')[:1]))
结果SQL是:
SELECT
"student"."id",
"student"."name",
(
SELECT
AVG(
(
(
V0."value" - (
SELECT
MIN(U0."value") AS "min"
FROM
"score" U0
WHERE
U0."question_id" = (V0."question_id")
GROUP BY
U0."question_id"
LIMIT
1
)
) / (
(
SELECT
MAX(U0."value") AS "max"
FROM
"score" U0
WHERE
U0."question_id" = (V0."question_id")
GROUP BY
U0."question_id"
LIMIT
1
) - (
SELECT
MIN(U0."value") AS "min"
FROM
"score" U0
WHERE
U0."question_id" = (V0."question_id")
GROUP BY
U0."question_id"
LIMIT
1
)
)
)
) AS "mean"
FROM
"score" V0
WHERE
V0."student_id" = ("student"."id")
GROUP BY
V0."student_id"
LIMIT
1
) AS "mean"
FROM
"student"
正如@bdbd 所提到的,并且从 this Django issue 来看,似乎还不可能注释窗口查询集(使用 Django 3.2)。
作为临时解决方法,我重构了 如下。
class ScoreQuerySet(models.QuerySet):
def annotate_normalized(self):
w_min = Subquery(self.filter(
question=OuterRef('question')).values('question').annotate(
min=Min('value')).values('min')[:1])
w_max = Subquery(self.filter(
question=OuterRef('question')).values('question').annotate(
max=Max('value')).values('max')[:1])
return self.annotate(normalized=(F('value') - w_min) / (w_max - w_min))
def aggregate_student_mean(self):
return self.annotate_normalized().values('student_id').annotate(
mean=Avg('normalized'))
class Score(models.Model):
objects = ScoreQuerySet.as_manager()
...
注意:如有必要,我们可以向 aggregate_student_mean()
中的 values()
添加更多 Student
查找,例如student__name
。只要注意别把分组搞乱就好了。
现在,如果过滤和注释窗口查询集成为可能,我们可以简单地用更简单的 Window
实现替换 Subquery
行:
w_min = Window(expression=Min('value'), partition_by=[F('question')])
w_max = Window(expression=Max('value'), partition_by=[F('question')])
背景
假设我们有一组问题,以及一组学生回答了这些问题。 答案已经过审核,分数 已分配,范围未知。
现在,我们需要根据每个问题中的极值对分数进行归一化。 例如,如果 问题 1 的最小 分数 为 4,最大 分数 为 12,则那些分数将分别归一化为 0 和 1。中间的分数是线性插值的(如 Normalization to bring in the range of [0,1] 中所述)。
然后,对于每个学生,我们想知道所有问题的归一化分数的平均值 合并。
最小示例
这是一个非常幼稚的最小实现,只是为了说明我们想要实现的目标:
class Question(models.Model):
pass
class Student(models.Model):
def mean_normalized_score(self):
normalized_scores = []
for score in self.score_set.all():
normalized_scores.append(score.normalized_value())
return mean(normalized_scores) if normalized_scores else None
class Score(models.Model):
student = models.ForeignKey(to=Student, on_delete=models.CASCADE)
question = models.ForeignKey(to=Question, on_delete=models.CASCADE)
value = models.FloatField()
def normalized_value(self):
limits = Score.objects.filter(question=self.question).aggregate(
min=models.Min('value'), max=models.Max('value'))
return (self.value - limits['min']) / (limits['max'] - limits['min'])
这个效果很好,但是在数据库查询等方面效率很低
目标
我宁愿将数字运算卸载到数据库,而不是上面的实现。
我试过的
例如,考虑这两个用例:
- 列出所有
Score
个对象的normalized_value
- 列出所有
Student
个对象的mean_normalized_score
可以在查询中使用 window functions 来涵盖第一个用例,如下所示:
w_min = Window(expression=Min('value'), partition_by=[F('question')])
w_max = Window(expression=Max('value'), partition_by=[F('question')])
annotated_scores = Score.objects.annotate(
normalized_value=(F('value') - w_min) / (w_max - w_min))
效果很好,因此不再需要示例中的 Score.normalized_value()
方法。
现在,我想对第二个用例做类似的事情,用单个数据库查询替换 Student.mean_normalized_score()
方法。
原始 SQL 可能看起来像这样(对于 sqlite):
SELECT id, student_id, AVG(normalized_value) AS mean_normalized_score
FROM (
SELECT
myapp_score.*,
((myapp_score.value - MIN(myapp_score.value) OVER (PARTITION BY myapp_score.question_id)) / (MAX(myapp_score.value) OVER (PARTITION BY myapp_score.question_id) - MIN(myapp_score.value) OVER (PARTITION BY myapp_score.question_id)))
AS normalized_value
FROM myapp_score
)
GROUP BY student_id
我可以将此工作作为 raw Django query,但我还没有能够使用 Django 的 ORM 重现此查询。
我尝试在上述 annotated_scores
查询集上构建,使用 Django 的 Subquery、annotate()
、aggregate()
、Prefetch
和组合那些,但我一定是在某个地方犯了错误。
可能我得到的最接近的是这个:
subquery = Subquery(annotated_scores.values('normalized_value'))
Score.objects.values('student_id').annotate(mean=Avg(subquery))
但这是不正确的。
有人可以在不求助于原始查询的情况下为我指明正确的方向吗?
我可能已经找到一种使用子查询来执行此操作的方法。最主要的是至少来自 django,我们不能在聚合上使用 window 函数,所以这就是阻止计算标准化值的平均值的原因。我在这些行上添加了评论来解释我要做什么:
# Get the minimum score per question
min_subquery = Score.objects.filter(question=OuterRef('question')).values('question').annotate(min=Min('value'))
# Get the maximum score per question
max_subquery = Score.objects.filter(question=OuterRef('question')).values('question').annotate(max=Max('value'))
# Calculate the normalized value per score, then get the average by grouping by students
mean_subquery = Score.objects.filter(student=OuterRef('pk')).annotate(
min=Subquery(min_subquery.values('min')[:1]),
max=Subquery(max_subquery.values('max')[:1]),
normalized=ExpressionWrapper((F('value') - F('min'))/(F('max') - F('min')), output_field=FloatField())
).values('student').annotate(mean=Avg('normalized'))
# Get the calculated mean per student
Student.objects.annotate(mean=Subquery(mean_subquery.values('mean')[:1]))
结果SQL是:
SELECT
"student"."id",
"student"."name",
(
SELECT
AVG(
(
(
V0."value" - (
SELECT
MIN(U0."value") AS "min"
FROM
"score" U0
WHERE
U0."question_id" = (V0."question_id")
GROUP BY
U0."question_id"
LIMIT
1
)
) / (
(
SELECT
MAX(U0."value") AS "max"
FROM
"score" U0
WHERE
U0."question_id" = (V0."question_id")
GROUP BY
U0."question_id"
LIMIT
1
) - (
SELECT
MIN(U0."value") AS "min"
FROM
"score" U0
WHERE
U0."question_id" = (V0."question_id")
GROUP BY
U0."question_id"
LIMIT
1
)
)
)
) AS "mean"
FROM
"score" V0
WHERE
V0."student_id" = ("student"."id")
GROUP BY
V0."student_id"
LIMIT
1
) AS "mean"
FROM
"student"
正如@bdbd 所提到的,并且从 this Django issue 来看,似乎还不可能注释窗口查询集(使用 Django 3.2)。
作为临时解决方法,我重构了
class ScoreQuerySet(models.QuerySet):
def annotate_normalized(self):
w_min = Subquery(self.filter(
question=OuterRef('question')).values('question').annotate(
min=Min('value')).values('min')[:1])
w_max = Subquery(self.filter(
question=OuterRef('question')).values('question').annotate(
max=Max('value')).values('max')[:1])
return self.annotate(normalized=(F('value') - w_min) / (w_max - w_min))
def aggregate_student_mean(self):
return self.annotate_normalized().values('student_id').annotate(
mean=Avg('normalized'))
class Score(models.Model):
objects = ScoreQuerySet.as_manager()
...
注意:如有必要,我们可以向 aggregate_student_mean()
中的 values()
添加更多 Student
查找,例如student__name
。只要注意别把分组搞乱就好了。
现在,如果过滤和注释窗口查询集成为可能,我们可以简单地用更简单的 Window
实现替换 Subquery
行:
w_min = Window(expression=Min('value'), partition_by=[F('question')])
w_max = Window(expression=Max('value'), partition_by=[F('question')])