在龙卷风中放弃期货
Abandoning Futures in Tornado
我正在考虑在龙卷风中使用扇出代理来查询多个后端服务器,以及让它在返回之前不等待所有响应的可能用例。
如果您使用WaitIterator
但在收到有用的响应后不继续等待,剩余的期货是否有问题?
也许其他期货的结果不会被清理?也许可以将回调添加到任何剩余的期货中以丢弃其结果?
#!./venv/bin/python
from tornado import gen
from tornado import httpclient
from tornado import ioloop
from tornado import web
import json
class MainHandler(web.RequestHandler):
@gen.coroutine
def get(self):
r1 = httpclient.HTTPRequest(
url="http://apihost1.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
r2 = httpclient.HTTPRequest(
url="http://apihost2.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
http = httpclient.AsyncHTTPClient()
wait = gen.WaitIterator(
r1=http.fetch(r1),
r2=http.fetch(r2)
)
while not wait.done():
try:
reply = yield wait.next()
except Exception as e:
print("Error {} from {}".format(e, wait.current_future))
else:
print("Result {} received from {} at {}".format(
reply, wait.current_future,
wait.current_index))
if reply.code == 200:
result = json.loads(reply.body)
self.write(json.dumps(dict(result, backend=wait.current_index)))
return
def make_app():
return web.Application([
(r'/', MainHandler)
])
if __name__ == '__main__':
app = make_app()
app.listen(8888)
ioloop.IOLoop.current().start()
所以我检查了 WaitIterator
的来源。
它跟踪添加回调的未来,当迭代器被触发时将结果排队,或者(如果你调用了 next()
)实现它给你的未来。
由于您等待的未来只能通过调用 .next()
创建,看来您可以退出 while not wait.done()
并且不会在没有观察者的情况下留下任何未来。
引用计数应该允许 WaitIterator
实例保留,直到所有 futures 触发它们的回调然后被回收。
Update 2017/08/02
Having tested further with subclassing WaitIterator
with extra logging, yes the iterator will be cleaned up when all the futures return, but if any of those futures return an exception it will be logged that this exception hasn't been observed.
ERROR:tornado.application:Future exception was never retrieved: HTTPError: HTTP 599: Timeout while connecting
In summary and answering my question: completing the WaitIterator
isn't necessary from a clean-up point of view, but it is probably desirable to do so from a logging point of view.
如果您想确定,将 wait 迭代器传递给将完成消耗它的新未来并添加观察者可能就足够了。例如
@gen.coroutine
def complete_wait_iterator(wait):
rounds = 0
while not wait.done():
rounds += 1
try:
reply = yield wait.next()
except Exception as e:
print("Not needed Error {} from {}".format(e, wait.current_future))
else:
print("Not needed result {} received from {} at {}".format(
reply, wait.current_future,
wait.current_index))
log.info('completer finished after {n} rounds'.format(n=rounds))
class MainHandler(web.RequestHandler):
@gen.coroutine
def get(self):
r1 = httpclient.HTTPRequest(
url="http://apihost1.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
r2 = httpclient.HTTPRequest(
url="http://apihost2.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
http = httpclient.AsyncHTTPClient()
wait = gen.WaitIterator(
r1=http.fetch(r1),
r2=http.fetch(r2)
)
while not wait.done():
try:
reply = yield wait.next()
except Exception as e:
print("Error {} from {}".format(e, wait.current_future))
else:
print("Result {} received from {} at {}".format(
reply, wait.current_future,
wait.current_index))
if reply.code == 200:
result = json.loads(reply.body)
self.write(json.dumps(dict(result, backend=wait.current_index)))
consumer = complete_wait_iterator(wait)
consumer.add_done_callback(lambda f: f.exception())
return
我正在考虑在龙卷风中使用扇出代理来查询多个后端服务器,以及让它在返回之前不等待所有响应的可能用例。
如果您使用WaitIterator
但在收到有用的响应后不继续等待,剩余的期货是否有问题?
也许其他期货的结果不会被清理?也许可以将回调添加到任何剩余的期货中以丢弃其结果?
#!./venv/bin/python
from tornado import gen
from tornado import httpclient
from tornado import ioloop
from tornado import web
import json
class MainHandler(web.RequestHandler):
@gen.coroutine
def get(self):
r1 = httpclient.HTTPRequest(
url="http://apihost1.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
r2 = httpclient.HTTPRequest(
url="http://apihost2.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
http = httpclient.AsyncHTTPClient()
wait = gen.WaitIterator(
r1=http.fetch(r1),
r2=http.fetch(r2)
)
while not wait.done():
try:
reply = yield wait.next()
except Exception as e:
print("Error {} from {}".format(e, wait.current_future))
else:
print("Result {} received from {} at {}".format(
reply, wait.current_future,
wait.current_index))
if reply.code == 200:
result = json.loads(reply.body)
self.write(json.dumps(dict(result, backend=wait.current_index)))
return
def make_app():
return web.Application([
(r'/', MainHandler)
])
if __name__ == '__main__':
app = make_app()
app.listen(8888)
ioloop.IOLoop.current().start()
所以我检查了 WaitIterator
的来源。
它跟踪添加回调的未来,当迭代器被触发时将结果排队,或者(如果你调用了 next()
)实现它给你的未来。
由于您等待的未来只能通过调用 .next()
创建,看来您可以退出 while not wait.done()
并且不会在没有观察者的情况下留下任何未来。
引用计数应该允许 WaitIterator
实例保留,直到所有 futures 触发它们的回调然后被回收。
Update 2017/08/02
Having tested further with subclassingWaitIterator
with extra logging, yes the iterator will be cleaned up when all the futures return, but if any of those futures return an exception it will be logged that this exception hasn't been observed.
ERROR:tornado.application:Future exception was never retrieved: HTTPError: HTTP 599: Timeout while connecting
In summary and answering my question: completing the
WaitIterator
isn't necessary from a clean-up point of view, but it is probably desirable to do so from a logging point of view.
如果您想确定,将 wait 迭代器传递给将完成消耗它的新未来并添加观察者可能就足够了。例如
@gen.coroutine
def complete_wait_iterator(wait):
rounds = 0
while not wait.done():
rounds += 1
try:
reply = yield wait.next()
except Exception as e:
print("Not needed Error {} from {}".format(e, wait.current_future))
else:
print("Not needed result {} received from {} at {}".format(
reply, wait.current_future,
wait.current_index))
log.info('completer finished after {n} rounds'.format(n=rounds))
class MainHandler(web.RequestHandler):
@gen.coroutine
def get(self):
r1 = httpclient.HTTPRequest(
url="http://apihost1.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
r2 = httpclient.HTTPRequest(
url="http://apihost2.localdomain/api/object/thing",
connect_timeout=4.0,
request_timeout=4.0,
)
http = httpclient.AsyncHTTPClient()
wait = gen.WaitIterator(
r1=http.fetch(r1),
r2=http.fetch(r2)
)
while not wait.done():
try:
reply = yield wait.next()
except Exception as e:
print("Error {} from {}".format(e, wait.current_future))
else:
print("Result {} received from {} at {}".format(
reply, wait.current_future,
wait.current_index))
if reply.code == 200:
result = json.loads(reply.body)
self.write(json.dumps(dict(result, backend=wait.current_index)))
consumer = complete_wait_iterator(wait)
consumer.add_done_callback(lambda f: f.exception())
return