How can I schedule multiple tasks using asyncio constructs in a loop?

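My use case is running some performance tests, so I want to create an app that runs 1 task 4 times and computes the average time for that task, then runs 2 tasks asynchronously and computes the average, then 4 asynchronous tasks, then 8, and so on.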

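However, I am not able to get it to run like this. When I try, it seems that all the tasks have already been executed earlier, and I get wrong timings.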

After some trial and error I ended up with the code below. It now fails in the run_tasks function, on the line loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks))), with:

TypeError: An asyncio.Future, a coroutine or an awaitable is required
sys:1: RuntimeWarning: coroutine 'go' was never awaited

Below is my code:

import asyncio
import time
from datetime import datetime, timedelta

import aiopg


async def go(date):
    pool = await aiopg.create_pool("**db connection**")
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:

            await cur.execute(""" some query """)
            time.sleep(1)

            ret = []
            async for row in cur:
                ret.append(row)


def date_range(date1, date2):
    for n in range(int((date2 - date1).days)+1):
        yield date1 + timedelta(n)


def run_tasks():

    start_dt = datetime(2017, 8, 9)
    end_dt = datetime(2017, 8, 10)

    tasks = []
    some_tasks = []

    avg_time_run = []

    for dt in date_range(start_dt, end_dt):
        #tasks.append(asyncio.ensure_future(go(dt.strftime("%Y-%m-%d %H:%M:%S"))))
        tasks.append(go(dt.strftime("%Y-%m-%d %H:%M:%S")))

    i = 1
    prev = 0
    while i < 2: # i < 128

        # Get i number of tasks from task list
        for k in range(prev, i):
            some_tasks.append(tasks[k])

        prev = len(some_tasks)
        time_run = []
        for j in range(0, 4):  # repeat task 4 times
            start = time.time()
            loop = asyncio.get_event_loop()

            loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
            # loop.close()

            end = time.time()
            diff = end - start
            time_run.append(diff)
            print("ith SomeTask: {}, {}".format(i, some_tasks))
            print("Total time: {}".format(diff))

        # get average of each task run 4 times
        avg_time_run.append(sum(time_run) / float(len(time_run)))
        i *= 2

    return avg_time_run


print(run_tasks())    

Some hints would be appreciated. Where should I put the await, given that I am calling asyncio.wait?

asyncio.ensure_future(some_tasks)

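You are passing a list of coroutines to asyncio.ensure_future. As you can see in the documentation, this is not how this function works: you should pass a single coroutine to create an asyncio.Task. That is why you get the TypeError. The RuntimeWarning follows from it: the go coroutines were created, but because of the error they were never awaited.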
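To make the difference concrete, here is a minimal sketch. The work coroutine is a hypothetical stand-in, not the go function from the question. Passing the whole list raises the TypeError, while wrapping each coroutine on its own gives one task per coroutine.

```python
import asyncio


async def work(n):
    # Hypothetical stand-in for a real coroutine such as go().
    await asyncio.sleep(0.01)
    return n * 2


async def main():
    coros = [work(1), work(2)]

    # ensure_future() accepts ONE awaitable, so a list is rejected.
    try:
        asyncio.ensure_future(coros)
        error = None
    except TypeError as exc:
        error = exc

    # Wrap each coroutine on its own to get one Task per coroutine.
    tasks = [asyncio.ensure_future(c) for c in coros]
    results = await asyncio.gather(*tasks)
    return error, results


error, results = asyncio.run(main())
print(type(error).__name__, results)  # TypeError [2, 4]
```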

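In this case you don't need asyncio.Task at all; just pass the list of coroutines to asyncio.wait (accepted up to Python 3.10; newer versions require tasks or futures):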

loop.run_until_complete(asyncio.wait(some_tasks))
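For Python versions where asyncio.wait no longer accepts bare coroutines, the following is a minimal sketch of two alternatives: wrapping each coroutine in a task before calling asyncio.wait, or using asyncio.gather. The names work, run_batch and run_batch_gather are hypothetical and only serve the illustration.

```python
import asyncio


async def work(n):
    # Hypothetical stand-in for the go() coroutine.
    await asyncio.sleep(0.01)
    return n


async def run_batch(count):
    # Wrap each coroutine in a Task before handing the batch to asyncio.wait().
    tasks = [asyncio.create_task(work(n)) for n in range(count)]
    done, pending = await asyncio.wait(tasks)
    return sorted(t.result() for t in done), len(pending)


async def run_batch_gather(count):
    # gather() accepts coroutines directly and returns results in call order.
    return await asyncio.gather(*(work(n) for n in range(count)))


print(asyncio.run(run_batch(3)))         # ([0, 1, 2], 0)
print(asyncio.run(run_batch_gather(3)))  # [0, 1, 2]
```

asyncio.gather is usually the shorter choice when you only need every result; asyncio.wait is useful when you want the done and pending sets.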

One more important thing:

time.sleep(1)

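You should never do this inside a coroutine: it freezes your event loop, and with it every coroutine running on that loop. Please read up on how asyncio works in general.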

If you want to sleep for some time inside a coroutine, use asyncio.sleep:

await asyncio.sleep(1)
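The effect on timings can be seen with a small sketch. The worker names are hypothetical, and a 0.2 second sleep stands in for real work. Three blocking workers should take roughly 0.6 seconds because they run one after another, while three cooperative workers should take roughly 0.2 seconds because they wait concurrently.

```python
import asyncio
import time


async def blocking_worker():
    time.sleep(0.2)  # blocks the whole event loop


async def cooperative_worker():
    await asyncio.sleep(0.2)  # yields to the event loop while waiting


async def timed(worker, count):
    # Run `count` workers concurrently and return the elapsed wall time.
    start = time.perf_counter()
    await asyncio.gather(*(worker() for _ in range(count)))
    return time.perf_counter() - start


blocking = asyncio.run(timed(blocking_worker, 3))
cooperative = asyncio.run(timed(cooperative_worker, 3))
print("time.sleep:    {:.2f}s".format(blocking))
print("asyncio.sleep: {:.2f}s".format(cooperative))
```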

Answer code:

import asyncio
import time
from datetime import datetime, timedelta

import asyncpg


async def run(date):  # adapted from the go() function above
    conn = await asyncpg.connect("db connections")
    values = await conn.fetch("""some query """)
    await asyncio.sleep(1)
    await conn.close()


def date_range(date1, date2):
    for n in range(int((date2 - date1).days)+1):
        yield date1 + timedelta(n)


def run_tasks():

    start_dt = datetime(2017, 8, 9)
    end_dt = datetime(2017, 8, 10)

    dates = []  # date strings, one per concurrent task

    avg_time_run = []

    i = 1

    while i < 9:  # number of concurrent tasks: 1, 2, 4, 8
        time_run = []

        loop = asyncio.get_event_loop()

        for dt in date_range(start_dt, end_dt):
            if len(dates) < i:
                print(dt)
                dates.append(dt.strftime("%Y-%m-%d %H:%M:%S"))

                if len(dates) == i:

                    for j in range(0, 4):  # repeat the batch 4 times
                        print("J counter: {}".format(j))

                        # A finished task cannot be run again, so build a
                        # fresh batch of tasks for every repetition.
                        tasks = [asyncio.ensure_future(run(d)) for d in dates]

                        # Time each repetition on its own.
                        start = time.time()
                        loop.run_until_complete(asyncio.wait(tasks))

                        end = time.time()
                        diff = end - start
                        time_run.append(diff)
                        print("Num of Tasks executing: {}, {}".format(i, tasks))
                        print("Task len: {}".format(len(tasks)))
                        print("Total time: {}".format(diff))

        # get average of each task run 4 times
        avg_time_run.append(sum(time_run) / float(len(time_run)))
        start_dt = end_dt + timedelta(days=1)
        end_dt = end_dt + timedelta(days=(i * 2 - i))
        i *= 2

        print(start_dt)
        print(end_dt)
        #loop.close()

    return avg_time_run


print(run_tasks())
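
For experimenting without a database, here is a self-contained sketch of the same measurement using asyncio.run and asyncio.gather (Python 3.7 or later). The names fake_query, time_batch and benchmark are hypothetical, and asyncio.sleep stands in for the query, so the numbers only show the shape of the harness. Every repetition builds new coroutines, since a coroutine or task can only be run to completion once.

```python
import asyncio
import time


async def fake_query(delay):
    # Hypothetical stand-in for the database call; it only sleeps.
    await asyncio.sleep(delay)


async def time_batch(n_tasks, delay):
    # Time one batch of n_tasks coroutines running concurrently.
    start = time.perf_counter()
    await asyncio.gather(*(fake_query(delay) for _ in range(n_tasks)))
    return time.perf_counter() - start


async def benchmark(max_tasks=8, repeats=4, delay=0.05):
    averages = {}
    n_tasks = 1
    while n_tasks <= max_tasks:
        # Fresh coroutines are created on every repetition.
        runs = [await time_batch(n_tasks, delay) for _ in range(repeats)]
        averages[n_tasks] = sum(runs) / len(runs)
        n_tasks *= 2
    return averages


averages = asyncio.run(benchmark())
for n_tasks, avg in averages.items():
    print("{} task(s): {:.3f}s on average".format(n_tasks, avg))
```

With a sleep as the workload, the average should stay close to the delay for every batch size, because the waits overlap. A real query would show where the database or connection setup starts to dominate.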