为什么 asyncio 的 run_in_executor 在发出 HTTP 请求时并行度如此之低?
Why asyncio's run_in_executor gives so little parallelization when making HTTP requests?
我编写了一个基准实用程序来批量查询 REST 端点。它通过三种方式实现:
- 依次使用请求库,
- 同时使用请求库,但用
loop.run_in_executor()
、 包装每个请求
- 同时,使用 aiohttp 库。
以下是不同批量大小的结果:
batch_size=16
concur_times seq_times concur_aiohttp_times
count 50.000000 50.000000 50.000000
mean 0.123786 0.235883 0.087843
std 0.009733 0.018039 0.029977
min 0.108682 0.210515 0.071560
25% 0.118666 0.222436 0.075565
50% 0.121978 0.231876 0.080050
75% 0.125740 0.242939 0.086345
max 0.169194 0.283809 0.267874
batch_size=4
concur_times seq_times concur_aiohttp_times
count 50.000000 50.000000 50.000000
mean 0.080764 0.091276 0.052807
std 0.008342 0.016509 0.033814
min 0.069541 0.078517 0.041993
25% 0.076142 0.082242 0.044563
50% 0.079046 0.085540 0.045735
75% 0.081645 0.092659 0.049428
max 0.111622 0.170785 0.281397
如结果所示,aiohttp 例程的并行性始终更高。更重要的是,对于小批量 (4),使用 loop.run_in_executor
("concur_times" 列)的第二种方法仅比顺序方法实现了 1/9 的加速。
这是为什么?我的代码有问题吗?我把它包括在下面。
我已经尝试将网络 IO 换成 sleep
和 asyncio.sleep
,这产生了方法 2 和 3 同样快而方法 1 慢 batch_size 倍的预期结果.
代码:
import asyncio
import requests
from cytoolz.curried import *
import pandas as pd
from timeit import default_timer as now
url = 'https://jsonplaceholder.typicode.com/todos/'
def dl_todo_with_requests(session, n):
response = session.get(url + str(n))
assert(response.status_code == 200)
text = response.text
return text
dl_todo_with_requests = curry(dl_todo_with_requests)
def seq_dl(todos_to_get):
with requests.Session() as session:
todos = pipe(
todos_to_get,
map( dl_todo_with_requests(session) ),
list )
return todos
get_todos_from_futures = lambda futures: \
pipe( futures,
map( lambda fut: fut.result() ),
list
)
async def concur_dl(todos_to_get):
loop = asyncio.get_running_loop()
with requests.Session() as session:
completed_futures, _pending = await \
pipe(
todos_to_get,
map( lambda n:
loop.run_in_executor(
None,
lambda: dl_todo_with_requests(session, n)
)),
list,
asyncio.wait
);
todos = get_todos_from_futures(completed_futures)
return todos
import aiohttp
async def concur_dl_aiohttp(todos_to_get):
async def dl(session, todo):
async with session.get(url + str(todo)) as resp:
assert(resp.status == 200)
return resp.text()
dl = curry(dl)
async with aiohttp.ClientSession() as session:
loop = asyncio.get_running_loop()
unexecuted = pipe(
todos_to_get,
map( dl(session) ),
list )
completed_futures, _pending = await asyncio.wait(unexecuted)
todos = get_todos_from_futures(completed_futures)
return todos
def check_todos_received(todos):
assert(len(todos) == len(todos_to_get))
todo_has_content = lambda todo: len(todo) > len('{}')
assert(all(map(todo_has_content, todos)))
return True
def measure_it(f):
start = now();
f()
elapsed = now() - start
return elapsed
inspect = lambda f, it: map(do(f), it)
inspect = curry(inspect)
def bench(n_iters=50,batch_size=4):
todos_to_get = range(1,batch_size+1)
seq_dl(todos_to_get)
# heat caches, if any
measure_seq = lambda: pipe(
seq_dl(todos_to_get),
inspect(check_todos_received) )
measure_concur = lambda: pipe(
asyncio.run(concur_dl(todos_to_get)),
inspect(check_todos_received) )
measure_concur_aiohttp = lambda: pipe(
asyncio.run(concur_dl_aiohttp(todos_to_get)),
inspect(check_todos_received) )
do_the_bench = lambda dl_f, title: \
pipe( range(n_iters),
inspect(
lambda n: \
print("doing %s/%s %s batch download" \
% (n+1,n_iters,title))),
map(lambda _: measure_it(dl_f)),
list )
concur_times = do_the_bench(measure_concur,'concurrent')
concur_aiohttp_times = do_the_bench(measure_concur_aiohttp,'concurrent_aiohttp')
seq_times = do_the_bench(measure_seq,'sequential')
return dict(
concur_times=concur_times,
seq_times=seq_times,
concur_aiohttp_times=concur_aiohttp_times)
基准是 运行 这样的:bench(n_iters=50,batch_size=4)
。然后通过 lambda output: pandas.DataFrame(output).describe()
传递输出以生成表格。
asyncio run_in_executor
的默认执行器是 ThreadPoolExecutor, which uses Python threads. So it is also affected by the GIL, as described in this 线程。
在您的情况下,一次只有一个异步作业线程运行,因此 aiohttp 显示出更好的性能。
我编写了一个基准实用程序来批量查询 REST 端点。它通过三种方式实现:
- 依次使用请求库,
- 同时使用请求库,但用
loop.run_in_executor()
、 包装每个请求
- 同时,使用 aiohttp 库。
以下是不同批量大小的结果:
batch_size=16
concur_times seq_times concur_aiohttp_times
count 50.000000 50.000000 50.000000
mean 0.123786 0.235883 0.087843
std 0.009733 0.018039 0.029977
min 0.108682 0.210515 0.071560
25% 0.118666 0.222436 0.075565
50% 0.121978 0.231876 0.080050
75% 0.125740 0.242939 0.086345
max 0.169194 0.283809 0.267874
batch_size=4
concur_times seq_times concur_aiohttp_times
count 50.000000 50.000000 50.000000
mean 0.080764 0.091276 0.052807
std 0.008342 0.016509 0.033814
min 0.069541 0.078517 0.041993
25% 0.076142 0.082242 0.044563
50% 0.079046 0.085540 0.045735
75% 0.081645 0.092659 0.049428
max 0.111622 0.170785 0.281397
如结果所示,aiohttp 例程的并行性始终更高。更重要的是,对于小批量 (4),使用 loop.run_in_executor
("concur_times" 列)的第二种方法仅比顺序方法实现了 1/9 的加速。
这是为什么?我的代码有问题吗?我把它包括在下面。
我已经尝试将网络 IO 换成 sleep
和 asyncio.sleep
,这产生了方法 2 和 3 同样快而方法 1 慢 batch_size 倍的预期结果.
代码:
import asyncio
import requests
from cytoolz.curried import *
import pandas as pd
from timeit import default_timer as now
url = 'https://jsonplaceholder.typicode.com/todos/'
def dl_todo_with_requests(session, n):
response = session.get(url + str(n))
assert(response.status_code == 200)
text = response.text
return text
dl_todo_with_requests = curry(dl_todo_with_requests)
def seq_dl(todos_to_get):
with requests.Session() as session:
todos = pipe(
todos_to_get,
map( dl_todo_with_requests(session) ),
list )
return todos
get_todos_from_futures = lambda futures: \
pipe( futures,
map( lambda fut: fut.result() ),
list
)
async def concur_dl(todos_to_get):
loop = asyncio.get_running_loop()
with requests.Session() as session:
completed_futures, _pending = await \
pipe(
todos_to_get,
map( lambda n:
loop.run_in_executor(
None,
lambda: dl_todo_with_requests(session, n)
)),
list,
asyncio.wait
);
todos = get_todos_from_futures(completed_futures)
return todos
import aiohttp
async def concur_dl_aiohttp(todos_to_get):
async def dl(session, todo):
async with session.get(url + str(todo)) as resp:
assert(resp.status == 200)
return resp.text()
dl = curry(dl)
async with aiohttp.ClientSession() as session:
loop = asyncio.get_running_loop()
unexecuted = pipe(
todos_to_get,
map( dl(session) ),
list )
completed_futures, _pending = await asyncio.wait(unexecuted)
todos = get_todos_from_futures(completed_futures)
return todos
def check_todos_received(todos):
assert(len(todos) == len(todos_to_get))
todo_has_content = lambda todo: len(todo) > len('{}')
assert(all(map(todo_has_content, todos)))
return True
def measure_it(f):
start = now();
f()
elapsed = now() - start
return elapsed
inspect = lambda f, it: map(do(f), it)
inspect = curry(inspect)
def bench(n_iters=50,batch_size=4):
todos_to_get = range(1,batch_size+1)
seq_dl(todos_to_get)
# heat caches, if any
measure_seq = lambda: pipe(
seq_dl(todos_to_get),
inspect(check_todos_received) )
measure_concur = lambda: pipe(
asyncio.run(concur_dl(todos_to_get)),
inspect(check_todos_received) )
measure_concur_aiohttp = lambda: pipe(
asyncio.run(concur_dl_aiohttp(todos_to_get)),
inspect(check_todos_received) )
do_the_bench = lambda dl_f, title: \
pipe( range(n_iters),
inspect(
lambda n: \
print("doing %s/%s %s batch download" \
% (n+1,n_iters,title))),
map(lambda _: measure_it(dl_f)),
list )
concur_times = do_the_bench(measure_concur,'concurrent')
concur_aiohttp_times = do_the_bench(measure_concur_aiohttp,'concurrent_aiohttp')
seq_times = do_the_bench(measure_seq,'sequential')
return dict(
concur_times=concur_times,
seq_times=seq_times,
concur_aiohttp_times=concur_aiohttp_times)
基准是 运行 这样的:bench(n_iters=50,batch_size=4)
。然后通过 lambda output: pandas.DataFrame(output).describe()
传递输出以生成表格。
asyncio run_in_executor
的默认执行器是 ThreadPoolExecutor, which uses Python threads. So it is also affected by the GIL, as described in this 线程。
在您的情况下,一次只有一个异步作业线程运行,因此 aiohttp 显示出更好的性能。