Share async-await coroutine based complex object across multiprocess
I know that, in general, objects should not be shared between processes, and about the problems that can arise from doing so. But my requirement is such that it is necessary to do this.
I have a complex object with all the nice async-await coroutines in it. A function that runs a long-running process on this object runs in its own separate process. Now I want to run an IPython shell in the main process and operate on this complex object while the long-running process is executing in the other process.
To share this complex object across processes, I tried the multiprocessing BaseManager approach that I came across on SO:
import multiprocessing
import multiprocessing.managers as m

class MyManager(m.BaseManager):
    pass

MyManager.register('complex_asynio_based_class', complex_asynio_based_class)
manager = MyManager()
manager.start()
c = manager.complex_asynio_based_class()

process = multiprocessing.Process(
    target=long_running_process,
    args=(c,),
)
But this gives the error:
Unserializable message: Traceback (most recent call last):
  File "/usr/3.6/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
    send(msg)
  File "/usr/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects
It doesn't work since there are coroutines in the object. I can't think of a better solution to make it work, and I'm stuck on it.
If it were not Python, I would spawn a thread for the long-running process and would still be able to operate on the object.
If I'm not mistaken, this should be a common pattern for multiprocess applications: a long-running background process, and a main process that only performs some read-only operations on it, like in my case, without modifying it. I want to know how this is generally done.
How do you share complex objects that cannot be pickled between processes?
Running coroutines cannot be automatically shared between processes, because a coroutine runs inside a particular event loop in the process that owns the async class. The coroutine's state cannot be pickled, and even if it could, it would make no sense outside the context of its event loop.
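The failure in the question can be reproduced in isolation: pickling any coroutine object fails, with or without multiprocessing involved. A minimal demonstration (the exact error wording varies across Python versions):

import asyncio, pickle

async def coro():
    await asyncio.sleep(0)

c = coro()
try:
    pickle.dumps(c)
except TypeError as e:
    print(e)  # e.g. "can't pickle coroutine objects"
c.close()  # avoid the "coroutine was never awaited" warning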
What you can do is create a callback-based adapter for your async class, where each coroutine method is represented by a callback-based method with the semantics of "start doing X and call this function when done". Provided the callbacks are multiprocessing-aware, those operations can be invoked from other processes. You could then start an event loop in each process and create a coroutine facade over the proxied callback-based calls.
For example, consider a simple async class:
class Async:
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, os.getpid())
            await asyncio.sleep(.2)
        return s
A callback-based adapter can use the public asyncio API to convert the repeat coroutine into a classic asynchronous function in the JavaScript "callback hell" style:
class CallbackAdapter:
    def repeat_start(self, n, s, on_success):
        fut = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        fut.add_done_callback(lambda _f: on_success(fut.result()))
(The conversion could be automated; the above code is written by hand just to show the concept.)
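For instance, the hand-written repeat_start could in principle be generated by a generic factory. A minimal sketch of what such automation might look like, assuming the adapter sets up self._async and self._loop as in the full implementation below (make_callback_method is a hypothetical name, not part of this answer's code):

import asyncio
import functools

def make_callback_method(coro_func):
    # Hypothetical factory: wrap an arbitrary coroutine method into a
    # "start doing X and call this function when done" method.
    @functools.wraps(coro_func)
    def start(self, *args, on_success):
        fut = asyncio.run_coroutine_threadsafe(
            coro_func(self._async, *args), self._loop)
        fut.add_done_callback(lambda _f: on_success(fut.result()))
    return start

# Usage sketch:
# CallbackAdapter.repeat_start = make_callback_method(Async.repeat)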
CallbackAdapter can be registered with multiprocessing, so that other processes can start the adapter's methods (and therefore the original async coroutines) through a multiprocessing-provided proxy. The only requirement is that the callback passed as on_success be multiprocessing-friendly.
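Concretely, a multiprocessing-friendly callback can be as simple as the put method of a manager-created queue, whose proxy survives the trip to another process. A sketch, assuming manager is a started SyncManager and cb_proxy is a proxy to a CallbackAdapter:

result_queue = manager.Queue()
# result_queue.put can be shipped across the process boundary and
# invoked remotely once the coroutine completes.
cb_proxy.repeat_start(3, 'hello', result_queue.put)
print(result_queue.get())  # blocks until repeat() finishes, prints 'hello'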
As a final step, one can come full circle and create an async adapter for the callback-based API (!), start an event loop in the other process as well, and make use of asyncio and async def. This adapter-for-adapter class will run a fully functional repeat coroutine that effectively proxies the original Async.repeat coroutine without trying to pickle coroutine state.
Here is a sample implementation of the above approach:
import asyncio, multiprocessing.managers, threading, os

class Async:
    # The async class we are bridging. This class is unaware of multiprocessing
    # or of any of the code that follows.
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, 'pid', os.getpid())
            await asyncio.sleep(.2)
        return s


def start_asyncio_thread():
    # Since the manager controls the main thread, we have to spin up the event
    # loop in a dedicated thread and use asyncio.run_coroutine_threadsafe to
    # submit stuff to the loop.
    setup_done = threading.Event()
    loop = None
    def loop_thread():
        nonlocal loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        setup_done.set()
        loop.run_forever()
    threading.Thread(target=loop_thread).start()
    setup_done.wait()
    return loop


class CallbackAdapter:
    _loop = None

    # the callback adapter to the async class, also running in the
    # worker process
    def __init__(self, obj):
        self._async = obj
        if CallbackAdapter._loop is None:
            CallbackAdapter._loop = start_asyncio_thread()

    def repeat_start(self, n, s, on_success):
        # Submit a coroutine to the event loop and obtain a Task/Future. This
        # is normally done with loop.create_task, but repeat_start will be
        # called from the main thread, owned by the multiprocessing manager,
        # while the event loop will run in a separate thread.
        future = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        # We could propagate exceptions by accepting an additional on_error
        # callback, and nesting future.result() in a try/except that decides
        # whether to call on_success or on_error.
        future.add_done_callback(lambda _f: on_success(future.result()))


def remote_event_future(manager):
    # Return a function/future pair that can be used to locally monitor an
    # event in another process.
    #
    # The returned function and future have the following property: when the
    # function is invoked, possibly in another process, the future completes.
    # The function can be passed as a callback argument to a multiprocessing
    # proxy object and therefore invoked by a different process.
    loop = asyncio.get_event_loop()
    result_pipe = manager.Queue()
    future = loop.create_future()
    def _wait_for_remote():
        result = result_pipe.get()
        loop.call_soon_threadsafe(future.set_result, result)
    t = threading.Thread(target=_wait_for_remote)
    t.start()
    return result_pipe.put, future


class AsyncAdapter:
    # The async adapter for a callback-based API, e.g. the CallbackAdapter.
    # Designed to run in a different process and communicate to the callback
    # adapter via a multiprocessing proxy.
    def __init__(self, cb_proxy, manager):
        self._cb = cb_proxy
        self._manager = manager

    async def repeat(self, n, s):
        set_result, future = remote_event_future(self._manager)
        self._cb.repeat_start(n, s, set_result)
        return await future


class CommManager(multiprocessing.managers.SyncManager):
    pass

CommManager.register('Async', Async)
CommManager.register('CallbackAdapter', CallbackAdapter)

def get_manager():
    manager = CommManager()
    manager.start()
    return manager

def other_process(manager, cb_proxy):
    print('other_process (pid %d)' % os.getpid())
    aadapt = AsyncAdapter(cb_proxy, manager)
    loop = asyncio.get_event_loop()
    # Create two coroutines printing different messages, and gather their
    # results.
    results = loop.run_until_complete(asyncio.gather(
        aadapt.repeat(3, 'message A'),
        aadapt.repeat(2, 'message B')))
    print('coroutine results (pid %d): %s' % (os.getpid(), results))
    print('other_process (pid %d) done' % os.getpid())

def start_other_process(loop, manager, async_proxy):
    cb_proxy = manager.CallbackAdapter(async_proxy)
    other = multiprocessing.Process(target=other_process,
                                    args=(manager, cb_proxy,))
    other.start()
    return other

def main():
    loop = asyncio.get_event_loop()
    manager = get_manager()
    async_proxy = manager.Async()
    # Create two external processes that drive coroutines in our event loop.
    # Note that all messages are printed with the same PID.
    start_other_process(loop, manager, async_proxy)
    start_other_process(loop, manager, async_proxy)
    loop.run_forever()

if __name__ == '__main__':
    main()
The code works correctly on Python 3.5, but fails on 3.6 and 3.7 due to a bug in multiprocessing.
I have been using the multiprocessing module and the asyncio module for a while.
You don't share objects between processes. You create an object (the referent) in one process, return a proxy object, and share the proxy with the other processes. The other processes use the proxy object to invoke the methods of the referent.
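As a minimal illustration of this referent/proxy split (Counter is a made-up example class, not from your code):

from multiprocessing.managers import BaseManager

class Counter:
    def __init__(self):
        self._n = 0
    def increment(self):
        self._n += 1
        return self._n

class CounterManager(BaseManager):
    pass

CounterManager.register('Counter', Counter)

if __name__ == '__main__':
    manager = CounterManager()
    manager.start()
    proxy = manager.Counter()  # the referent lives in the manager's process
    print(proxy.increment())   # the call is forwarded to the referent -> 1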
In your code, the referent is the complex_asynio_based_class instance.
Here is some naive code you can refer to. The main thread runs a single asyncio loop with a UDP server and some other async operations. The long-running process simply checks the loop status.
import multiprocessing
import multiprocessing.managers as m
import asyncio
import logging
import time

logging.basicConfig(filename="main.log", level=logging.DEBUG)

class MyManager(m.BaseManager):
    pass

class sinkServer(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        logging.info('Data received: {!r}'.format(message))


class complex_asynio_based_class:
    def __init__(self, addr=('127.0.0.1', 8080)):
        self.loop = asyncio.new_event_loop()
        listen = self.loop.create_datagram_endpoint(
            sinkServer, local_addr=addr,
            reuse_address=True, reuse_port=True)
        self.loop.run_until_complete(listen)

        for name, delay in zip("abcdef", (1, 2, 3, 4, 5, 6)):
            self.loop.run_until_complete(self.slow_op(name, delay))

    def run(self):
        self.loop.run_forever()

    def stop(self):
        self.loop.stop()

    def is_running(self):
        return self.loop.is_running()

    async def slow_op(self, name, delay):
        logging.info("my name: {}".format(name))
        await asyncio.sleep(delay)  # the sleep must be awaited to take effect


def long_running_process(co):
    logging.debug('address: {!r}'.format(co))
    logging.debug("status: {}".format(co.is_running()))
    time.sleep(6)
    logging.debug("status: {}".format(co.is_running()))


MyManager.register('complex_asynio_based_class', complex_asynio_based_class)
manager = MyManager()
manager.start()
c = manager.complex_asynio_based_class()

process = multiprocessing.Process(
    target=long_running_process,
    args=(c,),
)
process.start()

c.run()  # run the loop
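To check that the proxied loop is actually serving, you can fire a datagram at the sink server from another shell and then look for the 'Data received' entry in main.log. A minimal sketch, assuming the address used above:

import socket

# Send one datagram to the sinkServer; the message should show up in
# main.log via the logging call in datagram_received.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(b'hello sink', ('127.0.0.1', 8080))
sock.close()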