如何在 Hedis haskell pubSub 中发布

how to publish in Hedis haskell pubSub

我目前正在学习 Haskell。现在我目前不太擅长函数式编程。 我想制作一段代码,从订阅 Redis 中的某个主题获取数据,对其进行一些计算并将其发布到另一个主题。我猜有些语言特定的功能有问题。

我当前的代码:

{-# LANGUAGE OverloadedStrings #-}
module Main where

import Database.Redis
import System.IO

main = do 
    conn <- connect defaultConnectInfo
    runRedis conn $ do
        pubSub (subscribe ["commands"]) $ \msg -> do
            putStrLn $ "Message from " ++ show (msgChannel msg)
            publish "results" "Result of a very interesting calculation"
            return mempty

现在我收到错误: • No instance for (RedisCtx IO f0) arising from a use of ‘publish’

将发布放在 pubsub 之外将使它起作用。但是我想发布一个结果!我无法从文档中得到任何智慧。我错过了什么?

您对 putStrLn 的使用使类型检查器(正确地!)推断出您的 do 块旨在位于 IO 上下文中,然后调用 publish 要求上下文是 RedisCtx 的实例,而 IO 不是。

通常在 Redis 上下文中,解决方案是使用 MonadIO liftIO :: IO a -> m aIO 操作提升到 Redis 上下文中=86=],如文档中的示例:

runRedis conn $ do
    set "hello" "hello"
    set "world" "world"
    helloworld <- multiExec $ do
        hello <- get "hello"
        world <- get "world"
        return $ (,) <$> hello <*> world
    <strong>liftIO (print helloworld)</strong>

MonadIO 是一组类型,您可以在其上下文中执行 IO 操作。

然而,在这种情况下,情况正好相反:pubSub returns 的功能参数是一个 IO 操作,但 publish 期望一个 RedisCtx monad.

我不清楚 pubSub 是否允许你在回调中进行 runRedis 调用,就像这样,尽管我认为它应该进行类型检查:

{-# LANGUAGE OverloadedStrings #-}
module Main where

import Database.Redis
import System.IO

main = do 
    conn <- connect defaultConnectInfo
    runRedis conn $ do
        pubSub (subscribe ["commands"]) $ \msg -> do
            putStrLn $ "Message from " ++ show (msgChannel msg)
            <strong>runRedis conn $</strong> publish "results" "Result of a very interesting calculation"
            return mempty

根据浏览文档,每个 runRedis 调用都从连接池中获取一个连接,连接池的默认大小为 50;然而,如果没有可用的连接,它会阻塞,所以我担心的是,因为 pubSub 的文档说它是“单线程”,这可能会死锁等待一个不会被释放的连接,因为你重新进入“嵌套”runRedis 调用。

我想我接下来要尝试的是使用更灵活的 pubSubForever API;在 hedis test suite 中有一个使用 pubSubForever 和单独的线程来发布和处理订阅事件的示例。

main = do
  ctrl <- newPubSubController [("foo", msgHandler)] []
  conn <- connect defaultConnectInfo

  withAsync (publishThread conn) $ \_pubT -> do
    withAsync (handlerThread conn ctrl) $ \_handlerT -> do

      void $ hPutStrLn stderr "Press enter to subscribe to bar" >> getLine
      void $ addChannels ctrl [("bar", msgHandler)] []
      -- …
      -- (Add/remove various subscriptions.)
      -- …

publishThread 使用 runRedis 并调用 publish:

publishThread :: Connection -> IO ()
publishThread c = <strong>runRedis</strong> c $ loop (0 :: Int)
  where
    loop i = do
      let msg = encodeUtf8 $ pack $ "Publish iteration " ++ show i
      void $ <strong>publish</strong> "foo" ("foo" <> msg)
      -- …
      liftIO $ threadDelay $ 2*1000*1000
      loop (i+1)

handlerThread 使用 pubSubForever:

handlerThread :: Connection -> PubSubController -> IO ()
handlerThread conn ctrl = forever $
       <strong>pubSubForever</strong> conn ctrl onInitialComplete
         `catch` (\(e :: SomeException) -> do
           hPutStrLn stderr $ "Got error: " ++ show e
           threadDelay $ 50*1000)

这包含在对 forever to resubscribe if the connection is lost, per the docs for pubSubForever 的调用中:

[…] if the network connection to Redis dies, pubSubForever will throw a ConnectionLost. When such an exception is thrown, you can recall pubSubForever with the same PubSubController which will open a new connection and resubscribe to all the channels which are tracked in the PubSubController.

此测试使用 async 包中的 Control.Concurrent.Async 来管理任务,我认为这是个好主意。如果你想避免这种依赖,你可以使用 forkIO 代替(例如 Chan 或 STM TChan 从处理程序发送事件),唯一的问题是这不会如果分叉线程由于异常而终止,则自动通知其他线程,而 Async 提供了一些很好的异常安全保证。