同步睡眠进入asyncio协程
Synchronous sleep into asyncio coroutine
我有一个协程如下:
async def download():
downloader = DataManager()
downloader.download()
DataManager.download()
方法如下所示:
def download(self):
start_multiple_docker_containers()
while True:
check_containers_statuses()
sleep(N) # synchronous sleep from time module
这是一个好习惯吗?如果不是,我如何在 download()
中使用 asyncio.sleep
?
或者这样的代码结构在概念上是错误的?
这很可能是一种不好的做法,因为 time.sleep() 会阻止所有内容,而您只想阻止特定的协程(我猜)。
您正在异步世界中进行同步操作。
下面的模式呢?
async def download():
downloader = DataManager()
downloader.start_multiple_docker_containers()
while True:
downloader.check_containers_statuses()
await syncio.sleep(N)
这是我的解决方案:
import asyncio
import time
# Mocks of domain-specific functions
# ----------------------------------
def get_container_status(container_id, initial_time):
"""This mocks container status to change to 'exited' in 10 seconds"""
if time.time() - initial_time < 10:
print("%s: container %s still running" % (time.time(), container_id))
return 'running'
else:
print("%s: container %s exited" % (time.time(), container_id))
return 'exited'
def is_download_complete(container_id, initial_time):
"""This mocks download finished in 20 seconds after program's start"""
if time.time() - initial_time < 20:
print("%s: download from %s in progress" % (time.time(), container_id))
return False
else:
print("%s: download from %s done" % (time.time(), container_id))
return True
def get_downloaded_data(container_id):
return "foo"
# Coroutines
# ----------
async def container_exited(container_id, initial_time):
while True:
await asyncio.sleep(1) # == setTimeout(1000), != sleep(1000)
if get_container_status(container_id, initial_time) == 'exited':
return container_id
async def download_data_by_container_id(container_id, initial_time):
container_id = await container_exited(container_id, initial_time)
while True:
await asyncio.sleep(1)
if is_download_complete(container_id, initial_time):
return get_downloaded_data(container_id)
# Main loop
# ---------
if __name__ == "__main__":
initial_time = time.time()
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(download_data_by_container_id("A", initial_time)),
asyncio.ensure_future(download_data_by_container_id("B", initial_time))
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
结果:
1487334722.321165: container A still running
1487334722.321412: container B still running
1487334723.325897: container A still running
1487334723.3259578: container B still running
1487334724.3285959: container A still running
1487334724.328662: container B still running
1487334725.3312798: container A still running
1487334725.331337: container B still running
1487334726.3340318: container A still running
1487334726.33409: container B still running
1487334727.336779: container A still running
1487334727.336842: container B still running
1487334728.339425: container A still running
1487334728.339506: container B still running
1487334729.34211: container A still running
1487334729.342168: container B still running
1487334730.3448708: container A still running
1487334730.34493: container B still running
1487334731.34754: container A exited
1487334731.347598: container B exited
1487334732.350253: download from A in progress
1487334732.3503108: download from B in progress
1487334733.354369: download from A in progress
1487334733.354424: download from B in progress
1487334734.354686: download from A in progress
1487334734.3548028: download from B in progress
1487334735.358371: download from A in progress
1487334735.358461: download from B in progress
1487334736.3610592: download from A in progress
1487334736.361115: download from B in progress
1487334737.363115: download from A in progress
1487334737.363211: download from B in progress
1487334738.3664992: download from A in progress
1487334738.36656: download from B in progress
1487334739.369131: download from A in progress
1487334739.36919: download from B in progress
1487334740.371079: download from A in progress
1487334740.37119: download from B in progress
1487334741.374521: download from A done
1487334741.3745651: download from B done
至于 sleep()
功能 - 不,您不应该使用它。它会阻塞整个 python 解释器 1 秒,这不是您想要的。
记住,你没有并行(线程等),你有并发。
即你有一个只有 1 个执行线程的 python 解释器,你的主循环和所有协程 运行 相互抢占。您希望您的解释器将 99.999% 的工作时间花在由 asyncio 创建的主循环中,轮询套接字并等待超时。
你所有的协程都应该return尽可能快并且绝对不应该包含阻塞sleep
- 如果你调用它,它会阻塞整个解释器并阻止主循环从套接字获取信息或 运行ning 协程响应数据,到达那些套接字。
因此,您应该等待 asyncio.sleep()
,它本质上等同于 Javascript 的 setTimeout()
- 它只是告诉主循环在特定时间它应该唤醒这个协程并继续 运行ning 它。
推荐阅读:
我是 asyncio 的新手,但似乎如果你 运行 像这样同步代码
f = app.loop.run_in_executor(None, your_sync_function, app,param1,param2,...)
然后 your_sync_function
运行 在一个单独的线程中,您可以在不干扰异步循环的情况下执行 time.sleep()
。它会阻塞循环执行器的线程,但不会阻塞 asyncio 线程。至少,它看起来是这样的。
如果您想将来自 your_sync_function
的消息发送回 asyncio 循环,请查看 janus
库
关于此的更多提示:
https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html
我有一个协程如下:
async def download():
downloader = DataManager()
downloader.download()
DataManager.download()
方法如下所示:
def download(self):
start_multiple_docker_containers()
while True:
check_containers_statuses()
sleep(N) # synchronous sleep from time module
这是一个好习惯吗?如果不是,我如何在 download()
中使用 asyncio.sleep
?
或者这样的代码结构在概念上是错误的?
这很可能是一种不好的做法,因为 time.sleep() 会阻止所有内容,而您只想阻止特定的协程(我猜)。
您正在异步世界中进行同步操作。
下面的模式呢?
async def download():
downloader = DataManager()
downloader.start_multiple_docker_containers()
while True:
downloader.check_containers_statuses()
await syncio.sleep(N)
这是我的解决方案:
import asyncio
import time
# Mocks of domain-specific functions
# ----------------------------------
def get_container_status(container_id, initial_time):
"""This mocks container status to change to 'exited' in 10 seconds"""
if time.time() - initial_time < 10:
print("%s: container %s still running" % (time.time(), container_id))
return 'running'
else:
print("%s: container %s exited" % (time.time(), container_id))
return 'exited'
def is_download_complete(container_id, initial_time):
"""This mocks download finished in 20 seconds after program's start"""
if time.time() - initial_time < 20:
print("%s: download from %s in progress" % (time.time(), container_id))
return False
else:
print("%s: download from %s done" % (time.time(), container_id))
return True
def get_downloaded_data(container_id):
return "foo"
# Coroutines
# ----------
async def container_exited(container_id, initial_time):
while True:
await asyncio.sleep(1) # == setTimeout(1000), != sleep(1000)
if get_container_status(container_id, initial_time) == 'exited':
return container_id
async def download_data_by_container_id(container_id, initial_time):
container_id = await container_exited(container_id, initial_time)
while True:
await asyncio.sleep(1)
if is_download_complete(container_id, initial_time):
return get_downloaded_data(container_id)
# Main loop
# ---------
if __name__ == "__main__":
initial_time = time.time()
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(download_data_by_container_id("A", initial_time)),
asyncio.ensure_future(download_data_by_container_id("B", initial_time))
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
结果:
1487334722.321165: container A still running
1487334722.321412: container B still running
1487334723.325897: container A still running
1487334723.3259578: container B still running
1487334724.3285959: container A still running
1487334724.328662: container B still running
1487334725.3312798: container A still running
1487334725.331337: container B still running
1487334726.3340318: container A still running
1487334726.33409: container B still running
1487334727.336779: container A still running
1487334727.336842: container B still running
1487334728.339425: container A still running
1487334728.339506: container B still running
1487334729.34211: container A still running
1487334729.342168: container B still running
1487334730.3448708: container A still running
1487334730.34493: container B still running
1487334731.34754: container A exited
1487334731.347598: container B exited
1487334732.350253: download from A in progress
1487334732.3503108: download from B in progress
1487334733.354369: download from A in progress
1487334733.354424: download from B in progress
1487334734.354686: download from A in progress
1487334734.3548028: download from B in progress
1487334735.358371: download from A in progress
1487334735.358461: download from B in progress
1487334736.3610592: download from A in progress
1487334736.361115: download from B in progress
1487334737.363115: download from A in progress
1487334737.363211: download from B in progress
1487334738.3664992: download from A in progress
1487334738.36656: download from B in progress
1487334739.369131: download from A in progress
1487334739.36919: download from B in progress
1487334740.371079: download from A in progress
1487334740.37119: download from B in progress
1487334741.374521: download from A done
1487334741.3745651: download from B done
至于 sleep()
功能 - 不,您不应该使用它。它会阻塞整个 python 解释器 1 秒,这不是您想要的。
记住,你没有并行(线程等),你有并发。
即你有一个只有 1 个执行线程的 python 解释器,你的主循环和所有协程 运行 相互抢占。您希望您的解释器将 99.999% 的工作时间花在由 asyncio 创建的主循环中,轮询套接字并等待超时。
你所有的协程都应该return尽可能快并且绝对不应该包含阻塞sleep
- 如果你调用它,它会阻塞整个解释器并阻止主循环从套接字获取信息或 运行ning 协程响应数据,到达那些套接字。
因此,您应该等待 asyncio.sleep()
,它本质上等同于 Javascript 的 setTimeout()
- 它只是告诉主循环在特定时间它应该唤醒这个协程并继续 运行ning 它。
推荐阅读:
我是 asyncio 的新手,但似乎如果你 运行 像这样同步代码
f = app.loop.run_in_executor(None, your_sync_function, app,param1,param2,...)
然后 your_sync_function
运行 在一个单独的线程中,您可以在不干扰异步循环的情况下执行 time.sleep()
。它会阻塞循环执行器的线程,但不会阻塞 asyncio 线程。至少,它看起来是这样的。
如果您想将来自 your_sync_function
的消息发送回 asyncio 循环,请查看 janus
库
关于此的更多提示:
https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html