使用 python 管理事件循环的早期 return
manage early return of event loop with python
我有一个服务运行正在执行以下循环
while True:
feedback = f1()
if check1(feedback):
break
feedback = f2()
if check2(feedback):
break
feedback = f3()
if check3(feedback):
break
time.sleep(10)
do_cleanup(feedback)
现在我想运行以不同的时间间隔检查这些反馈。一种天真的方法是将 time.sleep()
移动到 f
函数中。但这会导致阻塞。实现不同时间间隔的定期检查的最简单方法是什么?这里所有 f
函数都比 运行.
便宜
asyncio
中的事件循环听起来很不错。但是由于我的经验不足,我不知道check
和break
逻辑应该去哪里进行事件循环。
或者是否有任何其他 packages/code 模式来执行这种监控逻辑?
在 asyncio 中,您可以将服务拆分为三个独立的任务,每个任务都有自己的循环和计时 - 您可以将它们视为三个线程,除了它们都在同一个线程中调度,并且多任务协作暂停在 await
.
为此,让我们从调用函数并定期检查其结果的实用函数开始:
async def at_interval(f, check, seconds):
while True:
feedback = f()
if check(feedback):
return feedback
await asyncio.sleep(seconds)
return
相当于您原始代码中的 break
。
有了它,服务会产生三个这样的循环并等待其中任何一个完成。哪个先完成就携带我们正在等待的"feedback",我们可以处理掉其他的。
async def service():
loop = asyncio.get_event_loop()
t1 = loop.create_task(at_interval(f1, check1, 3))
t2 = loop.create_task(at_interval(f2, check2, 5))
t3 = loop.create_task(at_interval(f3, check3, 7))
done, pending = await asyncio.wait(
[t1, t2, t3], return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel()
feedback = await list(done)[0]
do_cleanup(feedback)
asyncio.get_event_loop().run_until_complete(service())
这与您的代码之间的一个小区别是,在 service
接受之前,这里有可能(尽管不太可能)多次检查失败。例如,如果由于运气不佳,上述任务中的两个最终共享唤醒的绝对时间到微秒,那么它们将被安排在同一个事件循环迭代中。两者都将 return 来自其对应的 at_interval
协程,并且 done
将包含多个反馈。代码通过选择一个反馈并对该反馈调用 do_cleanup
来处理它,但它也可以遍历所有反馈。
如果这是不可接受的,您可以轻松地向每个 at_interval
传递一个可取消除自身以外的所有任务的可调用对象。为简洁起见,目前在 service
中完成,但也可以在 at_interval
中完成。一项任务取消其他任务将确保只能存在一个反馈。
我有一个服务运行正在执行以下循环
while True:
feedback = f1()
if check1(feedback):
break
feedback = f2()
if check2(feedback):
break
feedback = f3()
if check3(feedback):
break
time.sleep(10)
do_cleanup(feedback)
现在我想运行以不同的时间间隔检查这些反馈。一种天真的方法是将 time.sleep()
移动到 f
函数中。但这会导致阻塞。实现不同时间间隔的定期检查的最简单方法是什么?这里所有 f
函数都比 运行.
asyncio
中的事件循环听起来很不错。但是由于我的经验不足,我不知道check
和break
逻辑应该去哪里进行事件循环。
或者是否有任何其他 packages/code 模式来执行这种监控逻辑?
在 asyncio 中,您可以将服务拆分为三个独立的任务,每个任务都有自己的循环和计时 - 您可以将它们视为三个线程,除了它们都在同一个线程中调度,并且多任务协作暂停在 await
.
为此,让我们从调用函数并定期检查其结果的实用函数开始:
async def at_interval(f, check, seconds):
while True:
feedback = f()
if check(feedback):
return feedback
await asyncio.sleep(seconds)
return
相当于您原始代码中的 break
。
有了它,服务会产生三个这样的循环并等待其中任何一个完成。哪个先完成就携带我们正在等待的"feedback",我们可以处理掉其他的。
async def service():
loop = asyncio.get_event_loop()
t1 = loop.create_task(at_interval(f1, check1, 3))
t2 = loop.create_task(at_interval(f2, check2, 5))
t3 = loop.create_task(at_interval(f3, check3, 7))
done, pending = await asyncio.wait(
[t1, t2, t3], return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel()
feedback = await list(done)[0]
do_cleanup(feedback)
asyncio.get_event_loop().run_until_complete(service())
这与您的代码之间的一个小区别是,在 service
接受之前,这里有可能(尽管不太可能)多次检查失败。例如,如果由于运气不佳,上述任务中的两个最终共享唤醒的绝对时间到微秒,那么它们将被安排在同一个事件循环迭代中。两者都将 return 来自其对应的 at_interval
协程,并且 done
将包含多个反馈。代码通过选择一个反馈并对该反馈调用 do_cleanup
来处理它,但它也可以遍历所有反馈。
如果这是不可接受的,您可以轻松地向每个 at_interval
传递一个可取消除自身以外的所有任务的可调用对象。为简洁起见,目前在 service
中完成,但也可以在 at_interval
中完成。一项任务取消其他任务将确保只能存在一个反馈。