异步代码 运行 同步,似乎没有任何行会阻塞
async code running synchronously, doesn't seem to have any lines that will be blocking
运行 on Windows 10, Python 3.6.3, 运行ning inside PyCharm IDE, 这个代码:
import asyncio
import json
import datetime
import time
from aiohttp import ClientSession
async def get_tags():
url_tags = f"{BASE_URL}tags?access_token={token}"
async with ClientSession() as session:
async with session.get(url_tags) as response:
return await response.read()
async def get_trips(vehicles):
url_trips = f"{BASE_URL}fleet/trips?access_token={token}"
for vehicle in vehicles:
body_trips = {"groupId": groupid, "vehicleId": vehicle['id'], "startMs": int(start_ms), "endMs": int(end_ms)}
async with ClientSession() as session:
async with session.post(url_trips, json=body_trips) as response:
yield response.read()
async def main():
tags = await get_tags()
tag_list = json.loads(tags.decode('utf8'))['tags']
veh = tag_list[0]['vehicles'][0:5]
return [await v async for v in get_trips(veh)]
t1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
t2 = time.time()
print(t2 - t1)
似乎 运行 完全同步,时间随着循环大小的增加而线性增加。按照我读过的一本书 "Using Asyncio in Python 3" 中的示例,代码应该是异步的;我在这里错过了什么吗? C# 中的类似代码在几秒钟内完成大约 2,000 个请求,这里大约需要 14 秒到 运行 20 个请求(6 秒到 运行 10)。
编辑:
重写了一些代码:
async def get_trips(vehicle):
url_trips = f"{BASE_URL}fleet/trips?access_token={token}"
#for vehicle in vehicles:
body_trips = {"groupId": groupid, "vehicleId": vehicle['id'], "startMs": int(start_ms), "endMs": int(end_ms)}
async with ClientSession() as session:
async with session.post(url_trips, json=body_trips) as response:
res = await response.read()
return res
t1 = time.time()
loop = asyncio.new_event_loop()
x = loop.run_until_complete(get_tags())
tag_list = json.loads(x.decode('utf8'))['tags']
veh = tag_list[0]['vehicles'][0:10]
tasks = []
for v in veh:
tasks.append(loop.create_task(get_trips(v)))
loop.run_until_complete(asyncio.wait(tasks))
t2 = time.time()
print(t2 - t1)
这实际上是 运行ning 异步,但现在我不能使用我的 get_trips 函数中的 return 值,而且我没有真正看到一个明确的方法来用它。我看到的几乎所有教程都只是打印结果,这基本上没有用。对于 async 在 Python 中应该如何工作以及为什么一些带有 async 关键字的东西 运行 同步而其他的却不同步,我有点困惑。
简单的问题是:如何将任务的 return 结果添加到列表或字典中?更高级的问题,有人可以解释为什么我在第一个示例中的代码 运行 是同步的,而第二部分中的代码 运行 是异步的吗?
编辑 2:
正在替换:
loop.run_until_complete(asyncio.wait(tasks))
与:
x = loop.run_until_complete(asyncio.gather(*tasks))
修复了简单的问题;现在只是好奇为什么异步列表理解不会 运行 异步
now just curious why the async list comprehension doesn't run asynchronously
因为您的理解迭代了一个异步生成器,该生成器生成一个任务,然后您立即等待该任务,从而破坏了并行性。这大致相当于:
for vehicle in vehicles:
trips = await fetch_trips(vehicle)
# do something with trips
要使其并行,您可以使用 wait
或 gather
,正如您已经发现的那样,但这不是强制性的。一旦你创建了一个任务,它就会运行并行。例如,这也应该有效:
# step 1: create the tasks and store (task, vehicle) pairs in a list
tasks = [(loop.create_task(get_trips(v)), v)
for v in vehicles]
# step 2: await them one by one, while others are running:
for t, v in tasks:
trips = await t
# do something with trips for vehicle v
运行 on Windows 10, Python 3.6.3, 运行ning inside PyCharm IDE, 这个代码:
import asyncio
import json
import datetime
import time
from aiohttp import ClientSession
async def get_tags():
url_tags = f"{BASE_URL}tags?access_token={token}"
async with ClientSession() as session:
async with session.get(url_tags) as response:
return await response.read()
async def get_trips(vehicles):
url_trips = f"{BASE_URL}fleet/trips?access_token={token}"
for vehicle in vehicles:
body_trips = {"groupId": groupid, "vehicleId": vehicle['id'], "startMs": int(start_ms), "endMs": int(end_ms)}
async with ClientSession() as session:
async with session.post(url_trips, json=body_trips) as response:
yield response.read()
async def main():
tags = await get_tags()
tag_list = json.loads(tags.decode('utf8'))['tags']
veh = tag_list[0]['vehicles'][0:5]
return [await v async for v in get_trips(veh)]
t1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
t2 = time.time()
print(t2 - t1)
似乎 运行 完全同步,时间随着循环大小的增加而线性增加。按照我读过的一本书 "Using Asyncio in Python 3" 中的示例,代码应该是异步的;我在这里错过了什么吗? C# 中的类似代码在几秒钟内完成大约 2,000 个请求,这里大约需要 14 秒到 运行 20 个请求(6 秒到 运行 10)。
编辑:
重写了一些代码:
async def get_trips(vehicle):
url_trips = f"{BASE_URL}fleet/trips?access_token={token}"
#for vehicle in vehicles:
body_trips = {"groupId": groupid, "vehicleId": vehicle['id'], "startMs": int(start_ms), "endMs": int(end_ms)}
async with ClientSession() as session:
async with session.post(url_trips, json=body_trips) as response:
res = await response.read()
return res
t1 = time.time()
loop = asyncio.new_event_loop()
x = loop.run_until_complete(get_tags())
tag_list = json.loads(x.decode('utf8'))['tags']
veh = tag_list[0]['vehicles'][0:10]
tasks = []
for v in veh:
tasks.append(loop.create_task(get_trips(v)))
loop.run_until_complete(asyncio.wait(tasks))
t2 = time.time()
print(t2 - t1)
这实际上是 运行ning 异步,但现在我不能使用我的 get_trips 函数中的 return 值,而且我没有真正看到一个明确的方法来用它。我看到的几乎所有教程都只是打印结果,这基本上没有用。对于 async 在 Python 中应该如何工作以及为什么一些带有 async 关键字的东西 运行 同步而其他的却不同步,我有点困惑。
简单的问题是:如何将任务的 return 结果添加到列表或字典中?更高级的问题,有人可以解释为什么我在第一个示例中的代码 运行 是同步的,而第二部分中的代码 运行 是异步的吗?
编辑 2:
正在替换:
loop.run_until_complete(asyncio.wait(tasks))
与:
x = loop.run_until_complete(asyncio.gather(*tasks))
修复了简单的问题;现在只是好奇为什么异步列表理解不会 运行 异步
now just curious why the async list comprehension doesn't run asynchronously
因为您的理解迭代了一个异步生成器,该生成器生成一个任务,然后您立即等待该任务,从而破坏了并行性。这大致相当于:
for vehicle in vehicles:
trips = await fetch_trips(vehicle)
# do something with trips
要使其并行,您可以使用 wait
或 gather
,正如您已经发现的那样,但这不是强制性的。一旦你创建了一个任务,它就会运行并行。例如,这也应该有效:
# step 1: create the tasks and store (task, vehicle) pairs in a list
tasks = [(loop.create_task(get_trips(v)), v)
for v in vehicles]
# step 2: await them one by one, while others are running:
for t, v in tasks:
trips = await t
# do something with trips for vehicle v