python 和 asyncua 中的 Asyncio 队列

Asyncio queues in python and asyncua

我正在使用 asyncio 和 asyncua libraries in python 3.8.7 and I want to use asyncio.Queue 将数据从异步函数传递到主线程。

但是,我遇到的问题是我的队列在第一个 await 之后没有收到任何消息。

这是我的例子:

import threading
import time
import asyncio
from asyncua import Client
import aiohttp

async def main(queue1):
    queue1.put_nowait("main")
    client = Client(url='opc.tcp://localhost:4840/freeopcua/server/')
    #client = aiohttp.ClientSession()

    async with client:
        await queue1.put("in_async")
        queue1.put_nowait("in_async_2")

    queue1.put_nowait("after await")


def wrapper(queue1: asyncio.Queue):
    queue1.put_nowait("wrapper")
    asyncio.run(main(queue1))

if __name__ == '__main__':
    queue1 = asyncio.Queue()
    t = threading.Thread(target=wrapper, args=(queue1,))
    t.start()
    time.sleep(1)
    noEx = True
    while noEx:
        try:
            x = queue1.get_nowait()
            print(x)
        except asyncio.QueueEmpty:
            noEx = False

我得到:

wrapper
main

如果我使用其他一些异步库(示例中的 aiohttp),那么一切都会按预期工作:

wrapper
main
in_async
in_async_2
after await

我已验证 opc.tcp://localhost:4840/freeopcua/server/ 服务器上的 minimal asyncua server 可以正常工作 - 我可以使用此示例中的代码获取数据,但队列似乎不起作用。有什么我想念的吗?

这是我的猜测,但我认为你有竞争条件。这是主线程中的 while 循环:

while noEx:
    try:
        x = queue1.get_nowait()
        print(x)
    except asyncio.QueueEmpty:
        noEx = False

一旦打印出某些内容,循环就会尝试获取队列中的下一个内容。如果队列在那一刻为空,while 循环将退出,并且不会再打印任何内容。

如果在您的辅助线程中建立连接的速度足够快,队列将填充接下来的两条消息;但是如果那里有一点延迟,则您的主 while 循环可能在消息有机会打印之前已经退出。我很确定在进入 async with: 块之前可能存在(未知的)网络延迟。

我会尝试在 print(x) 语句之后插入一个显着的时间延迟,看看会发生什么(如果这能解决问题,您可以稍后改进代码)。或者更改 while 循环,使其永远运行,因为您始终可以使用 control-C 退出程序。