对异步端点的调用被另一个线程阻塞
Call to async endpoint gets blocked by another thread
我有一个 tornado 网络服务,每分钟将处理大约 500 个请求。所有这些请求都将到达 1 个特定端点。我使用 Cython
编译了一个 C++
程序,并在 tornado 服务中使用它作为我的处理器引擎。每个到达 /check/
的请求都将触发 C++
程序中的函数调用(我将其称为 handler
),并且 return 值将作为响应发送给用户。
这就是我包装 handler
class 的方式。重要的一点是我没有在 __init__
中实例化 handler
。我的龙卷风代码中还有另一条路线,我想在授权请求到达该路线后开始加载 DataStructure。 (例如 /reload/
)
executors = ThreadPoolExecutor(max_workers=4)
class CheckerInstance(object):
def __init__(self, *args, **kwargs):
self.handler = None
self.is_loading = False
self.is_live = False
def init(self):
if not self.handler:
self.handler = pDataStructureHandler()
self.handler.add_words_from_file(self.data_file_name)
self.end_loading()
self.go_live()
def renew(self):
self.handler = None
self.init()
class CheckHandler(tornado.web.RequestHandler):
async def get(self):
query = self.get_argument("q", None).encode('utf-8')
answer = query
if not checker_instance.is_live:
self.write(dict(answer=self.get_argument("q", None), confidence=100))
return
checker_response = await checker_instance.get_response(query)
answer = checker_response[0]
confidence = checker_response[1]
if self.request.connection.stream.closed():
return
self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))
def on_connection_close(self):
self.wait_future.cancel()
class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
def prepare(self):
self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')
def new_file_exists(self):
return True
def can_reload(self):
return not checker_instance.is_loading
def get(self):
error = False
message = None
if not self.can_reload():
error = True
message = 'another job is being processed!'
else:
if not self.new_file_exists():
error = True
message = 'no new file found!'
else:
checker_instance.go_fake()
checker_instance.start_loading()
tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
message = 'job started!'
if self.request.connection.stream.closed():
return
self.write(dict(
success=not error, message=message
))
def on_connection_close(self):
self.wait_future.cancel()
def main():
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/check", CheckHandler),
(r"/reload", InstanceReloadHandler),
(r"/health", HealthHandler),
(r"/log-event", SubmitLogHandler),
],
debug=options.debug,
)
checker_instance = CheckerInstance()
我希望此服务在 checker_instance.renew
在另一个线程中 运行 启动后继续响应。但事实并非如此。当我点击 /reload/
端点并且 renew
函数开始工作时,对 /check/
的任何请求都会停止并等待重新加载过程完成,然后它会再次开始工作。加载 DataStructure 时,服务应处于 fake
模式,并使用与输入相同的查询响应人们。
我已经在我的开发环境中使用 i5 CPU(4 CPU 核)测试了这段代码,它工作得很好!但在生产环境中(3 个双线程 CPU 核心)/check/
端点停止请求。
很难完全跟踪正在处理的事件,因为为了简洁起见,您已经删除了一些代码。例如,我在这里没有看到 get_response
实现,所以我不知道它是否正在等待可能依赖于 checker_instance.
状态的东西
我要探索的一个领域是将 checker_instance.renew
传递给 run_in_executor
时的线程安全性(或看似不存在)。这对我来说很可疑,因为您正在从一个单独的线程中改变 CheckerInstance 的单个实例的状态。虽然它可能不会明确地破坏事物,但这似乎确实会引入奇怪的竞争条件或意外的内存副本,这可能会解释您正在经历的意外行为
如果可能的话,我会让你想要卸载到线程的任何加载行为完全独立,当加载数据时,return 它作为函数结果,然后可以是反馈给你 checker_instance。如果您按原样使用代码执行此操作,您可能希望等待 run_in_executor
调用获取其结果,然后更新 checker_instance。这意味着重新加载 GET 请求将等到数据加载完毕。或者,在您的重新加载 GET 请求中,您可以 ioloop.spawn_callback
以这种方式触发 run_in_executor
的函数,从而允许重新加载请求完成而不是等待。
我有一个 tornado 网络服务,每分钟将处理大约 500 个请求。所有这些请求都将到达 1 个特定端点。我使用 Cython
编译了一个 C++
程序,并在 tornado 服务中使用它作为我的处理器引擎。每个到达 /check/
的请求都将触发 C++
程序中的函数调用(我将其称为 handler
),并且 return 值将作为响应发送给用户。
这就是我包装 handler
class 的方式。重要的一点是我没有在 __init__
中实例化 handler
。我的龙卷风代码中还有另一条路线,我想在授权请求到达该路线后开始加载 DataStructure。 (例如 /reload/
)
executors = ThreadPoolExecutor(max_workers=4)
class CheckerInstance(object):
def __init__(self, *args, **kwargs):
self.handler = None
self.is_loading = False
self.is_live = False
def init(self):
if not self.handler:
self.handler = pDataStructureHandler()
self.handler.add_words_from_file(self.data_file_name)
self.end_loading()
self.go_live()
def renew(self):
self.handler = None
self.init()
class CheckHandler(tornado.web.RequestHandler):
async def get(self):
query = self.get_argument("q", None).encode('utf-8')
answer = query
if not checker_instance.is_live:
self.write(dict(answer=self.get_argument("q", None), confidence=100))
return
checker_response = await checker_instance.get_response(query)
answer = checker_response[0]
confidence = checker_response[1]
if self.request.connection.stream.closed():
return
self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))
def on_connection_close(self):
self.wait_future.cancel()
class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
def prepare(self):
self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')
def new_file_exists(self):
return True
def can_reload(self):
return not checker_instance.is_loading
def get(self):
error = False
message = None
if not self.can_reload():
error = True
message = 'another job is being processed!'
else:
if not self.new_file_exists():
error = True
message = 'no new file found!'
else:
checker_instance.go_fake()
checker_instance.start_loading()
tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
message = 'job started!'
if self.request.connection.stream.closed():
return
self.write(dict(
success=not error, message=message
))
def on_connection_close(self):
self.wait_future.cancel()
def main():
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/check", CheckHandler),
(r"/reload", InstanceReloadHandler),
(r"/health", HealthHandler),
(r"/log-event", SubmitLogHandler),
],
debug=options.debug,
)
checker_instance = CheckerInstance()
我希望此服务在 checker_instance.renew
在另一个线程中 运行 启动后继续响应。但事实并非如此。当我点击 /reload/
端点并且 renew
函数开始工作时,对 /check/
的任何请求都会停止并等待重新加载过程完成,然后它会再次开始工作。加载 DataStructure 时,服务应处于 fake
模式,并使用与输入相同的查询响应人们。
我已经在我的开发环境中使用 i5 CPU(4 CPU 核)测试了这段代码,它工作得很好!但在生产环境中(3 个双线程 CPU 核心)/check/
端点停止请求。
很难完全跟踪正在处理的事件,因为为了简洁起见,您已经删除了一些代码。例如,我在这里没有看到 get_response
实现,所以我不知道它是否正在等待可能依赖于 checker_instance.
我要探索的一个领域是将 checker_instance.renew
传递给 run_in_executor
时的线程安全性(或看似不存在)。这对我来说很可疑,因为您正在从一个单独的线程中改变 CheckerInstance 的单个实例的状态。虽然它可能不会明确地破坏事物,但这似乎确实会引入奇怪的竞争条件或意外的内存副本,这可能会解释您正在经历的意外行为
如果可能的话,我会让你想要卸载到线程的任何加载行为完全独立,当加载数据时,return 它作为函数结果,然后可以是反馈给你 checker_instance。如果您按原样使用代码执行此操作,您可能希望等待 run_in_executor
调用获取其结果,然后更新 checker_instance。这意味着重新加载 GET 请求将等到数据加载完毕。或者,在您的重新加载 GET 请求中,您可以 ioloop.spawn_callback
以这种方式触发 run_in_executor
的函数,从而允许重新加载请求完成而不是等待。