运行 iPython 中的扭曲反应堆

Running twisted reactor in iPython

我知道这通常是用 twistd 完成的,但我想使用 iPython 来测试扭曲代码上的代码 'live'。

How to start twisted's reactor from ipython 问了基本相同的问题,但第一个解决方案不再适用于当前 ipython/twisted,而第二个也不可用(线程引发多个错误)。

https://gist.github.com/kived/8721434 有一个叫做 TPython 的东西声称可以做到这一点,但是 运行 似乎可以工作,除了客户端永远不会连接到服务器(而 运行 相同的客户端在python shell).

使用 Conch Manhole 吗,或者有没有办法让 iPython 玩得更好(可能使用 _threadedselect)。

作为参考,我要求使用 ipython 5.0.0,python 2.7.12,扭曲的 16.4.1

虽然这没有回答我认为的问题,但它确实回答了(某种程度上)我发布的问题。嵌入 ipython 的工作原理是您可以使用反应堆 运行.

访问业务对象
from twisted.internet import reactor
from twisted.internet.endpoints import serverFromString
from myfactory import MyFactory

class MyClass(object):
    def __init__(self, **kwargs):
        super(MyClass, self).__init__(**kwargs)
        server = serverFromString(reactor, 'tcp:12345')
        server.list(MyFactory(self))
        def interact():
            import IPython
            IPython.embed()
        reactor.callInThread(interact)

if __name__ == "__main__":
    myclass = MyClass()
    reactor.run()

python myclass.py 或类似的方式调用上面的内容。

一般来说,异步代码在实时解释器中 运行 可能会很麻烦。最好只在后台 运行 异步脚本并在单独的解释器中执行 iPython 操作。您可以使用文件或 TCP 进行相互通信。如果这让您难以理解,那是因为它并不总是那么简单,最好避免可能的麻烦。

但是,您会很高兴知道有一个名为 crochet 的很棒的项目可以在非异步应用程序中使用 Twisted。它确实是我最喜欢的模块之一,令我震惊的是它没有得到更广泛的使用(不过你可以改变它;D)。 crochet 模块有一个 run_in_reactor 装饰器,它 运行 是一个独立线程中的 Twisted 反应器,由 crochet 本身管理。这是一个快速 class 示例,它执行对星球大战 RESTFul API 的请求,然后将 JSON 响应存储在列表中。

from __future__ import print_function
import json

from twisted.internet import defer, task
from twisted.web.client import getPage

from crochet import run_in_reactor, setup as setup_crochet
setup_crochet()

class StarWarsPeople(object):
    people_id = [_id for _id in range(1, 89)]
    people = []

    @run_in_reactor
    def requestPeople(self):
        """
        Request Star Wars JSON data from the SWAPI site.
        This occurs in a Twisted reactor in a separate thread.
        """
        for _id in self.people_id:
            url = 'http://swapi.co/api/people/{0}'.format(_id).encode('utf-8')
            d = getPage(url)
            d.addCallback(self.appendJSON)

    def appendJSON(self, response):
        """
        A callback which will take the response from the getPage() request, 
        convert it to JSON, then append it to self.people, which can be 
        accessed outside of the crochet thread.
        """
        response_json = json.loads(response.decode('utf-8'))
        #print(response_json)    # uncomment if you want to see output
        self.people.append(response_json)

将其保存在文件中(示例:swapi.py),打开 iPython,导入新创建的模块,然后 运行 像这样进行快速测试:

from swapi import StarWarsPeople
testing = StarWarsPeople()
testing.requestPeople()

from time import sleep
for x in range(5):
    print(len(testing.people))
    sleep(2)

如您所见,运行s 在后台运行,主线程中仍然会发生一些事情。您可以像往常一样继续使用 iPython 解释器。您甚至可以在后台设置检修孔 运行ning 来进行一些很酷的黑客攻击!

参考资料