Redis 阻塞直到键存在

redis block until key exists

我是 Redis 的新手,想知道是否有一种方法能够 await get 通过它的键对一个值进行运算,直到该键存在。最小代码:

async def handler():
    data = await self._fetch(key)

async def _fetch(key):
    return self.redis_connection.get(key)

如您所知,如果 key 不存在,那么 return 就是 None。但由于在我的项目中,seting 键值对到 redis 发生在另一个应用程序中,我希望 redis_connection get 方法阻塞直到键存在。 这样的期望是否有效?

最接近此行为的方法是启用 keyspace notifications 并订阅相关频道(可能按模式订阅)。

但是请注意,通知依赖于不能保证传递消息的 PubSub(至多一次语义)。

除了@Itamar Haber提到的keyspace notification方法外,另一种解决方案是对LIST.

的阻塞操作
  1. handler 方法在空 LIST 上调用 BRPOPBRPOP notify-list timeout,并阻塞直到 notify-list 不为空。
  2. 另一个应用程序像往常一样完成设置键值对后将值推送到 LISTSET key value; LPUSH notify-list value.
  3. handler用你想要的值从阻塞操作中唤醒,notify-list被Redis自动销毁。

这个方案的好处是你不需要对你的handler方法做太多修改(使用键空间通知方案,你需要注册一个回调函数)。而缺点是必须依赖其他应用程序的通知(使用键空间通知解决方案,Redis 自动通知)。

如果不在您的客户端上实施某种轮询 redis GET,就不可能完成您想做的事情。在那种情况下,您的客户将不得不做类似的事情:

async def _fetch(key):
    val = self.redis_connection.get(key)
    while val is None:
       # Sleep and retry here
       asyncio.sleep(1)  
       val = self.redis_connection.get(key)
    return val

但是我会要求您完全重新考虑您用于解决此问题的模式。 在我看来,你需要做的事情是 Pub/Sub https://redis.io/topics/pubsub.

因此执行 SET 的应用程序成为发布者,执行 GET 的应用程序等待密钥可用成为订阅者。

我对此做了一些研究,看起来你可以用 asyncio_redis:

希望对您有所帮助。

Redis 5.0之后内置流,支持阻塞读。以下是带redis-py.

的示例代码
#add value to my_stream
redis.xadd('my_stream',{'key':'str_value'})

#read from beginning of stream
last_id='0'

#blocking read until there is value
last_stream_item = redis.xread({"my_stream":last_id},block=0) 

#update last_id
last_id = last_stream_item[0][1][0][0]

#wait for next value to arrive on stream 
last_stream_item = redis.xread({"my_stream":last_id},block=0)