twisted:在同步代码中处理传入事件

twisted: processing incoming events in synchronous code

假设在一个需要很长时间执行的双绞线 Python 程序中有一个同步函数,需要大量合理大小的工作来完成。如果该函数可以 return 延迟,这将是一个明智的选择,但是该函数恰好位于某些同步代码的深处,因此不可能产生延迟以继续。

有没有办法让 twisted 在不离开那个函数的情况下处理未完成的事件? IE。我想做的是

def my_func():
    results = []
    for item in a_lot_of_items():
        results.append(do_computation(item))
        reactor.process_outstanding_events()
    return results

当然,这对代码提出了可重入要求,但是,Qt 中仍然有 QCoreApplication.processEvents,twisted 中有什么吗?

你可以使用 deferToThread。

http://twistedmatrix.com/documents/13.2.0/core/howto/threading.html

该方法在单独的线程中运行您的计算,returns 在计算实际完成时回调的延迟。

应该这样做:

for item in items:
    reactor.callLater(0, heavy_func, item)

reactor.callLater 应该会让您回到事件循环中。

问题是如果 do_heavy_computation() 是阻塞的代码,那么执行将不会转到下一个函数。在这种情况下,使用 deferToThreadblockingCallFromThread 进行繁重的计算。另外,如果您不关心计算结果,则可以使用 callInThread。看看documentation on threads

一些基于事件循环的系统采用的解决方案(本质上是您通过 Qt 的 QCoreApplication.processEvents API 引用的解决方案)是使主循环可重入。在 Twisted 术语中,这意味着类似 (not working code):

def my_expensive_task_that_cannot_be_asynchronous():
    @inlineCallbacks
    def do_work(units):
        for unit in units:
            yield do_one_work_asynchronously(unit)
    work = do_work(some_work_units())
    work.addBoth(lambda ignored: reactor.stop())
    reactor.run()

def main():
    # Whatever your setup is...
    # Then, hypothetical event source triggering your
    # expensive function:
    reactor.callLater(
        30,
        my_expensive_task_that_cannot_be_asynchronous,
    )
    reactor.run()

注意这个程序中有 两个 reactor.run 调用。如果 Twisted 有一个可重入的事件循环,第二次调用将再次开始旋转反应器,而不是 return,直到遇到匹配的 reactor.stop 调用。反应器将处理它知道的所有事件,而不仅仅是 do_work 生成的事件,因此您将拥有您想要的行为。

这需要一个可重入的事件循环,因为 my_expensive_task_... 已经被反应器循环调用。反应堆循环在调用堆栈上。然后,调用 reactor.run 并且反应器循环现在再次 在调用堆栈上。所以通常的问题适用:事件循环不能在其框架中留下状态(否则它可能在嵌套调用完成时无效),它不能在任何调用其他代码期间保持其实例状态不一致,等等。

Twisted 没有可重入事件循环。这是一个已经被考虑过的特性,至少在过去被明确拒绝了。支持此功能会给实施 应用程序带来大量额外的复杂性(如上所述)。如果事件循环是可重入的,那么就很难避免要求所有应用程序代码也是可重入安全的。这否定了 Twisted 对并发采用的协作式多任务处理方法的主要好处之一(保证您的函数不会被重新输入)。

因此,当使用 Twisted 时,此解决方案已失效。

我不知道还有其他解决方案可以让您在反应器线程中继续 运行 此代码。您提到有问题的代码深深地嵌套在其他一些同步代码中。想到的其他选项是:

  • 使同步代码能够处理异步事物
  • 首先分解出昂贵的部分并计算它们,然后将结果传递给其余代码
  • 运行 所有 该代码,而不仅仅是计算量大的部分,在另一个线程中