Using mongoengine with multiprocessing - how do you close mongoengine connections?
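No matter what I try, whenever I use multiprocessing with a mongoengine database I get the "MongoClient opened before fork" warning about forking an active mongo connection. The standard mongo advice seems to be to connect to the database only from within the child processes. I think what I am doing should be functionally equivalent, because I close the database before using multiprocessing, but I still run into the problem.
Related questions without a minimal example or with inapplicable solutions are here, here, and specifically for the case of flask/celery here and here
Minimal example to reproduce the problem: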
from mongoengine import connect, Document, StringField, ListField, ReferenceField
from pathos.multiprocessing import ProcessingPool

class Base(Document):
    key = StringField(primary_key=True)
    name = StringField()
    parent = ReferenceField('Parent', required=True)

class Parent(Document):
    key = StringField(primary_key=True)
    name = StringField()
    bases = ListField(ReferenceField('Base'))

def remove_base(key):
    db = connect('mydb')
    mongo_b = Base.objects().get(key=key)
    mongo_b.parent.update(pull__bases=mongo_b)
    mongo_b.delete()

### setup
db = connect('mydb', connect=False)
Base(key='b1', name='test', parent='p1').save()
Base(key='b2', name='test', parent='p1').save()
Base(key='b3', name='test2', parent='p1').save()
p = Parent(key='p1', name='parent').save()
p.update(add_to_set__bases='b1')
p.update(add_to_set__bases='b2')
p.update(add_to_set__bases='b3')

### find objects we want to delete
my_base_objects = Base.objects(name='test')
keys = [b.key for b in my_base_objects]
del my_base_objects

# close db to avoid problems?!
db.close()
del db

# parallel map removing base objects and references from the db
# warning generated here
pp = ProcessingPool(2)
pp.map(remove_base, keys)
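OK, I figured it out. Mongoengine caches connections to the database all over the place. If you remove them manually, the problem goes away. Adding the following import
from mongoengine import connection
and then adding
connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}
Base._collection = None
Parent._collection = None
to the "# close db" section seems to solve the issue.
Full code: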
from mongoengine import connect, Document, StringField, ListField, ReferenceField, connection
from pathos.multiprocessing import ProcessingPool

class Base(Document):
    key = StringField(primary_key=True)
    name = StringField()
    parent = ReferenceField('Parent', required=True)

class Parent(Document):
    key = StringField(primary_key=True)
    name = StringField()
    bases = ListField(ReferenceField('Base'))

def remove_base(key):
    db = connect('mydb', connect=False)
    mongo_b = Base.objects().get(key=key)
    mongo_b.parent.update(pull__bases=mongo_b)
    mongo_b.delete()

def setup():
    Base(key='b1', name='test', parent='p1').save()
    Base(key='b2', name='test', parent='p1').save()
    Base(key='b3', name='test2', parent='p1').save()
    p = Parent(key='p1', name='parent').save()
    p.update(add_to_set__bases='b1')
    p.update(add_to_set__bases='b2')
    p.update(add_to_set__bases='b3')

db = connect('mydb', connect=False)
setup()

### find objects we want to delete
my_base_objects = Base.objects(name='test')
keys = [b.key for b in my_base_objects]
del my_base_objects

### close db to avoid problems?!
db.close()
db = None
connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}
Base._collection = None
Parent._collection = None

### parallel map removing base objects from the db
pp = ProcessingPool(2)
pp.map(remove_base, keys)
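This was improved recently. As of MongoEngine>=0.18.0, the methods disconnect() and disconnect_all() should be used to disconnect one or all existing connections, respectively (changelog 0.18.0).
See the official doc.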