Python 中 "collect and execute in a bunch" 的最佳实践
Best practice to "collect and execute in a bunch" in Python
我只是资助自己实施一个基于计时器的版本 "handle a list of events as a bunch" 以保护资源 - 再次 - 我想知道是否有一个很好的通用 pythonic 方法。
您可能知道这一点:您正在处理重复发生的事件,例如鼠标移动、文件系统更改等,并且您必须进行一些计算作为对这些事件的反应,但如果您能稍微休息一下就更好了在事件流中处理它们。可能是因为较旧的事件会因较新的事件而失效(并且足以处理最旧的事件)或者因为事件可以以某种方式压缩在一起。
示例是:鼠标移动(仅绘制最新位置),"auto save" 在编辑器中或在文件系统上自动同步,或(在我的示例中)监控文件系统更改并重新编译某些东西。
通常我会查看如何使用 Timer
并考虑如何避免额外的线程,并为一个在我看来非常简单的问题提出一些半成品但复杂的解决方案。
出现很多问题:
- 如何避免并发处理(例如,如果我使用
threading.Timer
并启动一个线程来完成工作)
- 如何确保事件处理有时间限制(以防事件连续传入不间断)
- 如何尽可能避免线程
- 如何避免创建过于复杂的框架
- (随你便)
我想要的是这样工作的东西:
timer = SomeContinuousTimer()
new_events = []
while True:
event = wait_for(inotify_adapter.event_gen(), timer.timeout())
if event == timer.TIMEOUT:
my_handler_func(new_events)
else:
new_events.append(event)
timer.restart(1500)
但是 wait_for
必须像 select
那样工作,为此我需要文件描述符,上面的代码已经比我实际预期的要多一些。
我会很高兴的是这样使用:
bunch_handler = BunchHandler()
new_events = []
def read_events():
for event in inotify_adapter.event_gen():
new_events.append(event)
while True:
# will run `read_events` asynchronously until 1.5sec have passed since the
# last event
bunch_handler.read(read_fn=read_events, bunch_wait=1500)
handle_events(new_events)
这是我应该使用 async
/ await
的典型场景吗?对于 async
不是一个选项的情况,是否有框架?是否有针对这种情况的异步框架?
这不是很好,但它做了我想要的,并且可以作为一个例子来说明我在说什么:)
import asyncio
import time
async def event_collector(*, listener_fn, bunch_wait=1.0, max_wait=2.0):
"""Wait for (but don't handle) events and wait for a maximum of @bunch_wait seconds after the
last event before returning. Force return after @max_wait seconds"""
max_time_task = asyncio.Task(asyncio.sleep(max_wait))
while True:
resetable = asyncio.Task(asyncio.sleep(bunch_wait))
done, _ = await asyncio.wait(
{listener_fn.__anext__(), resetable, max_time_task},
return_when=asyncio.FIRST_COMPLETED)
if resetable in done or max_time_task in done:
return
resetable.cancel()
async def generate_events(events):
"""Simulates bursts of events with side-effects"""
while True:
for i in range(5):
await asyncio.sleep(.01)
events.append(i)
print("*" * len(events))
yield
await asyncio.sleep(3.200)
def handle_events(events):
"""Simulates an event handler operating on a given structure"""
print("Handle %d events" % len(events))
events.clear()
async def main():
new_events = []
t = time.time()
while True:
await event_collector(listener_fn=generate_events(new_events), bunch_wait=1.1, max_wait=2.2)
now = time.time()
print("%.2f" % (now - t))
t = now
handle_events(new_events)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这种方法有一些缺点:
* 您需要使用 async
异步监听事件
* event_collector 将在 max_wait
秒后 return 无论是否已经看到任何事件(因此如果没有事件发生,它就像超时一样)
* 而不是 重置 计时器,每次都会创建一个新计时器
我只是资助自己实施一个基于计时器的版本 "handle a list of events as a bunch" 以保护资源 - 再次 - 我想知道是否有一个很好的通用 pythonic 方法。
您可能知道这一点:您正在处理重复发生的事件,例如鼠标移动、文件系统更改等,并且您必须进行一些计算作为对这些事件的反应,但如果您能稍微休息一下就更好了在事件流中处理它们。可能是因为较旧的事件会因较新的事件而失效(并且足以处理最旧的事件)或者因为事件可以以某种方式压缩在一起。
示例是:鼠标移动(仅绘制最新位置),"auto save" 在编辑器中或在文件系统上自动同步,或(在我的示例中)监控文件系统更改并重新编译某些东西。
通常我会查看如何使用 Timer
并考虑如何避免额外的线程,并为一个在我看来非常简单的问题提出一些半成品但复杂的解决方案。
出现很多问题:
- 如何避免并发处理(例如,如果我使用
threading.Timer
并启动一个线程来完成工作) - 如何确保事件处理有时间限制(以防事件连续传入不间断)
- 如何尽可能避免线程
- 如何避免创建过于复杂的框架
- (随你便)
我想要的是这样工作的东西:
timer = SomeContinuousTimer()
new_events = []
while True:
event = wait_for(inotify_adapter.event_gen(), timer.timeout())
if event == timer.TIMEOUT:
my_handler_func(new_events)
else:
new_events.append(event)
timer.restart(1500)
但是 wait_for
必须像 select
那样工作,为此我需要文件描述符,上面的代码已经比我实际预期的要多一些。
我会很高兴的是这样使用:
bunch_handler = BunchHandler()
new_events = []
def read_events():
for event in inotify_adapter.event_gen():
new_events.append(event)
while True:
# will run `read_events` asynchronously until 1.5sec have passed since the
# last event
bunch_handler.read(read_fn=read_events, bunch_wait=1500)
handle_events(new_events)
这是我应该使用 async
/ await
的典型场景吗?对于 async
不是一个选项的情况,是否有框架?是否有针对这种情况的异步框架?
这不是很好,但它做了我想要的,并且可以作为一个例子来说明我在说什么:)
import asyncio
import time
async def event_collector(*, listener_fn, bunch_wait=1.0, max_wait=2.0):
"""Wait for (but don't handle) events and wait for a maximum of @bunch_wait seconds after the
last event before returning. Force return after @max_wait seconds"""
max_time_task = asyncio.Task(asyncio.sleep(max_wait))
while True:
resetable = asyncio.Task(asyncio.sleep(bunch_wait))
done, _ = await asyncio.wait(
{listener_fn.__anext__(), resetable, max_time_task},
return_when=asyncio.FIRST_COMPLETED)
if resetable in done or max_time_task in done:
return
resetable.cancel()
async def generate_events(events):
"""Simulates bursts of events with side-effects"""
while True:
for i in range(5):
await asyncio.sleep(.01)
events.append(i)
print("*" * len(events))
yield
await asyncio.sleep(3.200)
def handle_events(events):
"""Simulates an event handler operating on a given structure"""
print("Handle %d events" % len(events))
events.clear()
async def main():
new_events = []
t = time.time()
while True:
await event_collector(listener_fn=generate_events(new_events), bunch_wait=1.1, max_wait=2.2)
now = time.time()
print("%.2f" % (now - t))
t = now
handle_events(new_events)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这种方法有一些缺点:
* 您需要使用 async
异步监听事件
* event_collector 将在 max_wait
秒后 return 无论是否已经看到任何事件(因此如果没有事件发生,它就像超时一样)
* 而不是 重置 计时器,每次都会创建一个新计时器