使用 pymongo 在 MongoDB collection 中高效地创建新字段

Efficiently create new fields across a MongoDB collection using pymongo

我有一个 collection 文档有一个字段,称之为 field1 我想在每个文档上调用一个(复杂的)python 函数 fxn field1 条目并将其存储在新的 field2 中。我的 collection 非常大,fxn 需要几秒钟才能 运行 所以我想在几个作业中并行处理它。到目前为止,这是我的方法:

for i, entry in enumerate(collection.find().sort('_id')):
    if i % nJobs != jobID: continue
    field1 = entry['field1']
    field2 = fxn(field1)
    collection.update({'_id': entry['_id']}, {'$set': {'field2':field2})

其中nJobs是作业总数,jobID是当前作业的索引(例如说我运行这个脚本并行5次,那么nJobs=5jobID 范围从 0 到 4)

有没有更快或更可靠的方法来实现它?我宁愿将所有内容都保存在 python 中,因为 fxn 需要保存在 python.

您基本上需要使用 Bulk API,在 for 循环中您可以利用写入命令 Bulk API 允许批量更新操作的执行,这些操作只是服务器顶部的抽象,可以轻松构建批量操作。这些批量操作主要有两种形式:

  • 已订购批量操作。这些操作按顺序执行所有操作,并在第一次写入错误时出错。
  • 无序批量操作。这些操作并行执行所有操作并汇总所有错误。无序批量操作不保证执行顺序。

这非常有效,因为您没有向服务器发送 "every" 请求,而是每 1000 个请求发送一次,api 实际上会在后台为您解决这个问题。请注意,对于 2.6 之前的旧服务器,API 将下转换操作。但是,不可能 100% 下转换,因此在某些边缘情况下可能无法正确报告正确的数字。

在非分片集群上实现此功能需要使用 snapshot 参数,以便您可以将查找光标与更新后的同一文档隔离开来:

bulk = db.collection.initialize_ordered_bulk_op()
counter = 0;

for entry in collection.find(snapshot = True):
    # process in bulk
    # calc field2 value first
    field2 = fxn(entry.field1)
    bulk.find({ '_id': entry._id }).update({ '$set': { 'field2': field2 } })
    counter++

    if ( counter % 1000 == 0 ):
        bulk.execute()
        bulk = db.collection.initialize_ordered_bulk_op()

if (counter % 1000 != 0):
    bulk.execute()