Tornado + aioredis: why are my Redis calls blocking?

I am trying to build a simple system with two API endpoints on top of Tornado and Redis:

  1. One API reads a value from Redis, or waits until that value exists (with BRPOP: value = yield from redis.brpop("test")).
  2. One API writes that value (with LPUSH: redis.lpush("test", "the value")).

So I want to be able to call those APIs in either order. Indeed, if I call 2. and then 1., it works as expected: the call to 1. returns immediately with the value.

The problem is that if I call 1. and then 2., both requests block and never return.

Meanwhile, while the requests are blocked, I can still LPUSH/BRPOP directly in Redis, even on the same key. Likewise, I can still call other handlers in Tornado. So I guess the blocking happens neither in Redis nor in Tornado, but in my use of aioredis? Maybe in the asyncio loop? But I don't see where I went wrong. Any suggestions?

Thanks for your help.

Here is my code:

import tornado.ioloop
import tornado.web
import aioredis
import asyncio


class WaitValueHandler(tornado.web.RequestHandler):
    @asyncio.coroutine
    def get(self):
        redis = self.application.redis
        value = yield from redis.brpop("test")
        self.write("I received a value: %s" % value)


class WriteValueHandler(tornado.web.RequestHandler):
    @asyncio.coroutine
    def get(self):
        redis = self.application.redis
        res = yield from redis.lpush("test", "here is the value")
        self.write("Ok ")


class Application(tornado.web.Application):
    def __init__(self):
        tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOMainLoop')

        handlers = [
            (r"/get", WaitValueHandler),
            (r"/put", WriteValueHandler)
        ]

        super().__init__(handlers, debug=True)

    def init_with_loop(self, loop):
        self.redis = loop.run_until_complete(
            aioredis.create_redis(('localhost', 6379), loop=loop)
        )

if __name__ == "__main__":
    application = Application()
    application.listen(8888)

    loop = asyncio.get_event_loop()
    application.init_with_loop(loop)
    loop.run_forever()

OK, I found out why, as the doc states:

Blocking operations (like blpop, brpop or long-running LUA scripts) in shared mode will block the connection and thus may lead to whole program malfunction.

This blocking issue can be easily solved by using exclusive connection for such operations:

redis = await aioredis.create_redis_pool(
    ('localhost', 6379),
    minsize=1,
    maxsize=1)

async def task():
    # Exclusive mode
    with await redis as r:
        await r.set('key', 'val')
asyncio.ensure_future(task())
asyncio.ensure_future(task())
# Both tasks will first acquire connection.
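The failure mode can also be reproduced without Redis at all, which makes the diagnosis easier to see. The sketch below is only a model, not aioredis code: an asyncio.Lock stands in for the single shared connection (every command must hold it), and an asyncio.Queue stands in for the Redis list "test". With one shared "connection", a BRPOP-like wait holds it forever and the LPUSH-like command deadlocks; with one connection per task (what the pool's exclusive mode gives you), both complete:

```python
import asyncio


async def shared_connection_deadlock():
    """Model of ONE shared connection: every command must hold `conn`."""
    conn = asyncio.Lock()      # stands in for the single aioredis connection
    store = asyncio.Queue()    # stands in for the Redis list "test"

    async def brpop():
        async with conn:       # BRPOP holds the connection while it waits...
            return await store.get()

    async def lpush(value):
        async with conn:       # ...so LPUSH can never acquire it: deadlock
            await store.put(value)

    waiter = asyncio.ensure_future(brpop())
    pusher = asyncio.ensure_future(lpush("the value"))
    done, pending = await asyncio.wait({waiter, pusher}, timeout=0.2)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done) == 0      # True: neither command ever completed


async def exclusive_connections():
    """Same commands, but each task gets its own connection (a pool)."""
    store = asyncio.Queue()
    conn_read, conn_write = asyncio.Lock(), asyncio.Lock()

    async def brpop():
        async with conn_read:
            return await store.get()

    async def lpush(value):
        async with conn_write:
            await store.put(value)

    waiter = asyncio.ensure_future(brpop())
    await asyncio.sleep(0)     # let BRPOP start waiting first
    await lpush("the value")
    return await waiter


print(asyncio.run(shared_connection_deadlock()))  # True
print(asyncio.run(exclusive_connections()))       # the value
```

This is why the maxsize=1 pool with "with await redis as r" fixes the original code: each task acquires the connection exclusively and releases it when done, instead of every handler multiplexing commands onto one connection that a blocked BRPOP is holding.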