运行 iPython 中的扭曲反应堆
Running twisted reactor in iPython
我知道这通常是用 twistd 完成的,但我想使用 iPython 来测试扭曲代码上的代码 'live'。
How to start twisted's reactor from ipython 问了基本相同的问题,但第一个解决方案不再适用于当前 ipython/twisted,而第二个也不可用(线程引发多个错误)。
https://gist.github.com/kived/8721434 有一个叫做 TPython 的东西声称可以做到这一点,但是 运行 似乎可以工作,除了客户端永远不会连接到服务器(而 运行 相同的客户端在python shell).
我有使用 Conch Manhole 吗,或者有没有办法让 iPython 玩得更好(可能使用 _threadedselect)。
作为参考,我要求使用 ipython 5.0.0,python 2.7.12,扭曲的 16.4.1
虽然这没有回答我认为的问题,但它确实回答了(某种程度上)我发布的问题。嵌入 ipython 的工作原理是您可以使用反应堆 运行.
访问业务对象
from twisted.internet import reactor
from twisted.internet.endpoints import serverFromString
from myfactory import MyFactory
class MyClass(object):
def __init__(self, **kwargs):
super(MyClass, self).__init__(**kwargs)
server = serverFromString(reactor, 'tcp:12345')
server.list(MyFactory(self))
def interact():
import IPython
IPython.embed()
reactor.callInThread(interact)
if __name__ == "__main__":
myclass = MyClass()
reactor.run()
用 python myclass.py
或类似的方式调用上面的内容。
一般来说,异步代码在实时解释器中 运行 可能会很麻烦。最好只在后台 运行 异步脚本并在单独的解释器中执行 iPython 操作。您可以使用文件或 TCP 进行相互通信。如果这让您难以理解,那是因为它并不总是那么简单,最好避免可能的麻烦。
但是,您会很高兴知道有一个名为 crochet
的很棒的项目可以在非异步应用程序中使用 Twisted。它确实是我最喜欢的模块之一,令我震惊的是它没有得到更广泛的使用(不过你可以改变它;D)。 crochet
模块有一个 run_in_reactor
装饰器,它 运行 是一个独立线程中的 Twisted 反应器,由 crochet
本身管理。这是一个快速 class 示例,它执行对星球大战 RESTFul API 的请求,然后将 JSON 响应存储在列表中。
from __future__ import print_function
import json
from twisted.internet import defer, task
from twisted.web.client import getPage
from crochet import run_in_reactor, setup as setup_crochet
setup_crochet()
class StarWarsPeople(object):
people_id = [_id for _id in range(1, 89)]
people = []
@run_in_reactor
def requestPeople(self):
"""
Request Star Wars JSON data from the SWAPI site.
This occurs in a Twisted reactor in a separate thread.
"""
for _id in self.people_id:
url = 'http://swapi.co/api/people/{0}'.format(_id).encode('utf-8')
d = getPage(url)
d.addCallback(self.appendJSON)
def appendJSON(self, response):
"""
A callback which will take the response from the getPage() request,
convert it to JSON, then append it to self.people, which can be
accessed outside of the crochet thread.
"""
response_json = json.loads(response.decode('utf-8'))
#print(response_json) # uncomment if you want to see output
self.people.append(response_json)
将其保存在文件中(示例:swapi.py
),打开 iPython,导入新创建的模块,然后 运行 像这样进行快速测试:
from swapi import StarWarsPeople
testing = StarWarsPeople()
testing.requestPeople()
from time import sleep
for x in range(5):
print(len(testing.people))
sleep(2)
如您所见,运行s 在后台运行,主线程中仍然会发生一些事情。您可以像往常一样继续使用 iPython 解释器。您甚至可以在后台设置检修孔 运行ning 来进行一些很酷的黑客攻击!
参考资料
我知道这通常是用 twistd 完成的,但我想使用 iPython 来测试扭曲代码上的代码 'live'。
How to start twisted's reactor from ipython 问了基本相同的问题,但第一个解决方案不再适用于当前 ipython/twisted,而第二个也不可用(线程引发多个错误)。
https://gist.github.com/kived/8721434 有一个叫做 TPython 的东西声称可以做到这一点,但是 运行 似乎可以工作,除了客户端永远不会连接到服务器(而 运行 相同的客户端在python shell).
我有使用 Conch Manhole 吗,或者有没有办法让 iPython 玩得更好(可能使用 _threadedselect)。
作为参考,我要求使用 ipython 5.0.0,python 2.7.12,扭曲的 16.4.1
虽然这没有回答我认为的问题,但它确实回答了(某种程度上)我发布的问题。嵌入 ipython 的工作原理是您可以使用反应堆 运行.
访问业务对象from twisted.internet import reactor
from twisted.internet.endpoints import serverFromString
from myfactory import MyFactory
class MyClass(object):
def __init__(self, **kwargs):
super(MyClass, self).__init__(**kwargs)
server = serverFromString(reactor, 'tcp:12345')
server.list(MyFactory(self))
def interact():
import IPython
IPython.embed()
reactor.callInThread(interact)
if __name__ == "__main__":
myclass = MyClass()
reactor.run()
用 python myclass.py
或类似的方式调用上面的内容。
一般来说,异步代码在实时解释器中 运行 可能会很麻烦。最好只在后台 运行 异步脚本并在单独的解释器中执行 iPython 操作。您可以使用文件或 TCP 进行相互通信。如果这让您难以理解,那是因为它并不总是那么简单,最好避免可能的麻烦。
但是,您会很高兴知道有一个名为 crochet
的很棒的项目可以在非异步应用程序中使用 Twisted。它确实是我最喜欢的模块之一,令我震惊的是它没有得到更广泛的使用(不过你可以改变它;D)。 crochet
模块有一个 run_in_reactor
装饰器,它 运行 是一个独立线程中的 Twisted 反应器,由 crochet
本身管理。这是一个快速 class 示例,它执行对星球大战 RESTFul API 的请求,然后将 JSON 响应存储在列表中。
from __future__ import print_function
import json
from twisted.internet import defer, task
from twisted.web.client import getPage
from crochet import run_in_reactor, setup as setup_crochet
setup_crochet()
class StarWarsPeople(object):
people_id = [_id for _id in range(1, 89)]
people = []
@run_in_reactor
def requestPeople(self):
"""
Request Star Wars JSON data from the SWAPI site.
This occurs in a Twisted reactor in a separate thread.
"""
for _id in self.people_id:
url = 'http://swapi.co/api/people/{0}'.format(_id).encode('utf-8')
d = getPage(url)
d.addCallback(self.appendJSON)
def appendJSON(self, response):
"""
A callback which will take the response from the getPage() request,
convert it to JSON, then append it to self.people, which can be
accessed outside of the crochet thread.
"""
response_json = json.loads(response.decode('utf-8'))
#print(response_json) # uncomment if you want to see output
self.people.append(response_json)
将其保存在文件中(示例:swapi.py
),打开 iPython,导入新创建的模块,然后 运行 像这样进行快速测试:
from swapi import StarWarsPeople
testing = StarWarsPeople()
testing.requestPeople()
from time import sleep
for x in range(5):
print(len(testing.people))
sleep(2)
如您所见,运行s 在后台运行,主线程中仍然会发生一些事情。您可以像往常一样继续使用 iPython 解释器。您甚至可以在后台设置检修孔 运行ning 来进行一些很酷的黑客攻击!