Django query on child records without getting duplicate rows
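I am trying to write a Django query that finds a set of parent records having certain types of child records. The problem is that a parent record with two matching children is included in the search results twice.
How can I get each parent only once, even if it has multiple matching children?
I've included a simple example below to demonstrate the problem. Blog is the parent and Entry is the child. When I search for blogs that contain entries with "Hello" in the headline, I get two copies of Jimmy's blog.
Here are the records I created and the query I tried: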
b = Blog(name="Jimmy's Jottings")
b.save()
Entry(blog=b, headline='Hello, World!').save()
Entry(blog=b, headline='Hello Kitty').save()
blog_count = Blog.objects.filter(entries__headline__contains='Hello').count()
assert blog_count == 1, blog_count
You can see that there is only one blog, but the assertion fails with a count of two.
The complete example follows:
# Tested with Django 1.9.2
import sys

import django
from django.apps import apps
from django.apps.config import AppConfig
from django.conf import settings
from django.db import connections, models, DEFAULT_DB_ALIAS
from django.db.models.base import ModelBase

NAME = 'udjango'


def main():
    setup()

    class Blog(models.Model):
        name = models.CharField(max_length=100)
        tagline = models.TextField()

        def __str__(self):  # __unicode__ on Python 2
            return self.name

    class Entry(models.Model):
        blog = models.ForeignKey(Blog, related_name='entries')
        headline = models.CharField(max_length=255)
        body_text = models.TextField()

        def __str__(self):  # __unicode__ on Python 2
            return self.headline

    syncdb(Blog)
    syncdb(Entry)

    b = Blog(name="Jimmy's Jottings")
    b.save()
    Entry(blog=b, headline='Hello, World!').save()
    Entry(blog=b, headline='Hello Kitty').save()

    blog_count = Blog.objects.filter(entries__headline__contains='Hello').count()
    assert blog_count == 1, blog_count

    print('Done.')


def setup():
    DB_FILE = NAME + '.db'
    with open(DB_FILE, 'w'):
        pass  # wipe the database
    settings.configure(
        DEBUG=True,
        DATABASES={
            DEFAULT_DB_ALIAS: {
                'ENGINE': 'django.db.backends.sqlite3',
                'NAME': DB_FILE}},
        LOGGING={'version': 1,
                 'disable_existing_loggers': False,
                 'formatters': {
                     'debug': {
                         'format': '%(asctime)s[%(levelname)s]'
                                   '%(name)s.%(funcName)s(): %(message)s',
                         'datefmt': '%Y-%m-%d %H:%M:%S'}},
                 'handlers': {
                     'console': {
                         'level': 'DEBUG',
                         'class': 'logging.StreamHandler',
                         'formatter': 'debug'}},
                 'root': {
                     'handlers': ['console'],
                     'level': 'WARN'},
                 'loggers': {
                     "django.db": {"level": "WARN"}}})
    app_config = AppConfig(NAME, sys.modules['__main__'])
    apps.populate([app_config])
    django.setup()
    original_new_func = ModelBase.__new__

    @staticmethod
    def patched_new(cls, name, bases, attrs):
        if 'Meta' not in attrs:
            class Meta:
                app_label = NAME
            attrs['Meta'] = Meta
        return original_new_func(cls, name, bases, attrs)
    ModelBase.__new__ = patched_new


def syncdb(model):
    """ Standard syncdb expects models to be in reliable locations.

    Based on https://github.com/django/django/blob/1.9.3
    /django/core/management/commands/migrate.py#L285
    """
    connection = connections[DEFAULT_DB_ALIAS]
    with connection.schema_editor() as editor:
        editor.create_model(model)


main()
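The trick is to use a subquery to find the blog IDs of the matching children, and then search for all blogs whose ID appears in that subquery. The subquery can then contain duplicates without causing duplicates in the main query.
The fixed query looks like this: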
blog_ids = Entry.objects.filter(headline__contains='Hello').values('blog_id')
blog_count = Blog.objects.filter(id__in=blog_ids).count()
assert blog_count == 1, blog_count
Here is the generated SQL query:
SELECT COUNT(*) AS "__count"
FROM "udjango_blog"
WHERE "udjango_blog"."id" IN
(
SELECT U0."blog_id"
FROM "udjango_entry" U0
WHERE U0."headline" LIKE '%Hello%' ESCAPE '\'
)
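To see why the subquery form avoids the duplicate, here is a minimal sketch that runs both query shapes against an in-memory SQLite database using only the standard library. The table names are taken from the generated SQL above; the table definitions, the `count_blogs` helper, and the JOIN statement are assumptions made for illustration (the JOIN is an approximation of what the original `filter(entries__headline__contains='Hello')` produces, not the exact SQL Django emits).

```python
import sqlite3


def count_blogs(conn, sql):
    """Run a COUNT(*) query and return the single integer result."""
    return conn.execute(sql).fetchone()[0]


conn = sqlite3.connect(":memory:")
# Simplified stand-ins for the tables Django would create (assumed schema).
conn.executescript("""
    CREATE TABLE udjango_blog (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE udjango_entry (
        id INTEGER PRIMARY KEY,
        blog_id INTEGER REFERENCES udjango_blog (id),
        headline TEXT
    );
    INSERT INTO udjango_blog (id, name) VALUES (1, 'Jimmy''s Jottings');
    INSERT INTO udjango_entry (blog_id, headline) VALUES (1, 'Hello, World!');
    INSERT INTO udjango_entry (blog_id, headline) VALUES (1, 'Hello Kitty');
""")

# Approximate shape of the original query: one joined row per matching entry.
JOIN_SQL = """
    SELECT COUNT(*)
    FROM udjango_blog
    INNER JOIN udjango_entry ON udjango_blog.id = udjango_entry.blog_id
    WHERE udjango_entry.headline LIKE '%Hello%'
"""

# Shape of the fixed query: each blog row is tested once against the subquery.
SUBQUERY_SQL = """
    SELECT COUNT(*)
    FROM udjango_blog
    WHERE udjango_blog.id IN (
        SELECT U0.blog_id
        FROM udjango_entry U0
        WHERE U0.headline LIKE '%Hello%'
    )
"""

join_count = count_blogs(conn, JOIN_SQL)
subquery_count = count_blogs(conn, SUBQUERY_SQL)
print(join_count, subquery_count)  # prints: 2 1
```

The join yields one row per matching entry, so the blog is counted twice, while `IN` only asks whether the blog's ID appears in the subquery at all.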
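While I believe Don Kirby's answer works, I think a better solution is to add .distinct() to the end of the queryset. This simply eliminates any duplicate rows from the query results. The SQL equivalent is using SELECT DISTINCT on the given query.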
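As a rough illustration of the .distinct() approach, the sketch below runs a SELECT DISTINCT over a join in an in-memory SQLite database. The Django line in the comment is how I would expect the queryset to be written for the example models; the table definitions and the SQL are simplified assumptions, not the exact statement Django generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-ins for the Blog and Entry tables (assumed schema).
conn.executescript("""
    CREATE TABLE udjango_blog (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE udjango_entry (
        id INTEGER PRIMARY KEY,
        blog_id INTEGER REFERENCES udjango_blog (id),
        headline TEXT
    );
    INSERT INTO udjango_blog (id, name) VALUES (1, 'Jimmy''s Jottings');
    INSERT INTO udjango_entry (blog_id, headline) VALUES (1, 'Hello, World!');
    INSERT INTO udjango_entry (blog_id, headline) VALUES (1, 'Hello Kitty');
""")

# Django side (for the models in the question):
#   Blog.objects.filter(entries__headline__contains='Hello').distinct()
# Rough SQL equivalent: the join still produces two rows, but DISTINCT
# collapses them because only blog columns are selected.
rows = conn.execute("""
    SELECT DISTINCT udjango_blog.id, udjango_blog.name
    FROM udjango_blog
    INNER JOIN udjango_entry ON udjango_blog.id = udjango_entry.blog_id
    WHERE udjango_entry.headline LIKE '%Hello%'
""").fetchall()

print(rows)  # prints: [(1, "Jimmy's Jottings")]
```

Note that DISTINCT compares every selected column, so it only collapses the duplicates when the selected columns all come from the parent table.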