将异步可迭代对象转换为同步可迭代列表的内置方法
Builtin way to transform asynchronous iterable to synchronous iterable list
Python3.6 现在 asynchronous iterables。是否有内置方法将异步可迭代对象转换为同步可迭代对象。
我目前有这个辅助函数,但感觉非常不符合 Python 风格。有更好的方法吗?
async def aiter_to_list(aiter):
l = []
async for i in aiter:
l.append(i)
return l
您的 "asynchronous to synchronous" 助手本身是异步的;根本没有什么大的变化。一般来说:不,你不能使异步同步。将提供一个异步值 "sometime later";你不能把它变成 "now" 因为这个值不存在 "now" 并且你 将不得不 异步等待它。
您可以使用 aiostream.stream.list:
from aiostream import stream
async def agen():
yield 1
yield 2
yield 3
async def main():
lst = await stream.list(agen())
print(lst) # prints [1, 2, 3]
documentation 中有更多运算符和示例。
这些函数允许您从 / 转换为 iterable
<==> async iterable
,而不仅仅是简单的列表。
基本进口
import asyncio
import threading
import time
DONE = object()
TIMEOUT = 0.001
函数 to_sync_iterable
会将任何异步可迭代对象转换为同步可迭代对象:
def to_sync_iterable(async_iterable, maxsize = 0):
def sync_iterable():
queue = asyncio.Queue(maxsize=maxsize)
loop = asyncio.get_event_loop()
t = threading.Thread(target=_run_coroutine, args=(loop, async_iterable, queue))
t.daemon = True
t.start()
while True:
if not queue.empty():
x = queue.get_nowait()
if x is DONE:
break
else:
yield x
else:
time.sleep(utils.TIMEOUT)
t.join()
return sync_iterable()
def _run_coroutine(loop, async_iterable, queue):
loop.run_until_complete(_consume_async_iterable(async_iterable, queue))
async def _consume_async_iterable(async_iterable, queue):
async for x in async_iterable:
await queue.put(x)
await queue.put(DONE)
你可以这样使用它:
async def slow_async_generator():
yield 0
await asyncio.sleep(1)
yield 1
await asyncio.sleep(1)
yield 2
await asyncio.sleep(1)
yield 3
for x in to_sync_iterable(slow_async_generator()):
print(x)
函数 to_async_iterable 会将任何同步可迭代对象转换为异步可迭代对象:
def to_async_iterable(iterable, maxsize = 0):
async def async_iterable():
queue = asyncio.Queue(maxsize=maxsize)
loop = asyncio.get_event_loop()
task = loop.run_in_executor(None, lambda: _consume_iterable(loop, iterable, queue))
while True:
x = await queue.get()
if x is DONE:
break
else:
yield x
await task
return async_iterable()
def _consume_iterable(loop, iterable, queue):
for x in iterable:
while True:
if not queue.full():
loop.call_soon_threadsafe(queue.put_nowait, x)
break
else:
time.sleep(TIMEOUT)
while True:
if not queue.full():
loop.call_soon_threadsafe(queue.put_nowait, DONE)
break
else:
time.sleep(TIMEOUT)
这个对asyncio程序特别有用,因为它不会阻塞事件循环,即使sync iterable阻塞了。你可以这样使用它:
def slow_sync_generator():
yield 0
time.sleep(1)
yield 1
time.sleep(1)
yield 2
time.sleep(1)
yield 3
async def async_task():
async for x in to_async_iterable(slow_sync_generator()):
print(x)
asyncio.get_event_loop().run_until_complete(async_task())
从Python 3.6你可以使用Asynchronous Comprehensions
async def async_iter():
for i in range(0,5):
yield i
# async comprehension
sync_list = [gen async for gen in async_iter()]
print(sync_list) # [0, 1, 2, 3, 4]
Python3.6 现在 asynchronous iterables。是否有内置方法将异步可迭代对象转换为同步可迭代对象。
我目前有这个辅助函数,但感觉非常不符合 Python 风格。有更好的方法吗?
async def aiter_to_list(aiter):
l = []
async for i in aiter:
l.append(i)
return l
您的 "asynchronous to synchronous" 助手本身是异步的;根本没有什么大的变化。一般来说:不,你不能使异步同步。将提供一个异步值 "sometime later";你不能把它变成 "now" 因为这个值不存在 "now" 并且你 将不得不 异步等待它。
您可以使用 aiostream.stream.list:
from aiostream import stream
async def agen():
yield 1
yield 2
yield 3
async def main():
lst = await stream.list(agen())
print(lst) # prints [1, 2, 3]
documentation 中有更多运算符和示例。
这些函数允许您从 / 转换为 iterable
<==> async iterable
,而不仅仅是简单的列表。
基本进口
import asyncio
import threading
import time
DONE = object()
TIMEOUT = 0.001
函数 to_sync_iterable
会将任何异步可迭代对象转换为同步可迭代对象:
def to_sync_iterable(async_iterable, maxsize = 0):
def sync_iterable():
queue = asyncio.Queue(maxsize=maxsize)
loop = asyncio.get_event_loop()
t = threading.Thread(target=_run_coroutine, args=(loop, async_iterable, queue))
t.daemon = True
t.start()
while True:
if not queue.empty():
x = queue.get_nowait()
if x is DONE:
break
else:
yield x
else:
time.sleep(utils.TIMEOUT)
t.join()
return sync_iterable()
def _run_coroutine(loop, async_iterable, queue):
loop.run_until_complete(_consume_async_iterable(async_iterable, queue))
async def _consume_async_iterable(async_iterable, queue):
async for x in async_iterable:
await queue.put(x)
await queue.put(DONE)
你可以这样使用它:
async def slow_async_generator():
yield 0
await asyncio.sleep(1)
yield 1
await asyncio.sleep(1)
yield 2
await asyncio.sleep(1)
yield 3
for x in to_sync_iterable(slow_async_generator()):
print(x)
函数 to_async_iterable 会将任何同步可迭代对象转换为异步可迭代对象:
def to_async_iterable(iterable, maxsize = 0):
async def async_iterable():
queue = asyncio.Queue(maxsize=maxsize)
loop = asyncio.get_event_loop()
task = loop.run_in_executor(None, lambda: _consume_iterable(loop, iterable, queue))
while True:
x = await queue.get()
if x is DONE:
break
else:
yield x
await task
return async_iterable()
def _consume_iterable(loop, iterable, queue):
for x in iterable:
while True:
if not queue.full():
loop.call_soon_threadsafe(queue.put_nowait, x)
break
else:
time.sleep(TIMEOUT)
while True:
if not queue.full():
loop.call_soon_threadsafe(queue.put_nowait, DONE)
break
else:
time.sleep(TIMEOUT)
这个对asyncio程序特别有用,因为它不会阻塞事件循环,即使sync iterable阻塞了。你可以这样使用它:
def slow_sync_generator():
yield 0
time.sleep(1)
yield 1
time.sleep(1)
yield 2
time.sleep(1)
yield 3
async def async_task():
async for x in to_async_iterable(slow_sync_generator()):
print(x)
asyncio.get_event_loop().run_until_complete(async_task())
从Python 3.6你可以使用Asynchronous Comprehensions
async def async_iter():
for i in range(0,5):
yield i
# async comprehension
sync_list = [gen async for gen in async_iter()]
print(sync_list) # [0, 1, 2, 3, 4]