Python3 asyncio:处理字典中的任务并将结果存储在字典中

Python3 asyncio: Process tasks from dict and store result in dict

我正在尝试学习 asyncio。 我有一个应该被拉出的传感器列表。每个传感器拉动大约需要 1 秒。所以 asyncio 是执行此操作的正确任务。传感器可能会动态变化,因此 asyncio 应该会发生变化。

我已经有了以下代码,但现在我不知道如何将获取的值存储到生成的字典中。由于消费者将值存储到字典中,因此这个非常快的任务也可以在生产者中完成,因此根本不需要消费者,但我想对于 asyncio paradgim 来说必须有一个消费者。

也许我也觉得太复杂了,这里可以使用代码更少的更简单的编程范式吗? 另请参阅代码中的评论以获取更详细的问题和建议。

#!/usr/bin/python3

import asyncio
import random

sensors={ #This list changes often
"sensor1" : "http://abc.example.org",
"sensor2" : "http://outside.example.org",
"temperature" : "http://xe.example.com",
"outdoor" : "http://anywhere.example.org"
}

results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    q = asyncio.Queue()
    queries = [asyncio.create_task(querySensor(sensorname,q)) for sensorname in sensors]
    process = [asyncio.create_task(storeValues(sensorname, q)) for sensorname in sensors] #Since value is only stored to dict, one consumer should be sufficient, or no consumer at all, since producer could also store variable into dict
    await asyncio.gather(*queries)
    await q.join()
    for c in process:
        c.cancel()

async def querySensor(sensorname: str, q: asyncio.Queue):
    res = str(random.randint(0,100))
    resString = "Result for " + sensorname + " is " + res
    await q.put(resString)

async def storeValues(sensorname: str, q: asyncio.Queue):
    while True:
        res = await q.get()
        print("Value: ", res)
        q.task_done()

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    for result in results: #Now results should be in results
        print(result, "measured:", results[result])

解决方案

感谢您的回答。结果代码现在是:

#!/usr/bin/python3

import asyncio
import random

sensors={ #This list changes often
"sensor1" : "http://abc.example.org",
"sensor2" : "http://outside.example.org",
"temperature" : "http://xe.example.com",
"outdoor" : "http://anywhere.example.org"
}

results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    queries = [asyncio.create_task(querySensor(sensorname)) for sensorname in sensors]
    await asyncio.gather(*queries)

async def querySensor(sensorname: str):
    res = str(random.randint(0,100))
    resString = "Result for " + sensorname + " is " + res
    results[sensorname] = resString

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    for result in results: #Now results should be in results
        print(result, "measured:", results[result])

获取实际值背后有一些更复杂的代码。所以这只是一个例子。我想隐藏额外的代码层。

这两个答案对我来说都很有价值。为了支持新用户的声誉,我接受 Sreejith 的回答以将此问题标记为已解决。

此代码的主要问题是您从未实际存储 传感器值给你的结果字典。如果 stooreValues 中的代码包含 results.setdefault(sensorname, []).append(res) 行,您就会看到结果。 (字典 .setdefault 方法是一种实用程序,如果不存在则在字典中创建一个值,并且 return 该值或现有值:因此我们在第一次调用每个传感器时创建一个空列表,并且继续追加它)。

但是,正如您所指出的,在此代码中没有必要使用 producer/consumer 分隔模式(任何将使用“结果”字典的代码实际上都是消费者)

...
from aiohttp_requests import requests


results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    queries = [asyncio.create_task(querySensor(sensorname)) for sensorname in sensors]
    await asyncio.gather(*queries)


async def querySensor(sensorname: str):
    res = str(random.randint(0,100))
    # important, when writting the actual call to read the sensor, to use
    # an async expresion and await for it. 
    response = await requests.get(sensors[sensorname], ...)
    text = await response.text()
    resString = f"Result for {sensorname} is {text}"
    results.setdefault(sensorname, []).append(resString)

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    for result in results: #Now results should be in results
        print(result, "measured:", results[result])

    

这个例子使用的是https://pypi.org/project/aiohttp-requests/

尚不清楚您的确切目标是什么。如果您只想更新字典,代码可能会简单得多。如果您还有其他期待,请告诉我。

sensors={ #This list changes often
"sensor1" : "http://abc.example.org",
"sensor2" : "http://outside.example.org",
"temperature" : "http://xe.example.com",
"outdoor" : "http://anywhere.example.org"
}

results = dict() #Result from sensor query should go here

async def queryAll(sensors):
    queries = [asyncio.create_task(querySensor(sensorname, results)) for sensorname in sensors]
    await asyncio.gather(*queries)

async def querySensor(sensorname: str, q: dict):
    res = str(random.randint(0, 100))
    resString = "Result for " + sensorname + " is " + res
    q[sensorname] = resString

if __name__ == "__main__":
    asyncio.run(queryAll(sensors))
    print(results)