PyMongo 中的 MapReduce
MapReduce in PyMongo
我的 Mongo collection : Impressions
有以下格式的文档:-
{
_uid: 10,
"impressions": [
{
"pos": 6,
"id": 123,
"service": "furniture"
},
{
"pos": 0,
"id": 128,
"service": "electronics"
},
{
"pos": 2,
"id": 127,
"service": "furniture"
},
{
"pos": 2,
"id": 125,
"service": "electronics"
},
{
"pos": 10,
"id": 124,
"service": "electronics"
}
]
},
{
_uid: 11,
"impressions": [
{
"pos": 1,
"id": 124,
"service": "furniture"
},
{
"pos": 10,
"id": 124,
"service": "electronics"
},
{
"pos": 1,
"id": 123,
"service": "furniture"
},
{
"pos": 21,
"id": 122,
"service": "furniture"
},
{
"pos": 3,
"id": 125,
"service": "electronics"
},
{
"pos": 10,
"id": 121,
"service": "electronics"
}
]
},
.
.
.
.
.
collection 中的每个文档都有 "impressions"
键,这是一个字典数组。在每个字典中,"id"
是实体的 ID,"service"
是服务类型,"pos"
是项目在搜索页面结果中的位置。我的目标是找出每个类别中每个 "id"
的展示次数。
因此,对于 "service"
== "furniture"
的上述数据,我希望将其作为聚合结果:-
[
{"id": 123,"impressions_count":2},
{"id": 127,"impressions_count":1},
{"id": 124,"impressions_count":1},
{"id": 122,"impressions_count":1}
]
我尝试通过 python 脚本中的以下函数使用 MAPREDUCE 在 "id" 上进行聚合
def fetch_impressions():
try:
imp_collection = get_mongo_connection('Impressions')
map = Code("""
function(){
for( x in this.impressions){
var flat_id = x['id'];
var service_type = x['service']
emit(parseInt(flat_id),1);
}
};
""")
""")
reduce = Code("""
function(a,b){
return Array.sum(b);
};
""")
results = imp_collection.map_reduce(map, reduce, 'aggregation_result')
return results
except Exception as e:
raise Exception(e)
但我得到的结果是 None,可能是因为地图有误 function.I 是 Javascript 的新手,Mongo 请帮忙!
您可以使用 aggregation framework
import pymongo
conn = pymongo.MongoClient()
db = conn.test
col = db.collection
for doc in col.aggregate([{'$unwind': '$impressions'},
{'$match': {'impressions.service': 'furniture'}},
{'$group': {'_id': '$impressions.id', 'impressions_count': {'$sum': 1}}},
]):
print(doc)
或使用 $map
and the $setDifference
运算符更有效。
col.aggregate([
{ "$project": { "impressions": {"$setDifference": [{ "$map": { "input": "$impressions", "as": "imp", "in": { "$cond": { "if": { "$eq": [ "$$imp.service", "furniture" ] }, "then": "$$imp.id", "else": 0 }}}}, [0]]}}},
{ "$unwind": "$impressions" },
{ "$group": { "_id": "$impressions", "impressions_count": { "$sum": 1 }}}
])
产生:
{'_id': 122.0, 'impressions_count': 1}
{'_id': 124.0, 'impressions_count': 1}
{'_id': 127.0, 'impressions_count': 1}
{'_id': 123.0, 'impressions_count': 2}
我制作了一个工具,让你 运行 MongoDB Map/Reduce 在 Python
import random
import threading
import bson
import pymongo
import mreduce
mongo_client = pymongo.MongoClient("mongodb://your_mongodb_server")
def map_func(document):
for impression in document["impressions"]:
yield document["id"], 1
def reduce_func(id, prices):
return sum(prices)
worker_functions = {
"exampleMap": map_func,
"exampleReduce": reduce_func
}
api = mreduce.API(
api_key = "...",
mongo_client = mongo_client
)
project_id = "..."
thread = threading.Thread(
target=api.run,
args=[project_id, worker_functions]
)
thread.start()
job = api.submit_job(
projectId=project["_id"],
mapFunctionName="exampleMap",
reduceFunctionName="exampleReduce",
inputDatabase="db",
inputCollection="impressions",
outputDatabase="db",
outputCollection="impressions_results"
)
result = job.wait_for_result()
for key, value in result:
print("Key: " + key, ", Value: " + str(value))
我的 Mongo collection : Impressions
有以下格式的文档:-
{
_uid: 10,
"impressions": [
{
"pos": 6,
"id": 123,
"service": "furniture"
},
{
"pos": 0,
"id": 128,
"service": "electronics"
},
{
"pos": 2,
"id": 127,
"service": "furniture"
},
{
"pos": 2,
"id": 125,
"service": "electronics"
},
{
"pos": 10,
"id": 124,
"service": "electronics"
}
]
},
{
_uid: 11,
"impressions": [
{
"pos": 1,
"id": 124,
"service": "furniture"
},
{
"pos": 10,
"id": 124,
"service": "electronics"
},
{
"pos": 1,
"id": 123,
"service": "furniture"
},
{
"pos": 21,
"id": 122,
"service": "furniture"
},
{
"pos": 3,
"id": 125,
"service": "electronics"
},
{
"pos": 10,
"id": 121,
"service": "electronics"
}
]
},
.
.
.
.
.
collection 中的每个文档都有 "impressions"
键,这是一个字典数组。在每个字典中,"id"
是实体的 ID,"service"
是服务类型,"pos"
是项目在搜索页面结果中的位置。我的目标是找出每个类别中每个 "id"
的展示次数。
因此,对于 "service"
== "furniture"
的上述数据,我希望将其作为聚合结果:-
[
{"id": 123,"impressions_count":2},
{"id": 127,"impressions_count":1},
{"id": 124,"impressions_count":1},
{"id": 122,"impressions_count":1}
]
我尝试通过 python 脚本中的以下函数使用 MAPREDUCE 在 "id" 上进行聚合
def fetch_impressions():
try:
imp_collection = get_mongo_connection('Impressions')
map = Code("""
function(){
for( x in this.impressions){
var flat_id = x['id'];
var service_type = x['service']
emit(parseInt(flat_id),1);
}
};
""")
""")
reduce = Code("""
function(a,b){
return Array.sum(b);
};
""")
results = imp_collection.map_reduce(map, reduce, 'aggregation_result')
return results
except Exception as e:
raise Exception(e)
但我得到的结果是 None,可能是因为地图有误 function.I 是 Javascript 的新手,Mongo 请帮忙!
您可以使用 aggregation framework
import pymongo
conn = pymongo.MongoClient()
db = conn.test
col = db.collection
for doc in col.aggregate([{'$unwind': '$impressions'},
{'$match': {'impressions.service': 'furniture'}},
{'$group': {'_id': '$impressions.id', 'impressions_count': {'$sum': 1}}},
]):
print(doc)
或使用 $map
and the $setDifference
运算符更有效。
col.aggregate([
{ "$project": { "impressions": {"$setDifference": [{ "$map": { "input": "$impressions", "as": "imp", "in": { "$cond": { "if": { "$eq": [ "$$imp.service", "furniture" ] }, "then": "$$imp.id", "else": 0 }}}}, [0]]}}},
{ "$unwind": "$impressions" },
{ "$group": { "_id": "$impressions", "impressions_count": { "$sum": 1 }}}
])
产生:
{'_id': 122.0, 'impressions_count': 1}
{'_id': 124.0, 'impressions_count': 1}
{'_id': 127.0, 'impressions_count': 1}
{'_id': 123.0, 'impressions_count': 2}
我制作了一个工具,让你 运行 MongoDB Map/Reduce 在 Python
import random
import threading
import bson
import pymongo
import mreduce
mongo_client = pymongo.MongoClient("mongodb://your_mongodb_server")
def map_func(document):
for impression in document["impressions"]:
yield document["id"], 1
def reduce_func(id, prices):
return sum(prices)
worker_functions = {
"exampleMap": map_func,
"exampleReduce": reduce_func
}
api = mreduce.API(
api_key = "...",
mongo_client = mongo_client
)
project_id = "..."
thread = threading.Thread(
target=api.run,
args=[project_id, worker_functions]
)
thread.start()
job = api.submit_job(
projectId=project["_id"],
mapFunctionName="exampleMap",
reduceFunctionName="exampleReduce",
inputDatabase="db",
inputCollection="impressions",
outputDatabase="db",
outputCollection="impressions_results"
)
result = job.wait_for_result()
for key, value in result:
print("Key: " + key, ", Value: " + str(value))