Python: 并发等待异步协程和同步函数

Python: concurrently pending on async coroutine and synchronous function

我想在执行同步功能期间建立 SSH SOCKs 隧道(使用 asyncssh)。功能完成后我想拆除隧道并退出。

显然,某些异步函数必须 awaited 才能保持隧道正常工作,所以重要的是 conn.wait_closed() 和同步函数是同时执行的。所以我很确定我实际上需要第二个线程。 我首先使用 ThreadPoolExecutorrun_in_executor 尝试了一些更明智的事情,但最终得到了下面糟糕的多线程变体。

#! /usr/bin/env python3

import traceback
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

import asyncio, asyncssh, sys

_server="127.0.0.1"
_port=22
_proxy_port=8080


async def run_client():
    conn = await asyncio.wait_for(
        asyncssh.connect(
            _server,
            port=_port,
            options=asyncssh.SSHClientConnectionOptions(client_host_keysign=True),
        ),
        10,
    )

    listener = await conn.forward_socks('127.0.0.1', _proxy_port)
    return conn

async def do_stuff(func):
    try:
        conn = await run_client()
        print("SSH tunnel active")

        def start_loop(loop):
            asyncio.set_event_loop(loop)
            try:
                loop.run_forever()
            except Exception as e:
                print(f"worker loop: {e}")

        async def thread_func():
            ret=await func()
            print("Func done - tearing done worker thread and SSH connection")
            conn.close()
            #  asyncio.get_event_loop().stop()
            return ret

        func_loop = asyncio.new_event_loop()
        func_thread = Thread(target=start_loop, args=(func_loop,))
        func_thread.start()
        print("thread started")
        fut = asyncio.run_coroutine_threadsafe(thread_func(), func_loop)
        print(f"fut scheduled: {fut}")

        done = await asyncio.gather(asyncio.wrap_future(fut), conn.wait_closed())
        print("wait done")
        for ret in done:
            print(f"ret={ret}")

        # Canceling pending tasks and stopping the loop
        #  asyncio.gather(*asyncio.Task.all_tasks()).cancel()

        print("stopping func_loop")
        func_loop.call_soon_threadsafe(func_loop.stop())
        print("joining func_thread")
        func_thread.join()
        print("joined func_thread")

    except (OSError, asyncssh.Error) as exc:
        sys.exit('SSH connection failed: ' + str(exc))
    except (Exception) as exc:
        sys.exit('Unhandled exception: ' + str(exc))
        traceback.print_exc()


async def just_wait():
    print("starting just_wait")
    input()
    print("ending just_wait")
    return 42

asyncio.get_event_loop().run_until_complete(do_stuff(just_wait))

它实际上 "works" "correctly" 直到我在 join 工作线程时遇到异常。我想是因为我做的事情不是线程安全的。

Exception in callback None()
handle: <Handle>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
TypeError: 'NoneType' object is not callable

要测试代码,您必须为您的用户设置本地 SSH 服务器 运行 密钥文件。您可能需要更改 _port 变量。

我正在寻找异常的原因 and/or 该程序的一个版本需要较少的线程手动干预并且可能只使用一个事件循环。当我想 await 这两件事时,我不知道如何实现后者(如在 asyncio.gather 调用中)。

您的错误的直接原因是这一行:

# incorrect
func_loop.call_soon_threadsafe(func_loop.stop())

目的是在 运行 作为 func_loop 事件循环的线程中调用 func_loop.stop()。但正如所写,它在当前线程中调用func_loop.stop() 并将其return值(None)作为函数传递给call_soon_threadsafe调用。这导致 call_soon_threadsafe 抱怨 None 不可调用。要解决眼前的问题,您应该删除多余的括号并调用该方法:

# correct
func_loop.call_soon_threadsafe(func_loop.stop)

但是,代码写的肯定过于复杂了:

  • 当你已经在一个事件循环中时,创建一个新的事件循环没有意义
  • just_wait 不应该是 async def 因为它不等待任何东西,所以它显然不是异步的。
  • sys.exit 采用整数退出状态,而不是字符串。此外,在调用 sys.exit.
  • 之后尝试打印回溯 也没有多大意义。

要 运行 来自 asyncio 的非异步函数,只需将 run_in_executor 与函数一起使用并按原样将非异步函数传递给它。您不需要额外的线程或额外的事件循环,run_in_executor 将处理线程并将其与您当前的事件循环连接,有效地使同步功能可等待。例如(未经测试):

async def do_stuff(func):
    conn = await run_client()
    print("SSH tunnel active")
    loop = asyncio.get_event_loop()
    ret = await loop.run_in_executor(None, func)
    print(f"ret={ret}")
    conn.close()
    await conn.wait_closed()
    print("wait done")

def just_wait():
    # just_wait is a regular function; it can call blocking code,
    # but it cannot await
    print("starting just_wait")
    input()
    print("ending just_wait")
    return 42

asyncio.get_event_loop().run_until_complete(do_stuff(just_wait))

如果你需要等待 just_wait 中的东西,你可以将其设为 async 并使用 run_in_executor 作为其中的实际阻塞代码:

async def do_stuff():
    conn = await run_client()
    print("SSH tunnel active")
    loop = asyncio.get_event_loop()
    ret = await just_wait()
    print(f"ret={ret}")
    conn.close()
    await conn.wait_closed()
    print("wait done")

async def just_wait():
    # just_wait is an async function, it can await, but
    # must invoke blocking code through run_in_executor
    print("starting just_wait")
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, input)
    print("ending just_wait")
    return 42

asyncio.run(do_stuff())