对集合 Mongo DB 中的每个文档调用自定义 python 函数
call custom python function on every document in a collection Mongo DB
我想对整个集合中每个文档的某些现有属性调用自定义 python 函数 并且 将结果作为新的键值对存储在那个(同一个)文件。我可以知道是否有任何方法可以做到这一点(因为每个调用都独立于其他调用)?
我注意到了 cursor.forEach
,但是仅仅使用 python 就不能有效地完成吗?
一个简单的例子是拆分 text
中的字符串并存储编号。单词作为新属性。
def split_count(text):
# some complex preprocessing...
return len(text.split())
# Need something like this...
db.collection.update_many({}, {'$set': {"split": split_count('$text') }}, upsert=True)
但似乎根据同一文档中另一个属性的值在文档中设置新属性是 not possible 这种方式。这个 post 是旧的,但问题似乎仍然存在。
在 python 中做这种事情不太可能有效。这是因为文档必须往返并通过客户端计算机上的 python 函数。
在您的示例代码中,您将函数的结果传递给 mongodb update
查询,这将不起作用。您不能 运行 数据库服务器上 mongodb 查询中的任何 python 代码。
正如 answer to you linked question 所暗示的,此类操作必须在 mongo shell 中执行。例如:
db.collection.find().snapshot().forEach(
function (elem) {
splitLength = elem.text.split(" ").length
db.collection.update(
{
_id: elem._id
},
{
$set: {
split: splitLength
}
}
);
}
);
我找到了一种在 PyMongo 中使用 parallel_scan 在集合上调用任何自定义 python 函数的方法。
def process_text(cursor):
for row in cursor.batch_size(200):
# Any complex preprocessing here...
split_text = row['text'].split()
db.collection.update_one({'_id': row['_id']},
{'$set': {'split_text': split_text,
'num_words': len(split_text) }},
upsert=True)
def preprocess(num_threads=4):
# Get up to max 'num_threads' cursors.
cursors = db.collection.parallel_scan(num_threads)
threads = [threading.Thread(target=process_text, args=(cursor,)) for cursor in cursors]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
这并不比 cursor.forEach
快(但也没有那么慢),但它可以帮助我执行任意复杂的 python 代码并在 Python 本身中保存结果.
此外,如果我在其中一个属性中有一个 ints
数组,执行 cursor.forEach
会将它们转换为我不想要的 floats
。所以我更喜欢这种方式。
但我很高兴知道是否有比这更好的方法:)
我想对整个集合中每个文档的某些现有属性调用自定义 python 函数 并且 将结果作为新的键值对存储在那个(同一个)文件。我可以知道是否有任何方法可以做到这一点(因为每个调用都独立于其他调用)?
我注意到了 cursor.forEach
,但是仅仅使用 python 就不能有效地完成吗?
一个简单的例子是拆分 text
中的字符串并存储编号。单词作为新属性。
def split_count(text):
# some complex preprocessing...
return len(text.split())
# Need something like this...
db.collection.update_many({}, {'$set': {"split": split_count('$text') }}, upsert=True)
但似乎根据同一文档中另一个属性的值在文档中设置新属性是 not possible 这种方式。这个 post 是旧的,但问题似乎仍然存在。
在 python 中做这种事情不太可能有效。这是因为文档必须往返并通过客户端计算机上的 python 函数。
在您的示例代码中,您将函数的结果传递给 mongodb update
查询,这将不起作用。您不能 运行 数据库服务器上 mongodb 查询中的任何 python 代码。
正如 answer to you linked question 所暗示的,此类操作必须在 mongo shell 中执行。例如:
db.collection.find().snapshot().forEach(
function (elem) {
splitLength = elem.text.split(" ").length
db.collection.update(
{
_id: elem._id
},
{
$set: {
split: splitLength
}
}
);
}
);
我找到了一种在 PyMongo 中使用 parallel_scan 在集合上调用任何自定义 python 函数的方法。
def process_text(cursor):
for row in cursor.batch_size(200):
# Any complex preprocessing here...
split_text = row['text'].split()
db.collection.update_one({'_id': row['_id']},
{'$set': {'split_text': split_text,
'num_words': len(split_text) }},
upsert=True)
def preprocess(num_threads=4):
# Get up to max 'num_threads' cursors.
cursors = db.collection.parallel_scan(num_threads)
threads = [threading.Thread(target=process_text, args=(cursor,)) for cursor in cursors]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
这并不比 cursor.forEach
快(但也没有那么慢),但它可以帮助我执行任意复杂的 python 代码并在 Python 本身中保存结果.
此外,如果我在其中一个属性中有一个 ints
数组,执行 cursor.forEach
会将它们转换为我不想要的 floats
。所以我更喜欢这种方式。
但我很高兴知道是否有比这更好的方法:)