对异步端点的调用被另一个线程阻塞

Call to async endpoint gets blocked by another thread

我有一个 tornado 网络服务,每分钟将处理大约 500 个请求。所有这些请求都将到达 1 个特定端点。我使用 Cython 编译了一个 C++ 程序,并在 tornado 服务中使用它作为我的处理器引擎。每个到达 /check/ 的请求都将触发 C++ 程序中的函数调用(我将其称为 handler),并且 return 值将作为响应发送给用户。

这就是我包装 handler class 的方式。重要的一点是我没有在 __init__ 中实例化 handler。我的龙卷风代码中还有另一条路线,我想在授权请求到达该路线后开始加载 DataStructure。 (例如 /reload/


executors = ThreadPoolExecutor(max_workers=4)


class CheckerInstance(object):
    def __init__(self, *args, **kwargs):
        self.handler = None
        self.is_loading = False
        self.is_live = False

    def init(self):
        if not self.handler:
            self.handler = pDataStructureHandler()
            self.handler.add_words_from_file(self.data_file_name)
            self.end_loading()
            self.go_live()

    def renew(self):
        self.handler = None
        self.init()



class CheckHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument("q", None).encode('utf-8')
        answer = query

        if not checker_instance.is_live:
            self.write(dict(answer=self.get_argument("q", None), confidence=100))
            return

        checker_response = await checker_instance.get_response(query)
        answer = checker_response[0]
        confidence = checker_response[1]

        if self.request.connection.stream.closed():
            return
        self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))

    def on_connection_close(self):
        self.wait_future.cancel()


class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
    def prepare(self):
        self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')

    def new_file_exists(self):
        return True

    def can_reload(self):
        return not checker_instance.is_loading

    def get(self):
        error = False
        message = None

        if not self.can_reload():
            error = True
            message = 'another job is being processed!'
        else:
            if not self.new_file_exists():
                    error = True
                    message = 'no new file found!'
            else:
                checker_instance.go_fake()
                checker_instance.start_loading()
                tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
                message = 'job started!'

        if self.request.connection.stream.closed():
            return
        self.write(dict(
            success=not error, message=message
        ))

    def on_connection_close(self):
        self.wait_future.cancel()


def main():
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/check", CheckHandler),
            (r"/reload", InstanceReloadHandler),
            (r"/health", HealthHandler),
            (r"/log-event", SubmitLogHandler),
        ],
        debug=options.debug,
    )
    checker_instance = CheckerInstance()

我希望此服务在 checker_instance.renew 在另一个线程中 运行 启动后继续响应。但事实并非如此。当我点击 /reload/ 端点并且 renew 函数开始工作时,对 /check/ 的任何请求都会停止并等待重新加载过程完成,然后它会再次开始工作。加载 DataStructure 时,服务应处于 fake 模式,并使用与输入相同的查询响应人们。

我已经在我的开发环境中使用 i5 CPU(4 CPU 核)测试了这段代码,它工作得很好!但在生产环境中(3 个双线程 CPU 核心)/check/ 端点停止请求。

很难完全跟踪正在处理的事件,因为为了简洁起见,您已经删除了一些代码。例如,我在这里没有看到 get_response 实现,所以我不知道它是否正在等待可能依赖于 checker_instance.

状态的东西

我要探索的一个领域是将 checker_instance.renew 传递给 run_in_executor 时的线程安全性(或看似不存在)。这对我来说很可疑,因为您正在从一个单独的线程中改变 CheckerInstance 的单个实例的状态。虽然它可能不会明确地破坏事物,但这似乎确实会引入奇怪的竞争条件或意外的内存副本,这可能会解释您正在经历的意外行为

如果可能的话,我会让你想要卸载到线程的任何加载行为完全独立,当加载数据时,return 它作为函数结果,然后可以是反馈给你 checker_instance。如果您按原样使用代码执行此操作,您可能希望等待 run_in_executor 调用获取其结果,然后更新 checker_instance。这意味着重新加载 GET 请求将等到数据加载完毕。或者,在您的重新加载 GET 请求中,您可以 ioloop.spawn_callback 以这种方式触发 run_in_executor 的函数,从而允许重新加载请求完成而不是等待。