Properly Designing a Multiprocessing.Manager Custom Object
I want to use a multiprocessing.Manager() object so that I can asynchronously send information from workers to the manager, which in turn sends the information to a server. I have roughly 10 instances writing PDFs to disk. I then want to use the manager object from the multiprocessing package to send that data to my S3 bucket, because I don't want to hold up the local content generation.
So I'm wondering: if I create a custom manager object, is that the proper way to do this? Will each process submitted to the manager object get queued? Or, if I call multiple uploads, will the manager drop some of the calls?
Below is some sample code of what I'm trying to do:
from multiprocessing.managers import BaseManager

class UploadClass(object):
    def upload(self, filePath, params, destUrl):
        # do stuff
        return results

class MyManager(BaseManager):
    pass

MyManager.register('uploads', UploadClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    upload = manager.uploads()
    # does this wait for completion, or do they perform this async?
    print upload.upload(r"< path >", {...}, "some url")
    print upload.upload(r"< path >", {...}, "some url")
To answer some of your questions directly:
Will each process submitted to the manager object get queued?
The Manager server spawns a new thread to handle each incoming request, so all of your requests will start being handled immediately. You can see this inside multiprocessing/managers.py:
def serve_forever(self):
    '''
    Run the server forever
    '''
    current_process()._manager_server = self
    try:
        try:
            while 1:
                try:
                    c = self.listener.accept()
                except (OSError, IOError):
                    continue
                t = threading.Thread(target=self.handle_request, args=(c,))
                t.daemon = True
                t.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    finally:
        self.stop = 999
        self.listener.close()
if I call multiple uploads, will the manager drop some of the calls?
No, none of the calls will be dropped.
# does this wait for completion, or do they perform this async?
print upload.upload(r"< path >", {...}, "some url")
print upload.upload(r"< path >", {...}, "some url")
Both calls to upload.upload will be synchronous; they will not return until UploadClass.upload has finished. However, if you have multiple scripts/threads/processes calling upload.upload concurrently, each individual call will run at the same time, inside its own thread in the Manager server process.
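For illustration, here is a minimal, self-contained sketch (not from the question; the file names and the placeholder upload body are made up) of several worker processes sharing one proxy. Each call blocks its own caller, but every caller is serviced by a separate thread in the Manager server:

import multiprocessing
from multiprocessing.managers import BaseManager

class UploadClass(object):
    def upload(self, filePath, params, destUrl):
        # placeholder for the real S3 upload
        return "uploaded %s" % filePath

class MyManager(BaseManager):
    pass

MyManager.register('uploads', UploadClass)

def worker(uploader, path):
    # This call blocks *this* worker until UploadClass.upload returns,
    # but the Manager server handles each caller in its own thread.
    print uploader.upload(path, {}, "some url")

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    uploader = manager.uploads()   # one proxy shared by every worker

    procs = [multiprocessing.Process(target=worker, args=(uploader, p))
             for p in ("a.pdf", "b.pdf", "c.pdf")]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

Each worker waits on its own upload call, but the three uploads overlap on the server side.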
Your most important question is:
is this the proper way to do this?
If I'm understanding this question correctly, I would say no. If you have just one script, and within that script you spawn ten multiprocessing.Process instances to write out the PDFs, then you should simply use another multiprocessing.Process to handle the uploads:
import multiprocessing

def upload(q):
    for payload in iter(q.get, None):  # Keep getting from the queue until a None is found
        filePath, params, destUrl = payload
        # do stuff (upload filePath to destUrl here)

def write_pdf(pdf_file_info, q):
    # write a pdf to disk here, producing filePath, params and destUrl
    q.put((filePath, params, destUrl))  # Send work to the uploader
    # Move on with whatever comes next.

if __name__ == '__main__':
    pdf_queue = multiprocessing.Queue()

    # Start uploader
    upload_proc = multiprocessing.Process(target=upload, args=(pdf_queue,))
    upload_proc.start()

    # Start pdf writers
    procs = []
    for pdf in pdfs_to_write:
        p = multiprocessing.Process(target=write_pdf, args=(pdf, pdf_queue))
        p.start()
        procs.append(p)

    # Wait for pdf writers and uploader to finish.
    for p in procs:
        p.join()
    pdf_queue.put(None)  # Sending None breaks the for loop inside upload
    upload_proc.join()
If you really can upload concurrently, then there's no need for a separate upload process at all; just upload directly from the pdf-writing processes, along the lines of the sketch below.
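A minimal sketch of that variant, reusing the placeholder names from the code above (pdfs_to_write is still a stand-in, and upload_to_s3 is a hypothetical helper, not a real function from the question):

import multiprocessing

def write_and_upload(pdf_file_info):
    # write the pdf to disk here, producing filePath, params and destUrl ...
    # ... then upload straight away from this same process
    upload_to_s3(filePath, params, destUrl)  # hypothetical upload helper

if __name__ == '__main__':
    procs = []
    for pdf in pdfs_to_write:  # placeholder list of pending PDFs, as above
        p = multiprocessing.Process(target=write_and_upload, args=(pdf,))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()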
不过,很难从你的问题中判断这是否正是你正在做的。一旦您澄清,我将调整最后一块以适合您的特定用例。