haskell 的基本 mqtt
basic mqtt with haskell
我将 mqtt-hs 移动到 LTS-5.13 并使用 stack 编译得很好。然后我创建了以下订阅者来收听主题层次结构。这是订阅者代码
{-# LANGUAGE OverloadedStrings #-}
module Main(main) where
import Control.Concurrent (threadWaitRead)
import System.Posix.Types
import Network.MQTT
import Network.MQTT.Logger
main :: IO ()
main = do
conn <- connect defaultConfig { cLogger = warnings stdLogger, cHost = "192.168.0.1" }
qos <- subscribe conn Confirm "/something" callback
putStrLn "control-c to finish"
threadWaitRead $ Fd 1
callback topic payload = putStrLn $ "A message was published to " ++ show topic ++ ": " ++ show payload
当我向主题发送内容时(也使用简单的 Haskell 发布者),我在该程序的控制台中得到了这个:
control-c to finish
[Error] {handle: <socket: 3>}: recvLoop: end of file
[Error] recvLoop: No reconnect, terminating.
但没有其他输出。
发布者和订阅者都连接到代理 (Mosquitto)。订阅者似乎没问题,只有在收到发布者的消息时才会发生上述情况。当我停止代理时,也会出现上述消息。
有什么想法吗?
更新
我克隆了最新的 mqtt-hs (0.3.0) 并引入了这个变化
--- a/Network/MQTT.hs
+++ b/Network/MQTT.hs
@@ -448,9 +448,10 @@ recvLoop :: MQTT -> IO ()
recvLoop m = loopWithReconnect m "recvLoop" $ \mqtt -> do
h <- readMVar (handle mqtt)
eof <- hIsEOF h
- if eof
- then ioError $ mkIOError eofErrorType "" (Just h) Nothing
- else getMessage mqtt >>= dispatchMessage mqtt
+ getMessage mqtt >>= dispatchMessage mqtt
+ -- if eof
+ -- then ioError $ mkIOError eofErrorType "" (Just h) Nothing
+ -- else getMessage mqtt >>= dispatchMessage mqtt
`catch`
\e -> logWarning mqtt $ "recvLoop: Caught " ++ show (e :: MQTTException)
即,我禁用了 EOF 检查。现在消息被打印到控制台,但是订阅者进入了一个循环,并尽可能快地抛出 [Warning] recvLoop: Caught EOF
。
这是 mosquito 错误还是 mqtt-hs 中的错误?
更新 2
我可以确认它可以在没有破解的情况下与 ActiveMQ 一起工作。话虽如此,如果 mqtt-hs 可以从代理关闭与订阅者的连接中恢复,那就更好了。
我相信您可能配置有误。我目前在生产中使用 mqtt-hs
并且几乎没有任何问题(除非项目的 github 中已经纠正了一个错误)。新更新使用 TChan
以在持久连接中正确传递消息,由于处理程序尚未可用,因此有可能丢失某些消息。
您定义了与代理的连接,但没有告诉客户端应该如何处理重新连接。作为参考,我目前正在使用类似以下内容的内容:
mqttConfig :: HostName -> MQTTConfig
mqttConfig host = defaultConfig { cClean = False
, cClientID = "myClientId"
, cHost = host
, cUsername = Just "username"
, cPassword = Just "password"
, cKeepAlive = Just 10
, cReconnPeriod = Just 1
, cLogger = stdLogger }
尽管此处使用的版本略有更新,但几乎不需要更改,并且当代理(此处也包括 mosquitto)出现故障然后恢复时会根据需要进行重新连接。
我将 mqtt-hs 移动到 LTS-5.13 并使用 stack 编译得很好。然后我创建了以下订阅者来收听主题层次结构。这是订阅者代码
{-# LANGUAGE OverloadedStrings #-}
module Main(main) where
import Control.Concurrent (threadWaitRead)
import System.Posix.Types
import Network.MQTT
import Network.MQTT.Logger
main :: IO ()
main = do
conn <- connect defaultConfig { cLogger = warnings stdLogger, cHost = "192.168.0.1" }
qos <- subscribe conn Confirm "/something" callback
putStrLn "control-c to finish"
threadWaitRead $ Fd 1
callback topic payload = putStrLn $ "A message was published to " ++ show topic ++ ": " ++ show payload
当我向主题发送内容时(也使用简单的 Haskell 发布者),我在该程序的控制台中得到了这个:
control-c to finish
[Error] {handle: <socket: 3>}: recvLoop: end of file
[Error] recvLoop: No reconnect, terminating.
但没有其他输出。
发布者和订阅者都连接到代理 (Mosquitto)。订阅者似乎没问题,只有在收到发布者的消息时才会发生上述情况。当我停止代理时,也会出现上述消息。
有什么想法吗?
更新
我克隆了最新的 mqtt-hs (0.3.0) 并引入了这个变化
--- a/Network/MQTT.hs
+++ b/Network/MQTT.hs
@@ -448,9 +448,10 @@ recvLoop :: MQTT -> IO ()
recvLoop m = loopWithReconnect m "recvLoop" $ \mqtt -> do
h <- readMVar (handle mqtt)
eof <- hIsEOF h
- if eof
- then ioError $ mkIOError eofErrorType "" (Just h) Nothing
- else getMessage mqtt >>= dispatchMessage mqtt
+ getMessage mqtt >>= dispatchMessage mqtt
+ -- if eof
+ -- then ioError $ mkIOError eofErrorType "" (Just h) Nothing
+ -- else getMessage mqtt >>= dispatchMessage mqtt
`catch`
\e -> logWarning mqtt $ "recvLoop: Caught " ++ show (e :: MQTTException)
即,我禁用了 EOF 检查。现在消息被打印到控制台,但是订阅者进入了一个循环,并尽可能快地抛出 [Warning] recvLoop: Caught EOF
。
这是 mosquito 错误还是 mqtt-hs 中的错误?
更新 2
我可以确认它可以在没有破解的情况下与 ActiveMQ 一起工作。话虽如此,如果 mqtt-hs 可以从代理关闭与订阅者的连接中恢复,那就更好了。
我相信您可能配置有误。我目前在生产中使用 mqtt-hs
并且几乎没有任何问题(除非项目的 github 中已经纠正了一个错误)。新更新使用 TChan
以在持久连接中正确传递消息,由于处理程序尚未可用,因此有可能丢失某些消息。
您定义了与代理的连接,但没有告诉客户端应该如何处理重新连接。作为参考,我目前正在使用类似以下内容的内容:
mqttConfig :: HostName -> MQTTConfig
mqttConfig host = defaultConfig { cClean = False
, cClientID = "myClientId"
, cHost = host
, cUsername = Just "username"
, cPassword = Just "password"
, cKeepAlive = Just 10
, cReconnPeriod = Just 1
, cLogger = stdLogger }
尽管此处使用的版本略有更新,但几乎不需要更改,并且当代理(此处也包括 mosquitto)出现故障然后恢复时会根据需要进行重新连接。