PyMongo 中的 MapReduce

MapReduce in PyMongo

我的 Mongo 集合(collection)Impressions 中的文档格式如下:

   {
        _uid: 10,
        "impressions": [
            {
                "pos": 6,
                "id": 123,
                "service": "furniture"
            },
            {
                "pos": 0,
                "id": 128,
                "service": "electronics"
            },
            {
                "pos": 2,
                "id": 127,
                "service": "furniture"
            },
            {
                "pos": 2,
                "id": 125,
                "service": "electronics"
            },
            {
                "pos": 10,
                "id": 124,
                "service": "electronics"
            }
        ]
      },
     {
        _uid: 11,
        "impressions": [
            {
                "pos": 1,
                "id": 124,
                "service": "furniture"
            },
            {
                "pos": 10,
                "id": 124,
                "service": "electronics"
            },
            {
                "pos": 1,
                "id": 123,
                "service": "furniture"
            },
            {
                "pos": 21,
                "id": 122,
                "service": "furniture"
            },
            {
                "pos": 3,
                "id": 125,
                "service": "electronics"
            },
            {
                "pos": 10,
                "id": 121,
                "service": "electronics"
            }
            ]
         },
            .
            .
            .
            .
            .

collection 中的每个文档都有 "impressions" 键,这是一个字典数组。在每个字典中,"id" 是实体的 ID,"service" 是服务类型,"pos" 是项目在搜索页面结果中的位置。我的目标是找出每个类别中每个 "id" 的展示次数。 因此,对于 "service" == "furniture" 的上述数据,我希望将其作为聚合结果:-

[
{"id": 123,"impressions_count":2},
{"id": 127,"impressions_count":1},
{"id": 124,"impressions_count":1},
{"id": 122,"impressions_count":1}
]

我尝试通过 python 脚本中的以下函数使用 MAPREDUCE 在 "id" 上进行聚合

def fetch_impressions(service=None):
    """Count impressions per entity id with a MongoDB map-reduce job.

    Every element of each document's ``impressions`` array contributes one
    count to the id it carries. The output is written to the
    ``aggregation_result`` collection.

    Args:
        service: Optional service name (e.g. ``"furniture"``). When given,
            only impressions whose ``service`` equals it are counted; when
            ``None`` (the default) every impression is counted.

    Returns:
        Whatever ``map_reduce`` returns for the ``Impressions`` collection
        (in PyMongo 3.x, a handle on the ``aggregation_result`` collection).

    Raises:
        Any error from the connection helper or the driver propagates
        unchanged, keeping its original type and traceback.
    """
    imp_collection = get_mongo_connection('Impressions')

    # JavaScript's `for (x in array)` yields the array *indexes*, not the
    # elements, so the elements are fetched by position instead.
    # `service` is supplied through the mapReduce `scope` option below.
    map_js = Code("""
        function () {
            var items = this.impressions || [];
            for (var i = 0; i < items.length; i++) {
                var imp = items[i];
                if (service === null || imp.service === service) {
                    emit(parseInt(imp.id), 1);
                }
            }
        }
    """)

    # Values are partial counts, so summing is safe when reduce is re-applied.
    reduce_js = Code("""
        function (key, values) {
            return Array.sum(values);
        }
    """)

    return imp_collection.map_reduce(
        map_js,
        reduce_js,
        'aggregation_result',
        scope={'service': service},
    )

但我得到的结果是 None,可能是因为 map 函数有误。我是 JavaScript 和 Mongo 的新手,请帮忙!

您可以使用 aggregation framework

import pymongo
conn = pymongo.MongoClient()
db = conn.test
col = db.collection

# Stages: one row per impression -> keep furniture rows -> count rows per id.
furniture_counts_pipeline = [
    {'$unwind': '$impressions'},
    {'$match': {'impressions.service': 'furniture'}},
    {'$group': {'_id': '$impressions.id', 'impressions_count': {'$sum': 1}}},
]

for doc in col.aggregate(furniture_counts_pipeline):
    print(doc)

或者使用 $map 和 $setDifference 运算符,这样效率更高。

# Trim each document's array to its furniture impressions *before* unwinding,
# so fewer rows reach the $unwind/$group stages.
#
# $filter (MongoDB 3.2+) is used instead of the $map + $setDifference idiom:
# $setDifference returns a set, so an id repeated inside one document would be
# counted only once, and the 0 placeholder used to discard non-matching
# entries would also discard a genuine id of 0.
col.aggregate([
    {"$project": {"impressions": {"$filter": {
        "input": "$impressions",
        "as": "imp",
        "cond": {"$eq": ["$$imp.service", "furniture"]},
    }}}},
    {"$unwind": "$impressions"},
    {"$group": {"_id": "$impressions.id", "impressions_count": {"$sum": 1}}},
])

产生:

{'_id': 122.0, 'impressions_count': 1}
{'_id': 124.0, 'impressions_count': 1}
{'_id': 127.0, 'impressions_count': 1}
{'_id': 123.0, 'impressions_count': 2}

我制作了一个工具,让你可以在 Python 中运行 MongoDB Map/Reduce:

https://mreduce.com

import random
import threading

import bson
import pymongo

import mreduce


# Client handle that is passed to mreduce.API below; the URI is a placeholder
# to be replaced with the address of a real MongoDB server.
mongo_client = pymongo.MongoClient("mongodb://your_mongodb_server")

def map_func(document):
    """Emit one ``(id, 1)`` pair for every impression in *document*.

    Args:
        document: A mapping whose ``"impressions"`` value is a sequence of
            mappings, each carrying an ``"id"`` key.

    Yields:
        ``(impression_id, 1)`` for each impression, in array order. Documents
        with a missing or empty ``"impressions"`` value yield nothing.
    """
    # The id lives on each impression; the enclosing document has no "id" key.
    for impression in document.get("impressions") or ():
        yield impression["id"], 1

def reduce_func(id, prices):
    """Collapse all values emitted for one key into a single total.

    Args:
        id: The key the values were emitted under; not used in the sum.
        prices: Iterable of the numeric values emitted for that key.

    Returns:
        The sum of *prices* (``0`` when it is empty).
    """
    total = sum(prices)
    return total

# Names referenced by submit_job() below, mapped to their implementations.
worker_functions = {
    "exampleMap": map_func,
    "exampleReduce": reduce_func
}

api = mreduce.API(
    api_key = "...",
    mongo_client = mongo_client
)

# Placeholder; replace with the id of the real project.
project_id = "..."

# api.run is started on its own thread so that this script can go on to
# submit a job and wait for its result.
thread = threading.Thread(
    target=api.run,
    args=[project_id, worker_functions]
)
thread.start()

job = api.submit_job(
    # Use the same project id the worker thread was started with; the
    # previous `project["_id"]` referenced a name that was never defined.
    projectId=project_id,
    mapFunctionName="exampleMap",
    reduceFunctionName="exampleReduce",
    inputDatabase="db",
    inputCollection="impressions",
    outputDatabase="db",
    outputCollection="impressions_results"
)
result = job.wait_for_result()
for key, value in result:
    # Keys are whatever the map function emitted and need not be strings,
    # so convert before concatenating.
    print("Key: " + str(key), ", Value: " + str(value))