Python 3 asyncio: run_until_complete() 在等待 ProcessPoolExecutor 作业完成时阻塞
Python 3 asyncio: run_until_complete() blocks when waiting for ProcessPoolExecutor job done
我正在尝试使用 ProcessPoolExecutor() 将用于测试自动化的 TCP echo client and server 合并到单个模块中,它按预期工作。
唯一的问题是我无法完成事件循环。我可以看到执行程序目标 run_client() 最后一行的调试输出,但看起来执行程序本身仍然阻塞。
代码:
import asyncio
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
async def server_handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message, addr))
print("Send: %r" % message)
writer.write(data)
await writer.drain()
print("Close the client socket")
writer.close()
async def echo_client_handler(message, loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = await reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
def run_client():
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(echo_client_handler(message, loop))
loop.close()
print('run_client last line')
executor = ProcessPoolExecutor(1)
loop = asyncio.get_event_loop()
coro = asyncio.start_server(server_handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
#loop.run_forever()
client = asyncio.ensure_future(loop.run_in_executor(executor, run_client))
loop.run_until_complete(client)
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
输出:
Serving on ('127.0.0.1', 8888)
Send: 'Hello World!'
Received 'Hello World!' from ('127.0.0.1', 51157)
Send: 'Hello World!'
Close the client socket
Received: 'Hello World!'
Close the socket
run_client last line
在此输出之后,它会进入等待 IO 的消息循环。
期待您的帮助。抱歉,我是一日异步主义者 :)
您不能在子进程中为您的客户端 运行 使用相同的事件循环,您需要一个 new loop:
def run_client():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
[...]
我正在尝试使用 ProcessPoolExecutor() 将用于测试自动化的 TCP echo client and server 合并到单个模块中,它按预期工作。
唯一的问题是我无法完成事件循环。我可以看到执行程序目标 run_client() 最后一行的调试输出,但看起来执行程序本身仍然阻塞。
代码:
import asyncio
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
async def server_handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message, addr))
print("Send: %r" % message)
writer.write(data)
await writer.drain()
print("Close the client socket")
writer.close()
async def echo_client_handler(message, loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = await reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
def run_client():
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(echo_client_handler(message, loop))
loop.close()
print('run_client last line')
executor = ProcessPoolExecutor(1)
loop = asyncio.get_event_loop()
coro = asyncio.start_server(server_handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
#loop.run_forever()
client = asyncio.ensure_future(loop.run_in_executor(executor, run_client))
loop.run_until_complete(client)
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
输出:
Serving on ('127.0.0.1', 8888)
Send: 'Hello World!'
Received 'Hello World!' from ('127.0.0.1', 51157)
Send: 'Hello World!'
Close the client socket
Received: 'Hello World!'
Close the socket
run_client last line
在此输出之后,它会进入等待 IO 的消息循环。
期待您的帮助。抱歉,我是一日异步主义者 :)
您不能在子进程中为您的客户端 运行 使用相同的事件循环,您需要一个 new loop:
def run_client():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
[...]