pymongo: remove duplicates (map reduce?)

I have a database with multiple collections (about 15 million documents in total), and the documents look like this (simplified):

{'Text': 'blabla', 'ID': 101}
{'Text': 'Whuppppyyy', 'ID': 102}
{'Text': 'Abrakadabraaa', 'ID': 103}
{'Text': 'olalalaal', 'ID': 104}
{'Text': 'test1234545', 'ID': 104}
{'Text': 'whapwhapwhap', 'ID': 104}

They all have a unique _id field, but I want to remove duplicates based on another field (an external ID field).

First, I tried a very manual approach with lists and deleting afterwards, but the DB seems too big; it takes very long and is not practical.

Second, the following no longer works in current MongoDB versions, even though it is often suggested:

db.collection.ensureIndex( { ID: 1 }, { unique: true, dropDups: true } )
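For context, a minimal pymongo sketch of the failure mode (connection and collection names are hypothetical): the dropDups option was removed in MongoDB 3.0, so building a unique index over a field that still contains duplicates simply fails instead of dropping them.

    from pymongo import MongoClient, ASCENDING
    from pymongo.errors import OperationFailure

    coll = MongoClient().mydb.mycoll  # hypothetical database/collection names

    try:
        # dropDups is gone in MongoDB 3.0+; with duplicates present this raises
        coll.create_index([("ID", ASCENDING)], unique=True)
    except OperationFailure as exc:
        print("unique index build failed:", exc)  # E11000 duplicate key error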

So now I am trying to build a map-reduce solution, but I don't really know what I am doing, and I especially struggle with using another field (not the database _id) to find and remove the duplicates. Here is my bad first approach (adapted from some internal source):

from bson.code import Code

map_func = Code("function() { if (this.ID) { emit(this.ID, 1); } }")
reduce_func = Code("function(key, values) { return Array.sum(values); }")
res = coll.map_reduce(map_func, reduce_func, "my_results")

response = []
for doc in res.find():
    if doc['value'] > 1:                    # this ID occurs more than once
        count = int(doc['value']) - 1       # number of surplus documents
        docs = coll.find({"ID": doc['_id']}, {'ID': 1}).limit(count)
        for i in docs:
            response.append(i['_id'])

coll.remove({"_id": {"$in": response}})

Any help reducing the duplicates in the external ID field (leaving one entry) would be greatly appreciated ;) Thanks!

Another approach is to use the aggregation framework, which has better performance than map-reduce. Consider the following aggregation pipeline: in the first stage, the $group operator groups documents by the ID field and stores each _id value of the grouped records in the unique_ids field using the $addToSet operator. The $sum accumulator operator adds up the values passed to it, in this case the constant 1, thereby counting the number of grouped records into the count field. The other pipeline step, $match, filters for documents whose count is at least 2, i.e. the duplicates.

Once you get the result back from the aggregation, you iterate the cursor, delete the first _id in the unique_ids field, and push the rest into an array that is later used to remove the duplicates (minus one entry):

cursor = coll.aggregate(
    [
        {"$group": {"_id": "$ID", "unique_ids": {"$addToSet": "$_id"}, "count": {"$sum": 1}}},
        {"$match": {"count": {"$gte": 2}}}
    ]
)

response = []
for doc in cursor:
    del doc["unique_ids"][0]        # keep one document per ID
    for id in doc["unique_ids"]:
        response.append(id)

coll.remove({"_id": {"$in": response}})
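As a side note, if you are on PyMongo 3.x, remove() is deprecated in favour of the CRUD API; an equivalent last step would be (a small sketch, reusing the response list built above):

    # same effect as coll.remove(...), using the PyMongo 3.x delete_many()
    result = coll.delete_many({"_id": {"$in": response}})
    print(result.deleted_count)  # number of duplicates removed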

The best way is to use the .aggregate() method, which provides access to the aggregation pipeline, to find the documents that are duplicates. The first stage in the pipeline is the $group stage, where you group your documents by the duplicated key and use the $push and $sum accumulator operators, which respectively return an array of all _id values for each group and the count of elements in the group. The next and last stage in the pipeline is the $match stage, which returns only those results where there is a duplicate "ID". From there you iterate the cursor and remove the duplicates using "bulk" operations.

pipeline = [{'$group': {'_id': '$ID', 'count': {'$sum': 1}, 'ids': {'$push': '$_id'}}},
    {'$match': {'count': {'$gte': 2}}}]

bulk = db.collection.initialize_ordered_bulk_op()
count = 0
for document in db.collection.aggregate(pipeline):
    it = iter(document['ids'])
    next(it)                                 # skip the first _id so one document per group survives
    for id in it:
        bulk.find({'_id': id}).remove_one()  # find() takes the filter; remove_one() takes no arguments
        count = count + 1
        if count % 1000 == 0:
            bulk.execute()
            # a bulk operation cannot be reused once executed, so start a new one
            bulk = db.collection.initialize_ordered_bulk_op()
if count % 1000 != 0:
    bulk.execute()                           # flush the remaining queued deletes

MongoDB 3.2 deprecates Bulk() and its associated methods, so you will need to use the bulk_write() method to execute your requests:

from pymongo import DeleteOne

requests = []
for document in db.collection.aggregate(pipeline):
    it = iter(document['ids'])
    next(it)                                 # keep the first _id of each group
    for id in it:
        requests.append(DeleteOne({'_id': id}))

if requests:                                 # bulk_write raises InvalidOperation on an empty list
    db.collection.bulk_write(requests)
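Since each DeleteOne targets a distinct _id and the order of the deletes does not matter, you could also pass ordered=False so the server does not stop at the first error; a small variation on the call above:

    # assumption: unordered execution is acceptable here, each delete is independent
    db.collection.bulk_write(requests, ordered=False)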

You can also do this in the mongo shell, as shown in the accepted answers to the related questions.

My solution also uses aggregation. You choose the field that is duplicated for the aggregation. The result is a list of groups of duplicates; each entry contains one set of duplicates. You iterate over the list, skipping the first element of each group to keep it, and delete the rest. You do this for every set of duplicates. See below:

replic = db.<YOUR_COLLECTION>.aggregate([             # Cursor with all duplicated documents
    {'$group': {
        '_id': {'<FIELD_DUPLICATED>': '$<FIELD_DUPLICATED>'},     # Duplicated field
        'idsUnicos': {'$addToSet': '$_id'},
        'total': {'$sum': 1}
        }
    },
    {'$match': {
        'total': {'$gt': 1}    # Holds how many duplicates there are in each group, if you need it.
        }
    }
])
                          # Result is a list of lists of ObjectIds
for i in replic:
    for idx, j in enumerate(i['idsUnicos']):            # It holds the ids of all duplicates
        if idx != 0:                                    # Skip the first element to keep it
            db.<YOUR_COLLECTION>.delete_one({'_id': j}) # Remove the rest

You could also try delete_many() to improve performance.
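For instance, a sketch of that delete_many() variant, keeping the placeholders from the snippet above and assuming replic is a freshly run cursor from the same aggregation (the one above is consumed by the loop):

    surplus = []
    for i in replic:                         # fresh cursor from the aggregation above
        surplus.extend(i['idsUnicos'][1:])   # keep the first _id of every group

    if surplus:
        db.<YOUR_COLLECTION>.delete_many({'_id': {'$in': surplus}})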