如何在 Hedis haskell pubSub 中发布
how to publish in Hedis haskell pubSub
我目前正在学习 Haskell。现在我目前不太擅长函数式编程。
我想制作一段代码,从订阅 Redis 中的某个主题获取数据,对其进行一些计算并将其发布到另一个主题。我猜有些语言特定的功能有问题。
我当前的代码:
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Database.Redis
import System.IO
main = do
conn <- connect defaultConnectInfo
runRedis conn $ do
pubSub (subscribe ["commands"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
publish "results" "Result of a very interesting calculation"
return mempty
现在我收到错误: • No instance for (RedisCtx IO f0) arising from a use of ‘publish’
将发布放在 pubsub 之外将使它起作用。但是我想发布一个结果!我无法从文档中得到任何智慧。我错过了什么?
您对 putStrLn
的使用使类型检查器(正确地!)推断出您的 do
块旨在位于 IO
上下文中,然后调用 publish
要求上下文是 RedisCtx
的实例,而 IO
不是。
通常在 Redis
上下文中,解决方案是使用 MonadIO
liftIO :: IO a -> m a
将 IO
操作提升到 Redis
上下文中=86=],如文档中的示例:
runRedis conn $ do
set "hello" "hello"
set "world" "world"
helloworld <- multiExec $ do
hello <- get "hello"
world <- get "world"
return $ (,) <$> hello <*> world
<strong>liftIO (print helloworld)</strong>
MonadIO
是一组类型,您可以在其上下文中执行 IO
操作。
然而,在这种情况下,情况正好相反:pubSub
returns 的功能参数是一个 IO
操作,但 publish
期望一个 RedisCtx
monad.
我不清楚 pubSub
是否允许你在回调中进行 runRedis
调用,就像这样,尽管我认为它应该进行类型检查:
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Database.Redis
import System.IO
main = do
conn <- connect defaultConnectInfo
runRedis conn $ do
pubSub (subscribe ["commands"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
<strong>runRedis conn $</strong> publish "results" "Result of a very interesting calculation"
return mempty
根据浏览文档,每个 runRedis
调用都从连接池中获取一个连接,连接池的默认大小为 50;然而,如果没有可用的连接,它会阻塞,所以我担心的是,因为 pubSub
的文档说它是“单线程”,这可能会死锁等待一个不会被释放的连接,因为你重新进入“嵌套”runRedis
调用。
我想我接下来要尝试的是使用更灵活的 pubSubForever
API;在 hedis
test suite 中有一个使用 pubSubForever
和单独的线程来发布和处理订阅事件的示例。
main = do
ctrl <- newPubSubController [("foo", msgHandler)] []
conn <- connect defaultConnectInfo
withAsync (publishThread conn) $ \_pubT -> do
withAsync (handlerThread conn ctrl) $ \_handlerT -> do
void $ hPutStrLn stderr "Press enter to subscribe to bar" >> getLine
void $ addChannels ctrl [("bar", msgHandler)] []
-- …
-- (Add/remove various subscriptions.)
-- …
publishThread
使用 runRedis
并调用 publish
:
publishThread :: Connection -> IO ()
publishThread c = <strong>runRedis</strong> c $ loop (0 :: Int)
where
loop i = do
let msg = encodeUtf8 $ pack $ "Publish iteration " ++ show i
void $ <strong>publish</strong> "foo" ("foo" <> msg)
-- …
liftIO $ threadDelay $ 2*1000*1000
loop (i+1)
handlerThread
使用 pubSubForever
:
handlerThread :: Connection -> PubSubController -> IO ()
handlerThread conn ctrl = forever $
<strong>pubSubForever</strong> conn ctrl onInitialComplete
`catch` (\(e :: SomeException) -> do
hPutStrLn stderr $ "Got error: " ++ show e
threadDelay $ 50*1000)
这包含在对 forever
to resubscribe if the connection is lost, per the docs for pubSubForever
的调用中:
[…] if the network connection to Redis dies, pubSubForever
will throw a ConnectionLost
. When such an exception is thrown, you can recall pubSubForever
with the same PubSubController
which will open a new connection and resubscribe to all the channels which are tracked in the PubSubController
.
此测试使用 async
包中的 Control.Concurrent.Async
来管理任务,我认为这是个好主意。如果你想避免这种依赖,你可以使用 forkIO
代替(例如 Chan
或 STM TChan
从处理程序发送事件),唯一的问题是这不会如果分叉线程由于异常而终止,则自动通知其他线程,而 Async
提供了一些很好的异常安全保证。
我目前正在学习 Haskell。现在我目前不太擅长函数式编程。 我想制作一段代码,从订阅 Redis 中的某个主题获取数据,对其进行一些计算并将其发布到另一个主题。我猜有些语言特定的功能有问题。
我当前的代码:
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Database.Redis
import System.IO
main = do
conn <- connect defaultConnectInfo
runRedis conn $ do
pubSub (subscribe ["commands"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
publish "results" "Result of a very interesting calculation"
return mempty
现在我收到错误: • No instance for (RedisCtx IO f0) arising from a use of ‘publish’
将发布放在 pubsub 之外将使它起作用。但是我想发布一个结果!我无法从文档中得到任何智慧。我错过了什么?
您对 putStrLn
的使用使类型检查器(正确地!)推断出您的 do
块旨在位于 IO
上下文中,然后调用 publish
要求上下文是 RedisCtx
的实例,而 IO
不是。
通常在 Redis
上下文中,解决方案是使用 MonadIO
liftIO :: IO a -> m a
将 IO
操作提升到 Redis
上下文中=86=],如文档中的示例:
runRedis conn $ do
set "hello" "hello"
set "world" "world"
helloworld <- multiExec $ do
hello <- get "hello"
world <- get "world"
return $ (,) <$> hello <*> world
<strong>liftIO (print helloworld)</strong>
MonadIO
是一组类型,您可以在其上下文中执行 IO
操作。
然而,在这种情况下,情况正好相反:pubSub
returns 的功能参数是一个 IO
操作,但 publish
期望一个 RedisCtx
monad.
我不清楚 pubSub
是否允许你在回调中进行 runRedis
调用,就像这样,尽管我认为它应该进行类型检查:
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Database.Redis
import System.IO
main = do
conn <- connect defaultConnectInfo
runRedis conn $ do
pubSub (subscribe ["commands"]) $ \msg -> do
putStrLn $ "Message from " ++ show (msgChannel msg)
<strong>runRedis conn $</strong> publish "results" "Result of a very interesting calculation"
return mempty
根据浏览文档,每个 runRedis
调用都从连接池中获取一个连接,连接池的默认大小为 50;然而,如果没有可用的连接,它会阻塞,所以我担心的是,因为 pubSub
的文档说它是“单线程”,这可能会死锁等待一个不会被释放的连接,因为你重新进入“嵌套”runRedis
调用。
我想我接下来要尝试的是使用更灵活的 pubSubForever
API;在 hedis
test suite 中有一个使用 pubSubForever
和单独的线程来发布和处理订阅事件的示例。
main = do
ctrl <- newPubSubController [("foo", msgHandler)] []
conn <- connect defaultConnectInfo
withAsync (publishThread conn) $ \_pubT -> do
withAsync (handlerThread conn ctrl) $ \_handlerT -> do
void $ hPutStrLn stderr "Press enter to subscribe to bar" >> getLine
void $ addChannels ctrl [("bar", msgHandler)] []
-- …
-- (Add/remove various subscriptions.)
-- …
publishThread
使用 runRedis
并调用 publish
:
publishThread :: Connection -> IO ()
publishThread c = <strong>runRedis</strong> c $ loop (0 :: Int)
where
loop i = do
let msg = encodeUtf8 $ pack $ "Publish iteration " ++ show i
void $ <strong>publish</strong> "foo" ("foo" <> msg)
-- …
liftIO $ threadDelay $ 2*1000*1000
loop (i+1)
handlerThread
使用 pubSubForever
:
handlerThread :: Connection -> PubSubController -> IO ()
handlerThread conn ctrl = forever $
<strong>pubSubForever</strong> conn ctrl onInitialComplete
`catch` (\(e :: SomeException) -> do
hPutStrLn stderr $ "Got error: " ++ show e
threadDelay $ 50*1000)
这包含在对 forever
to resubscribe if the connection is lost, per the docs for pubSubForever
的调用中:
[…] if the network connection to Redis dies,
pubSubForever
will throw aConnectionLost
. When such an exception is thrown, you can recallpubSubForever
with the samePubSubController
which will open a new connection and resubscribe to all the channels which are tracked in thePubSubController
.
此测试使用 async
包中的 Control.Concurrent.Async
来管理任务,我认为这是个好主意。如果你想避免这种依赖,你可以使用 forkIO
代替(例如 Chan
或 STM TChan
从处理程序发送事件),唯一的问题是这不会如果分叉线程由于异常而终止,则自动通知其他线程,而 Async
提供了一些很好的异常安全保证。