将异步可迭代对象转换为同步可迭代列表的内置方法

Builtin way to transform asynchronous iterable to synchronous iterable list

Python3.6 现在 asynchronous iterables。是否有内置方法将异步可迭代对象转换为同步可迭代对象。

我目前有这个辅助函数,但感觉非常不符合 Python 风格。有更好的方法吗?

async def aiter_to_list(aiter):
    l = []
    async for i in aiter:
        l.append(i)
    return l

您的 "asynchronous to synchronous" 助手本身是异步的;根本没有什么大的变化。一般来说:不,你不能使异步同步。将提供一个异步值 "sometime later";你不能把它变成 "now" 因为这个值不存在 "now" 并且你 将不得不 异步等待它。

您可以使用 aiostream.stream.list:

from aiostream import stream

async def agen():
    yield 1
    yield 2
    yield 3

async def main():
    lst = await stream.list(agen())
    print(lst)  # prints [1, 2, 3]

documentation 中有更多运算符和示例。

这些函数允许您从 / 转换为 iterable <==> async iterable,而不仅仅是简单的列表。

基本进口

import asyncio
import threading
import time

DONE = object()
TIMEOUT = 0.001

函数 to_sync_iterable 会将任何异步可迭代对象转换为同步可迭代对象:

def to_sync_iterable(async_iterable, maxsize = 0):

    def sync_iterable():

        queue = asyncio.Queue(maxsize=maxsize)
        loop = asyncio.get_event_loop()

        t = threading.Thread(target=_run_coroutine, args=(loop, async_iterable, queue))
        t.daemon = True
        t.start()

        while True:
            if not queue.empty():
                x = queue.get_nowait()

                if x is DONE:
                    break
                else:
                    yield x
            else:
                time.sleep(utils.TIMEOUT)

        t.join()

    return sync_iterable()

def _run_coroutine(loop, async_iterable, queue):

    loop.run_until_complete(_consume_async_iterable(async_iterable, queue))

async def _consume_async_iterable(async_iterable, queue):

    async for x in async_iterable:
        await queue.put(x)

    await queue.put(DONE)

你可以这样使用它:

async def slow_async_generator():
    yield 0

    await asyncio.sleep(1)
    yield 1

    await asyncio.sleep(1)
    yield 2

    await asyncio.sleep(1)
    yield 3


for x in to_sync_iterable(slow_async_generator()):
    print(x)

函数 to_async_iterable 会将任何同步可迭代对象转换为异步可迭代对象:

def to_async_iterable(iterable, maxsize = 0):

    async def async_iterable():
        queue = asyncio.Queue(maxsize=maxsize)
        loop = asyncio.get_event_loop()
        task = loop.run_in_executor(None, lambda: _consume_iterable(loop, iterable, queue))

        while True:
            x = await queue.get()

            if x is DONE:
                break
            else:
                yield x

        await task


    return async_iterable()

def _consume_iterable(loop, iterable, queue):

    for x in iterable:
        while True:
            if not queue.full():
                loop.call_soon_threadsafe(queue.put_nowait, x)
                break
            else:
                time.sleep(TIMEOUT)

    while True:
        if not queue.full():
            loop.call_soon_threadsafe(queue.put_nowait, DONE)
            break
        else:
            time.sleep(TIMEOUT)

这个对asyncio程序特别有用,因为它不会阻塞事件循环,即使sync iterable阻塞了。你可以这样使用它:

def slow_sync_generator():
    yield 0

    time.sleep(1)
    yield 1

    time.sleep(1)
    yield 2

    time.sleep(1)
    yield 3

async def async_task():
    async for x in to_async_iterable(slow_sync_generator()):
        print(x)

asyncio.get_event_loop().run_until_complete(async_task())

Python 3.6你可以使用Asynchronous Comprehensions

async def async_iter():
    for i in range(0,5):
        yield i

# async comprehension
sync_list = [gen async for gen in async_iter()]

print(sync_list) # [0, 1, 2, 3, 4]