Read & delete from mongodb with python multiprocessing

I have a MongoDB with a collection incoming and a collection target. A worker process currently does the following (simplified):

def worker(number):
    incomings = db.incoming.find()
    buffersize = 5
    readcounter = 0
    documentsToInsert = []
    documentsToDelete = []
    for incoming in incomings:
        readcounter += 1
        documentToInsert = {'keyfield': incoming["keyfield"]}  # + other fields after some treatments...
        documentsToInsert.append(documentToInsert)
        documentsToDelete.append({'_id': incoming["_id"]})
        if readcounter >= buffersize:
            readcounter = 0
            db.incoming.remove({'_id': {'$in': [ObjectId(doc["_id"]) for doc in documentsToDelete]}})
            db.target.insert_many(documentsToInsert, ordered=False)
            documentsToInsert = []
            documentsToDelete = []

Of course, the remove and insert_many statements are wrapped in try/except.
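For reference, a simplified sketch of what that error handling looks like (the exception classes come from pymongo.errors; the buffers are the ones from the code above):

from pymongo.errors import BulkWriteError, DuplicateKeyError

try:
    db.incoming.remove({'_id': {'$in': [doc['_id'] for doc in documentsToDelete]}})
    db.target.insert_many(documentsToInsert, ordered=False)
except BulkWriteError as e:
    # insert_many with ordered=False reports all failed documents at once
    print("bulk write error:", e.details)
except DuplicateKeyError as e:
    print("duplicate key:", e)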

Since data comes in faster than the/one worker can process it, I need to get faster, e.g. by spawning the worker on all CPUs, which should be done anyway for efficiency. I am doing that with the following code:

if __name__== "__main__":
    procs=[]
    number=0
    for cpu in range(multiprocessing.cpu_count()):
        procs.append(multiprocessing.Process(target = worker, args = (number,)))
        number+=1
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print("=====================FIN=========================")

The problem is: while one process is reading its buffersize documents, the other processes fetch the same documents, so only one of them succeeds in inserting into target and the others fail with duplicate key exceptions. The effect is that only one process does useful work. Without multiprocessing the remove/insert_many combination works fine, and I can easily use a much higher buffer size.

I have thought about inserting the data into incoming with an additional field that assigns each document to a worker number (sketched below), but that would take extra disk space and extra processing time, and besides, at the time the data is generated I don't know how many workers will be processing it.
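Just to make clear what I mean, a rough sketch of that discarded idea (the partition field, counter and NUMBER_OF_WORKERS are hypothetical, not part of my code):

# at insert time: tag each document with a partition number
document['partition'] = counter % NUMBER_OF_WORKERS
db.incoming.insert_one(document)

# in worker(number): read only this worker's slice
incomings = db.incoming.find({'partition': number})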

I have also tried sleeping for a random time in each worker, but that is completely unpredictable and doesn't prevent the errors themselves.

What can I do so that all workers process different data?

Following up on my comment, I think a message broker such as RabbitMQ fits your use case best. With RabbitMQ and similar message brokers (I haven't used 0mq) you don't need to coordinate the workers yourself; just start as many consumers as you want, each one subscribes, and the broker delivers the messages to them in turn.
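For illustration only, a minimal consumer sketch using the pika client (assuming a local RabbitMQ and a queue named incoming that some producer fills with one JSON document per message; the names are placeholders, not taken from your code):

import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="incoming", durable=True)
# hand each consumer only one unacknowledged message at a time,
# so the work is spread over however many consumers are running
channel.basic_qos(prefetch_count=1)

def handle(ch, method, properties, body):
    doc = json.loads(body)  # one incoming document per message
    # ... treatments, insert into target ...
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="incoming", on_message_callback=handle)
channel.start_consuming()

Because the broker delivers each message to exactly one consumer, no two consumers ever see the same document, no matter how many of them you start.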

Thanks @Belly Buster for the idea of decoupling the treatments by using *MQ. I've solved it by using ZeroMQ, which is brokerless, but in this case I've implemented a load balancing broker, based on the Load balancing broker example for ZeroMQ. The client reads from the database while the workers process the entries they receive via ZeroMQ. I've tried to add some comprehensive comments to the code to clarify a few points. The code is missing some utility classes I wrote which are not part of the solution; it is only posted to answer this question, in the hope that someone finds it useful.

"""
Original Author: Brandon Carpenter (hashstat) <brandon(dot)carpenter(at)pnnl(dot)gov>
This code was part of the ZeroMQ Tutorial and implements the Load-balancing broker pattern.
Modified by @https://whosebug.com/users/2226028/michael
"""

from __future__ import print_function

import multiprocessing
import zmq
import io
import pymongo
from pymongo import MongoClient
import time
from pprint import pprint
import ast
import json
from bson.json_util import dumps
from datetime import datetime
from PairConfig import PairConfig
from PairController import PairController
import ctypes
import sys
from random import randint

NBR_CLIENTS = 1
NBR_WORKERS = 3

# Load the configuration file
# this is a configuration class which is not documented here
pairConfig=PairConfig("verify.ini")

# multiprocessing shared variables setup
manager = multiprocessing.Manager()
insertbuffer=manager.list()
deletebuffer=manager.list()
totalcounter=multiprocessing.Value(ctypes.c_int,0)

def client_task(ident):
    """Basic request-reply client using REQ socket."""
    try:
        client = MongoClient(pairConfig.config.get('db','url'))
        db = client.databasename
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Client-{}".format(ident).encode("ascii")
        socket.connect("ipc://frontend.ipc")

        while True:
            incomings = db.incoming.find()
            # skip a random number of documents, which makes it safe(r)
            # to run this client on different nodes
            incomings.skip(randint(randint(1,500),randint(5000,500000)))
            for incoming in incomings:
                pair = {'primarykey': incoming["primarykey"], 'value': incoming["value"]}
                # send the pair to the broker and wait for the worker's reply
                socket.send_string(str(pair))
                reply = socket.recv()
    except KeyboardInterrupt:
        print("\nexit client")

def worker_task(ident, insertbuffer, deletebuffer, mylock):
    """Worker task, using a REQ socket to do load-balancing."""
    try:
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Worker-{}".format(ident).encode("ascii")
        socket.connect("ipc://backend.ipc")

        socket.send(b"READY")

        # this is a helper class which is not documented here
        pairController=PairController(pairConfig)
        while True:
            address, empty, request = socket.recv_multipart()
            with totalcounter.get_lock():
                totalcounter.value+=1
            dictToInsert = ast.literal_eval(request.decode("ascii"))
            dictToInsert["last_checked"]=datetime.now()
            insertbuffer.append(dictToInsert)
            deletebuffer.append(dictToInsert["primarykey"])
            # ... do some timely treatment here - a lot of variable time gets burned here ...
            # result will be result1 and result2, for the sake of simplification I will fill it with random numbers here
            result1=randint(1,10)
            result2=randint(1,10)
            sys.stdout.write("%s %s insertbuffer: %d, deletebuffer: %d, totalcounter: %d, b: %s, r: %s            \r" % (socket.identity.decode("ascii"),dictToInsert["primarykey"],len(insertbuffer),len(deletebuffer),totalcounter.value,result1,result2))
            sys.stdout.flush()
            # readbuffer comes from an ini file ... I chose 500 for now
            if len(insertbuffer[:]) >= int(pairConfig.config.get('verify','readbuffer')) and ident==0:
                mylock.acquire()
                # these 2 methods are inside a class pairController which is not documented here,
                # it's basically one method for insert_many() and one method for remove(), 
                # each time with the respective buffer as a filter
                pairController.storePairs("history",insertbuffer[:])
                pairController.deletePairs("history",deletebuffer[:])
                # this empties the buffers for all filters:
                insertbuffer[:]=[]
                deletebuffer[:]=[]
                mylock.release()
            socket.send_multipart([address, b"", b"ok"])
    except KeyboardInterrupt:
        print("\nexit worker")

def main():
    """Load balancer main loop."""
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")

    # Start background tasks
    mylock = multiprocessing.Lock()
    def start(task, *args):
        process = multiprocessing.Process(target=task, args=args)
        process.daemon = True
        process.start()
    for i in range(NBR_CLIENTS):
        start(client_task, i)
    for i in range(NBR_WORKERS):
        start(worker_task, i, insertbuffer, deletebuffer, mylock)

    # Initialize main loop state
    count = NBR_CLIENTS
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())

        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
                count -= 1

        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up
    backend.close()
    frontend.close()
    context.term()

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nexit main")