Read & delete from mongodb with python multiprocessing
I have a mongodb with a collection incoming and a collection target. A worker process currently does the following (simplified):
def worker(number):
    incomings = db.incoming.find()
    buffersize = 5
    readcounter = 0
    documentsToInsert = []
    documentsToDelete = []
    for incoming in incomings:
        readcounter += 1
        # + other fields after some treatments...
        documentToInsert = {'keyfield': incoming["keyfield"]}
        documentsToInsert.append(documentToInsert)
        documentToDelete = {'_id': incoming["_id"]}
        documentsToDelete.append(documentToDelete)
        if readcounter >= buffersize:
            readcounter = 0
            db.incoming.remove({'_id': {'$in': [doc["_id"] for doc in documentsToDelete]}})
            db.target.insert_many(documentsToInsert, ordered=False)
            documentsToInsert = []
            documentsToDelete = []
Of course, the remove and insert_many statements are wrapped in try/except.
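For completeness, with ordered=False the insert_many still attempts every document, and the duplicate-key failures surface as a pymongo BulkWriteError; a minimal sketch of that guard (the counting is only illustrative):

from pymongo.errors import BulkWriteError

try:
    db.target.insert_many(documentsToInsert, ordered=False)
except BulkWriteError as bwe:
    # with ordered=False, all non-duplicate documents are still inserted;
    # writeErrors lists the failed ones (code 11000 = duplicate key)
    duplicates = [e for e in bwe.details["writeErrors"] if e["code"] == 11000]
    print("skipped %d duplicates" % len(duplicates))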
Since data comes in faster than the/one worker can process it, I need to get faster, e.g. by spawning the worker on all CPUs, which should be done anyway for efficiency. I am doing this with the following code:
if __name__ == "__main__":
    procs = []
    number = 0
    for cpu in range(multiprocessing.cpu_count()):
        procs.append(multiprocessing.Process(target=worker, args=(number,)))
        number += 1
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print("=====================FIN=========================")
The problem is that while one thread is reading its buffersize documents, the other threads fetch the same documents, so only one of them succeeds in inserting into target while the others run into duplicate-key exceptions. This effect makes only one process useful. Without multiprocessing the remove/insert_many combination works fine, and I can easily use much higher buffer sizes.
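(Aside, to make the failure mode concrete: find() hands out overlapping cursors, while find_one_and_delete() would claim each document atomically, so no two processes could ever pick the same one; but it costs one round trip per document, which is exactly what the buffering is meant to avoid. A minimal sketch, with transform as a hypothetical stand-in for the treatments:)

doc = db.incoming.find_one_and_delete({})
if doc is not None:
    # this process is the only one that ever saw doc, so no duplicate key
    db.target.insert_one(transform(doc))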
I thought about inserting the data into incoming with an additional field to assign it to a worker number, but that would cost extra disk space and extra processing time, and besides, at insertion time I don't know how many workers will be processing the data.
I have already tried sleeping a random amount of time in each thread, but that is completely unpredictable and doesn't prevent the error itself.
What can I do to make all threads work on different data?
As per my comment, I think a message broker like RabbitMQ suits your use case best. With RabbitMQ and similar message brokers (I have not used 0mq), you don't need to cater for the other threads; just start as many workers as you need, have each of them subscribe, and the broker will deliver the messages in turn.
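A minimal sketch of such a worker with the pika client, assuming a local RabbitMQ and a queue named incoming (the names are placeholders):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="incoming", durable=True)
# hand each worker at most one unacknowledged message at a time,
# so the broker load-balances instead of piling messages on one consumer
channel.basic_qos(prefetch_count=1)

def handle(ch, method, properties, body):
    # ... treat the document and insert it into target here ...
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="incoming", on_message_callback=handle)
channel.start_consuming()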
Thanks @Belly Buster for the idea of decoupling the treatments by using *MQ. I've solved it with ZeroMQ, which is brokerless, but in this case I've implemented a load-balancing broker, based on the Load balancing broker example for ZeroMQ. The client reads from the database while the workers process the entries they get via ZeroMQ. I've tried to add some comprehensive comments to the code to clarify a few points. The code is missing some utility classes I wrote that don't belong to this solution; it is only meant to answer the question, and hopefully someone finds it useful.
"""
Original Author: Brandon Carpenter (hashstat) <brandon(dot)carpenter(at)pnnl(dot)gov>
This code was part of the ZeroMQ Tutorial and implements the Load-balancing broker pattern.
Modified by @https://whosebug.com/users/2226028/michael
"""
from __future__ import print_function
import multiprocessing
import zmq
import io
import pymongo
from pymongo import MongoClient
import time
from pprint import pprint
import ast
import json
from bson.json_util import dumps
from datetime import datetime
from PairConfig import PairConfig
from PairController import PairController
import ctypes
import sys
from random import randint
NBR_CLIENTS = 1
NBR_WORKERS = 3

# Load the configuration file
# this is a configuration class which is not documented here
pairConfig = PairConfig("verify.ini")

# multiprocessing shared variables setup
manager = multiprocessing.Manager()
insertbuffer = manager.list()
deletebuffer = manager.list()
totalcounter = multiprocessing.Value(ctypes.c_int, 0)
def client_task(ident):
    """Basic request-reply client using REQ socket."""
    try:
        client = MongoClient(pairConfig.config.get('db', 'url'))
        db = client.databasename
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Client-{}".format(ident).encode("ascii")
        socket.connect("ipc://frontend.ipc")
        while True:
            incomings = db.incoming.find()
            # skip a random offset, which makes it safe(r) to run
            # this on different nodes reading the same collection
            incomings.skip(randint(randint(1, 500), randint(5000, 500000)))
            for incoming in incomings:
                pair = {'primarykey': incoming["primarykey"], 'value': incoming["value"]}
                # Send request, get reply
                socket.send_string(u"%s" % pair)
                reply = socket.recv()
    except KeyboardInterrupt:
        print("\nexit client")
def worker_task(ident, insertbuffer, deletebuffer, mylock):
    """Worker task, using a REQ socket to do load-balancing."""
    try:
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Worker-{}".format(ident).encode("ascii")
        socket.connect("ipc://backend.ipc")
        socket.send(b"READY")
        # this is a helper class which is not documented here
        pairController = PairController(pairConfig)
        while True:
            address, empty, request = socket.recv_multipart()
            with totalcounter.get_lock():
                totalcounter.value += 1
            # the request is the string form of a dict, sent by the client above
            dictToInsert = ast.literal_eval(request.decode("ascii"))
            dictToInsert["last_checked"] = datetime.now()
            insertbuffer.append(dictToInsert)
            deletebuffer.append(dictToInsert["primarykey"])
            # ... do some timely treatment here - a lot of variable time gets burned here ...
            # results are result1 and result2; for the sake of simplification I fill them with random numbers here
            result1 = randint(1, 10)
            result2 = randint(1, 10)
            sys.stdout.write("%s %s insertbuffer: %d, deletebuffer: %d, totalcounter: %d, b: %s, r: %s \r" % (
                socket.identity.decode("ascii"), dictToInsert["primarykey"],
                len(insertbuffer), len(deletebuffer), totalcounter.value, result1, result2))
            sys.stdout.flush()
            # readbuffer comes from an ini file ... I chose 500 for now;
            # only worker 0 flushes, so the shared buffers are written once
            if len(insertbuffer[:]) >= int(pairConfig.config.get('verify', 'readbuffer')) and ident == 0:
                mylock.acquire()
                # these 2 methods are inside a class pairController which is not documented here,
                # it's basically one method for insert_many() and one method for remove(),
                # each time with the respective buffer as a filter
                pairController.storePairs("history", insertbuffer[:])
                pairController.deletePairs("history", deletebuffer[:])
                # this empties the buffers for all processes:
                insertbuffer[:] = []
                deletebuffer[:] = []
                mylock.release()
            socket.send_multipart([address, b"", b"ok"])
    except KeyboardInterrupt:
        print("\nexit worker")
def main():
    """Load balancer main loop."""
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")

    # Start background tasks
    mylock = multiprocessing.Lock()

    def start(task, *args):
        process = multiprocessing.Process(target=task, args=args)
        process.daemon = True
        process.start()

    for i in range(NBR_CLIENTS):
        start(client_task, i)
    for i in range(NBR_WORKERS):
        start(worker_task, i, insertbuffer, deletebuffer, mylock)

    # Initialize main loop state
    count = NBR_CLIENTS
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)

    while True:
        sockets = dict(poller.poll())

        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
                count -= 1

        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)

    # Clean up
    backend.close()
    frontend.close()
    context.term()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nexit main")