使用 pymongo 在 MongoDB collection 中高效地创建新字段
Efficiently create new fields across a MongoDB collection using pymongo
我有一个 collection 文档有一个字段,称之为 field1
我想在每个文档上调用一个(复杂的)python 函数 fxn
field1
条目并将其存储在新的 field2
中。我的 collection 非常大,fxn
需要几秒钟才能 运行 所以我想在几个作业中并行处理它。到目前为止,这是我的方法:
for i, entry in enumerate(collection.find().sort('_id')):
if i % nJobs != jobID: continue
field1 = entry['field1']
field2 = fxn(field1)
collection.update({'_id': entry['_id']}, {'$set': {'field2':field2})
其中nJobs
是作业总数,jobID
是当前作业的索引(例如说我运行这个脚本并行5次,那么nJobs=5
和 jobID
范围从 0 到 4)
有没有更快或更可靠的方法来实现它?我宁愿将所有内容都保存在 python 中,因为 fxn
需要保存在 python.
中
您基本上需要使用 Bulk API,在 for 循环中您可以利用写入命令 Bulk API 允许批量更新操作的执行,这些操作只是服务器顶部的抽象,可以轻松构建批量操作。这些批量操作主要有两种形式:
- 已订购批量操作。这些操作按顺序执行所有操作,并在第一次写入错误时出错。
- 无序批量操作。这些操作并行执行所有操作并汇总所有错误。无序批量操作不保证执行顺序。
这非常有效,因为您没有向服务器发送 "every" 请求,而是每 1000 个请求发送一次,api 实际上会在后台为您解决这个问题。请注意,对于 2.6 之前的旧服务器,API 将下转换操作。但是,不可能 100% 下转换,因此在某些边缘情况下可能无法正确报告正确的数字。
在非分片集群上实现此功能需要使用 snapshot
参数,以便您可以将查找光标与更新后的同一文档隔离开来:
bulk = db.collection.initialize_ordered_bulk_op()
counter = 0;
for entry in collection.find(snapshot = True):
# process in bulk
# calc field2 value first
field2 = fxn(entry.field1)
bulk.find({ '_id': entry._id }).update({ '$set': { 'field2': field2 } })
counter++
if ( counter % 1000 == 0 ):
bulk.execute()
bulk = db.collection.initialize_ordered_bulk_op()
if (counter % 1000 != 0):
bulk.execute()
我有一个 collection 文档有一个字段,称之为 field1
我想在每个文档上调用一个(复杂的)python 函数 fxn
field1
条目并将其存储在新的 field2
中。我的 collection 非常大,fxn
需要几秒钟才能 运行 所以我想在几个作业中并行处理它。到目前为止,这是我的方法:
for i, entry in enumerate(collection.find().sort('_id')):
if i % nJobs != jobID: continue
field1 = entry['field1']
field2 = fxn(field1)
collection.update({'_id': entry['_id']}, {'$set': {'field2':field2})
其中nJobs
是作业总数,jobID
是当前作业的索引(例如说我运行这个脚本并行5次,那么nJobs=5
和 jobID
范围从 0 到 4)
有没有更快或更可靠的方法来实现它?我宁愿将所有内容都保存在 python 中,因为 fxn
需要保存在 python.
您基本上需要使用 Bulk API,在 for 循环中您可以利用写入命令 Bulk API 允许批量更新操作的执行,这些操作只是服务器顶部的抽象,可以轻松构建批量操作。这些批量操作主要有两种形式:
- 已订购批量操作。这些操作按顺序执行所有操作,并在第一次写入错误时出错。
- 无序批量操作。这些操作并行执行所有操作并汇总所有错误。无序批量操作不保证执行顺序。
这非常有效,因为您没有向服务器发送 "every" 请求,而是每 1000 个请求发送一次,api 实际上会在后台为您解决这个问题。请注意,对于 2.6 之前的旧服务器,API 将下转换操作。但是,不可能 100% 下转换,因此在某些边缘情况下可能无法正确报告正确的数字。
在非分片集群上实现此功能需要使用 snapshot
参数,以便您可以将查找光标与更新后的同一文档隔离开来:
bulk = db.collection.initialize_ordered_bulk_op()
counter = 0;
for entry in collection.find(snapshot = True):
# process in bulk
# calc field2 value first
field2 = fxn(entry.field1)
bulk.find({ '_id': entry._id }).update({ '$set': { 'field2': field2 } })
counter++
if ( counter % 1000 == 0 ):
bulk.execute()
bulk = db.collection.initialize_ordered_bulk_op()
if (counter % 1000 != 0):
bulk.execute()