如何在循环中使用 asyncio 结构安排多个任务
How can I schedule multiple tasks using asyncio constructs in a loop
我的用例是 运行 一些性能测试,所以我想创建一个应用程序,我 运行 1 个任务 4 次,计算该任务的平均时间,然后 运行 2 个异步任务,计算平均值,然后 运行 4 个异步任务,计算平均值,然后 8 等等。
但是,我无法运行这样。当我这样做时,似乎所有任务之前都已执行,但我得到了错误的时间。
我尝试了一些命中和试验,现在使用下面的代码,我在 run_tasks
函数的 loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
行得到了 TypeError: An asyncio.Future, a coroutine or an awaitable is required
sys:1: RuntimeWarning: coroutine 'go' was never awaited
。
下面是我的代码:
async def go(date):
pool = await aiopg.create_pool("**db connection**")
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(""" some query """)
time.sleep(1)
ret = []
async for row in cur:
ret.append(row)
def date_range(date1, date2):
for n in range(int((date2 - date1).days)+1):
yield date1 + timedelta(n)
def run_tasks():
start_dt = datetime(2017, 8, 9)
end_dt = datetime(2017, 8, 10)
tasks = []
some_tasks = []
avg_time_run = []
for dt in date_range(start_dt, end_dt):
#tasks.append(asyncio.ensure_future(go(dt.strftime("%Y-%m-%d %H:%M:%S"))))
tasks.append(go(dt.strftime("%Y-%m-%d %H:%M:%S")))
i = 1
prev = 0
while i < 2: # i < 128
# Get i number of tasks from task list
for k in range(prev, i):
some_tasks.append(tasks[k])
prev = len(some_tasks)
time_run = []
for j in range(0, 4): # repeat task 4 times
start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
# loop.close()
end = time.time()
diff = end - start
time_run.append(diff)
print("ith SomeTask: {}, {}".format(i, some_tasks))
print("Total time: {}".format(diff))
# get average of each task run 4 times
avg_time_run.append(sum(time_run) / float(len(time_run)))
i *= 2
return avg_time_run
print(run_tasks())
一些提示将不胜感激。我应该把 await 放在哪里,因为它 asyncio.wait
asyncio.ensure_future(some_tasks)
您正在将协程列表传递给 asyncio.ensure_future
。正如您在 documentation this is not how this function works: you should pass single coroutine to create asyncio.Task 中看到的那样。这就是为什么你会得到 TypeError
,你会得到 RuntimeWarning
,然后自从创建 go
协程后就没有等待上述所有结果。
在这种情况下您根本不需要 asyncio.Task
,只需将协程列表传递给 asyncio.wait
:
loop.run_until_complete(asyncio.wait(some_tasks))
还有一件重要的事情:
time.sleep(1)
你永远不应该在协同程序中这样做:它会冻结你的事件循环(以及它随处可见的所有协同程序)。请阅读 了解 asyncio 的一般工作原理。
如果你想在协程中休眠一段时间,请使用 asyncio.sleep:
await asyncio.sleep(1)
答案代码:
async def run(date): // for adopt, check above go() function
conn = await asyncpg.connect("db connections")
values = await conn.fetch("""some query """)
await asyncio.sleep(1)
await conn.close()
def date_range(date1, date2):
for n in range(int((date2 - date1).days)+1):
yield date1 + timedelta(n)
def run_tasks():
start_dt = datetime(2017, 8, 9)
end_dt = datetime(2017, 8, 10)
tasks = []
avg_time_run = []
i = 1
while i < 9: # num of tasks incremented
time_run = []
start = time.time()
loop = asyncio.get_event_loop()
for dt in date_range(start_dt, end_dt):
if len(tasks) < i:
print(dt)
tasks.append(asyncio.ensure_future(run(dt.strftime("%Y-%m-%d %H:%M:%S"))))
if len(tasks) == i:
for j in range(0, 4): # repeat task 4 times
print("J counter: {}".format(j))
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
diff = end - start
time_run.append(diff)
print("Num of Tasks executing: {}, {}".format(i, tasks))
print("Task len: {}".format(len(tasks)))
print("Total time: {}".format(diff))
# get average of each task run 4 times
avg_time_run.append(sum(time_run) / float(len(time_run)))
start_dt = end_dt + timedelta(days=1)
end_dt = end_dt + timedelta(days=(i * 2 - i))
i *= 2
print(start_dt)
print(end_dt)
#loop.close()
return avg_time_run
print(run_tasks())
我的用例是 运行 一些性能测试,所以我想创建一个应用程序,我 运行 1 个任务 4 次,计算该任务的平均时间,然后 运行 2 个异步任务,计算平均值,然后 运行 4 个异步任务,计算平均值,然后 8 等等。
但是,我无法运行这样。当我这样做时,似乎所有任务之前都已执行,但我得到了错误的时间。
我尝试了一些命中和试验,现在使用下面的代码,我在 run_tasks
函数的 loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
行得到了 TypeError: An asyncio.Future, a coroutine or an awaitable is required
sys:1: RuntimeWarning: coroutine 'go' was never awaited
。
下面是我的代码:
async def go(date):
pool = await aiopg.create_pool("**db connection**")
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(""" some query """)
time.sleep(1)
ret = []
async for row in cur:
ret.append(row)
def date_range(date1, date2):
for n in range(int((date2 - date1).days)+1):
yield date1 + timedelta(n)
def run_tasks():
start_dt = datetime(2017, 8, 9)
end_dt = datetime(2017, 8, 10)
tasks = []
some_tasks = []
avg_time_run = []
for dt in date_range(start_dt, end_dt):
#tasks.append(asyncio.ensure_future(go(dt.strftime("%Y-%m-%d %H:%M:%S"))))
tasks.append(go(dt.strftime("%Y-%m-%d %H:%M:%S")))
i = 1
prev = 0
while i < 2: # i < 128
# Get i number of tasks from task list
for k in range(prev, i):
some_tasks.append(tasks[k])
prev = len(some_tasks)
time_run = []
for j in range(0, 4): # repeat task 4 times
start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
# loop.close()
end = time.time()
diff = end - start
time_run.append(diff)
print("ith SomeTask: {}, {}".format(i, some_tasks))
print("Total time: {}".format(diff))
# get average of each task run 4 times
avg_time_run.append(sum(time_run) / float(len(time_run)))
i *= 2
return avg_time_run
print(run_tasks())
一些提示将不胜感激。我应该把 await 放在哪里,因为它 asyncio.wait
asyncio.ensure_future(some_tasks)
您正在将协程列表传递给 asyncio.ensure_future
。正如您在 documentation this is not how this function works: you should pass single coroutine to create asyncio.Task 中看到的那样。这就是为什么你会得到 TypeError
,你会得到 RuntimeWarning
,然后自从创建 go
协程后就没有等待上述所有结果。
在这种情况下您根本不需要 asyncio.Task
,只需将协程列表传递给 asyncio.wait
:
loop.run_until_complete(asyncio.wait(some_tasks))
还有一件重要的事情:
time.sleep(1)
你永远不应该在协同程序中这样做:它会冻结你的事件循环(以及它随处可见的所有协同程序)。请阅读
如果你想在协程中休眠一段时间,请使用 asyncio.sleep:
await asyncio.sleep(1)
答案代码:
async def run(date): // for adopt, check above go() function
conn = await asyncpg.connect("db connections")
values = await conn.fetch("""some query """)
await asyncio.sleep(1)
await conn.close()
def date_range(date1, date2):
for n in range(int((date2 - date1).days)+1):
yield date1 + timedelta(n)
def run_tasks():
start_dt = datetime(2017, 8, 9)
end_dt = datetime(2017, 8, 10)
tasks = []
avg_time_run = []
i = 1
while i < 9: # num of tasks incremented
time_run = []
start = time.time()
loop = asyncio.get_event_loop()
for dt in date_range(start_dt, end_dt):
if len(tasks) < i:
print(dt)
tasks.append(asyncio.ensure_future(run(dt.strftime("%Y-%m-%d %H:%M:%S"))))
if len(tasks) == i:
for j in range(0, 4): # repeat task 4 times
print("J counter: {}".format(j))
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
diff = end - start
time_run.append(diff)
print("Num of Tasks executing: {}, {}".format(i, tasks))
print("Task len: {}".format(len(tasks)))
print("Total time: {}".format(diff))
# get average of each task run 4 times
avg_time_run.append(sum(time_run) / float(len(time_run)))
start_dt = end_dt + timedelta(days=1)
end_dt = end_dt + timedelta(days=(i * 2 - i))
i *= 2
print(start_dt)
print(end_dt)
#loop.close()
return avg_time_run
print(run_tasks())