asyncio/aiohttp - 如何进行一系列异步但依赖的请求?
asyncio/aiohttp - How to make series of async - but dependent - requests?
我有一系列异步请求对,其中一对由请求 A 和请求 B 组成。此外,请求 B 依赖于请求 A。换句话说,我需要将数据从响应 A 传递到请求B. 因此,我需要安排任务,以便每个任务发送请求 A,然后仅在响应 A returned 后发送请求 B。
from aiohttp import ClientSession
from typing import *
import asyncio
async def request_A(url: str, session: ClientSession) -> dict:
async with session.request('get', url) as response:
return await response.json()
async def request_B(url: str, data: dict, session: ClientSession) -> dict:
async with session.request('post', url, json=data) as response:
return await response.json()
async def request_chain(url_A: str, url_B: str, session: ClientSession) -> dict:
response_A_data = await request_A(url_A, session)
response_B_data = await request_B(url_B, response_A_data, session)
return response_B_data
async def schedule(url_chains: List[Tuple[str, str]]) -> list:
tasks = []
async with ClientSession() as session:
for url_chain in url_chains:
url_A, url_B = url_chain
task = asyncio.create_task(request_chain(url_A, url_B, session))
tasks.append(task)
return await asyncio.gather(*tasks)
def run_tasks(url_chains: List[Tuple[str, str]]) -> list:
return asyncio.run(schedule(url_chains))
现在,我的问题是:对于由一对请求组成的每个任务,在发送请求 B 之前是否保证请求 A return?请解释。我担心在任务中,当请求 A 正在等待时,请求 B 可能会执行。
如果不是,我如何才能保持任务异步和非阻塞,同时确保在任务中,请求 A 阻止请求 B 的执行,直到响应 A returned?
我知道我可以 运行 批量调用所有请求 A,然后 运行 批量调用所有请求 B,但出于我的用例的特定原因,我需要 运行 一批所有 (Request A, Request B) 对。
Per each task consisting of a pair of requests, is Request A
guaranteed to return before Request B is sent?
是的,async/await 模式的优点是你不必问自己这个问题,连续的代码行将始终按顺序执行(但不一定连续).这里你的函数 request_chain
保证 request_A
总是在 request_B
.
之前执行
while Request A is being awaited, Request B may execute
那不会发生,这基本上就是 await
的意思:坚持直到请求 A 返回,然后再继续。也就是说,await
对执行顺序没有影响。它只是 将控制权交给 ,因此 其他人 可以使用隐藏时间(在您的情况下,来自另一个 (A , B)请求对)。这就是为什么连续的代码行不一定连续执行,将控制权交给其他coroutine(我们刚才提到的其他人 ) 使用 await
允许此协程在 A 和 B 之间执行代码。
即使这有点不准确,您也可以记住:唯一将并行执行的代码是您自己安排的代码(在本例中使用 asyncio.gather
,安排多个 (A, B) 对 并行执行)。
I understand that I can run all Request A calls in a batch, then run all Request B calls in a batch, but for reasons specific to my use case, I need to run a batch of all ...
在这种特殊情况下,即使你可以 运行 一批 A 然后一批 B的,我认为您的解决方案会更好,因为它以更简单的方式突出了 A 和 B.
之间的关系
这是一个代码示例,您可以 运行 尝试一下(它与您在此处使用 public 数学 API 所做的相同),它只是计算"x*2+2"分两步,先是“*2”(相当于请求A),然后是“+2”(相当于请求B):
MATH_API_URL = "http://api.mathjs.org/v4"
from aiohttp import ClientSession
import asyncio
async def maths(session, url, expression):
params = {"expr" : expression}
print(f"\t> computing {expression}")
async with session.get(url, params=params) as response:
result = await response.text()
print(f"\t< {expression} = {result}")
return result
async def twice(session, x):
return await maths(session, MATH_API_URL, f"2 * {x}")
async def plus_two(session, x):
return await maths(session, MATH_API_URL, f"2 + {x}")
async def twice_plus_two(session, x):
twice_x = await twice(session, x)
return await plus_two(session, twice_x)
async def main(inputs):
async with ClientSession() as session:
return await asyncio.gather(*(twice_plus_two(session, x) for x in inputs))
inputs = list(range(3))
print([x*2+2 for x in inputs])
print(asyncio.run(main(inputs)))
此代码输出安排请求的顺序:
[2, 4, 6]
> computing 2 * 0
> computing 2 * 1
> computing 2 * 2
< 2 * 1 = 2
> computing 2 + 2
< 2 * 0 = 0
> computing 2 + 0
< 2 * 2 = 4
> computing 2 + 4
< 2 + 2 = 4
< 2 + 4 = 6
< 2 + 0 = 2
['2', '4', '6']
查看“*2”返回后如何安排“+2”。
我有一系列异步请求对,其中一对由请求 A 和请求 B 组成。此外,请求 B 依赖于请求 A。换句话说,我需要将数据从响应 A 传递到请求B. 因此,我需要安排任务,以便每个任务发送请求 A,然后仅在响应 A returned 后发送请求 B。
from aiohttp import ClientSession
from typing import *
import asyncio
async def request_A(url: str, session: ClientSession) -> dict:
async with session.request('get', url) as response:
return await response.json()
async def request_B(url: str, data: dict, session: ClientSession) -> dict:
async with session.request('post', url, json=data) as response:
return await response.json()
async def request_chain(url_A: str, url_B: str, session: ClientSession) -> dict:
response_A_data = await request_A(url_A, session)
response_B_data = await request_B(url_B, response_A_data, session)
return response_B_data
async def schedule(url_chains: List[Tuple[str, str]]) -> list:
tasks = []
async with ClientSession() as session:
for url_chain in url_chains:
url_A, url_B = url_chain
task = asyncio.create_task(request_chain(url_A, url_B, session))
tasks.append(task)
return await asyncio.gather(*tasks)
def run_tasks(url_chains: List[Tuple[str, str]]) -> list:
return asyncio.run(schedule(url_chains))
现在,我的问题是:对于由一对请求组成的每个任务,在发送请求 B 之前是否保证请求 A return?请解释。我担心在任务中,当请求 A 正在等待时,请求 B 可能会执行。
如果不是,我如何才能保持任务异步和非阻塞,同时确保在任务中,请求 A 阻止请求 B 的执行,直到响应 A returned?
我知道我可以 运行 批量调用所有请求 A,然后 运行 批量调用所有请求 B,但出于我的用例的特定原因,我需要 运行 一批所有 (Request A, Request B) 对。
Per each task consisting of a pair of requests, is Request A guaranteed to return before Request B is sent?
是的,async/await 模式的优点是你不必问自己这个问题,连续的代码行将始终按顺序执行(但不一定连续).这里你的函数 request_chain
保证 request_A
总是在 request_B
.
while Request A is being awaited, Request B may execute
那不会发生,这基本上就是 await
的意思:坚持直到请求 A 返回,然后再继续。也就是说,await
对执行顺序没有影响。它只是 将控制权交给 ,因此 其他人 可以使用隐藏时间(在您的情况下,来自另一个 (A , B)请求对)。这就是为什么连续的代码行不一定连续执行,将控制权交给其他coroutine(我们刚才提到的其他人 ) 使用 await
允许此协程在 A 和 B 之间执行代码。
即使这有点不准确,您也可以记住:唯一将并行执行的代码是您自己安排的代码(在本例中使用 asyncio.gather
,安排多个 (A, B) 对 并行执行)。
I understand that I can run all Request A calls in a batch, then run all Request B calls in a batch, but for reasons specific to my use case, I need to run a batch of all ...
在这种特殊情况下,即使你可以 运行 一批 A 然后一批 B的,我认为您的解决方案会更好,因为它以更简单的方式突出了 A 和 B.
之间的关系这是一个代码示例,您可以 运行 尝试一下(它与您在此处使用 public 数学 API 所做的相同),它只是计算"x*2+2"分两步,先是“*2”(相当于请求A),然后是“+2”(相当于请求B):
MATH_API_URL = "http://api.mathjs.org/v4"
from aiohttp import ClientSession
import asyncio
async def maths(session, url, expression):
params = {"expr" : expression}
print(f"\t> computing {expression}")
async with session.get(url, params=params) as response:
result = await response.text()
print(f"\t< {expression} = {result}")
return result
async def twice(session, x):
return await maths(session, MATH_API_URL, f"2 * {x}")
async def plus_two(session, x):
return await maths(session, MATH_API_URL, f"2 + {x}")
async def twice_plus_two(session, x):
twice_x = await twice(session, x)
return await plus_two(session, twice_x)
async def main(inputs):
async with ClientSession() as session:
return await asyncio.gather(*(twice_plus_two(session, x) for x in inputs))
inputs = list(range(3))
print([x*2+2 for x in inputs])
print(asyncio.run(main(inputs)))
此代码输出安排请求的顺序:
[2, 4, 6]
> computing 2 * 0
> computing 2 * 1
> computing 2 * 2
< 2 * 1 = 2
> computing 2 + 2
< 2 * 0 = 0
> computing 2 + 0
< 2 * 2 = 4
> computing 2 + 4
< 2 + 2 = 4
< 2 + 4 = 6
< 2 + 0 = 2
['2', '4', '6']
查看“*2”返回后如何安排“+2”。