haskell 的基本 mqtt

basic mqtt with haskell

我将 mqtt-hs 移动到 LTS-5.13 并使用 stack 编译得很好。然后我创建了以下订阅者来收听主题层次结构。这是订阅者代码

{-# LANGUAGE OverloadedStrings #-}
module Main(main) where

import Control.Concurrent (threadWaitRead)
import System.Posix.Types
import Network.MQTT
import Network.MQTT.Logger

main :: IO ()
main = do
  conn <- connect defaultConfig { cLogger = warnings stdLogger, cHost = "192.168.0.1" }
  qos <- subscribe conn Confirm "/something" callback
  putStrLn "control-c to finish"
  threadWaitRead $ Fd 1

callback topic payload = putStrLn $ "A message was published to " ++ show topic ++ ": " ++ show payload

当我向主题发送内容时(也使用简单的 Haskell 发布者),我在该程序的控制台中得到了这个:

control-c to finish
[Error] {handle: <socket: 3>}: recvLoop: end of file
[Error] recvLoop: No reconnect, terminating.

但没有其他输出。

发布者和订阅者都连接到代理 (Mosquitto)。订阅者似乎没问题,只有在收到发布者的消息时才会发生上述情况。当我停止代理时,也会出现上述消息。

有什么想法吗?

更新

我克隆了最新的 mqtt-hs (0.3.0) 并引入了这个变化

--- a/Network/MQTT.hs
+++ b/Network/MQTT.hs
@@ -448,9 +448,10 @@ recvLoop :: MQTT -> IO ()
 recvLoop m = loopWithReconnect m "recvLoop" $ \mqtt -> do
     h <- readMVar (handle mqtt)
     eof <- hIsEOF h
-    if eof
-      then ioError $ mkIOError eofErrorType "" (Just h) Nothing
-      else getMessage mqtt >>= dispatchMessage mqtt
+    getMessage mqtt >>= dispatchMessage mqtt
+    -- if eof
+    --   then ioError $ mkIOError eofErrorType "" (Just h) Nothing
+    --   else getMessage mqtt >>= dispatchMessage mqtt
   `catch`
     \e -> logWarning mqtt $ "recvLoop: Caught " ++ show (e :: MQTTException)

即,我禁用了 EOF 检查。现在消息被打印到控制台,但是订阅者进入了一个循环,并尽可能快地抛出 [Warning] recvLoop: Caught EOF

这是 mosquito 错误还是 mqtt-hs 中的错误?

更新 2

我可以确认它可以在没有破解的情况下与 ActiveMQ 一起工作。话虽如此,如果 mqtt-hs 可以从代理关闭与订阅者的连接中恢复,那就更好了。

我相信您可能配置有误。我目前在生产中使用 mqtt-hs 并且几乎没有任何问题(除非项目的 github 中已经纠正了一个错误)。新更新使用 TChan 以在持久连接中正确传递消息,由于处理程序尚未可用,因此有可能丢失某些消息。

您定义了与代理的连接,但没有告诉客户端应该如何处理重新连接。作为参考,我目前正在使用类似以下内容的内容:

mqttConfig :: HostName -> MQTTConfig
mqttConfig host = defaultConfig { cClean = False
                                , cClientID = "myClientId"
                                , cHost = host
                                , cUsername = Just "username"
                                , cPassword = Just "password"
                                , cKeepAlive = Just 10
                                , cReconnPeriod = Just 1
                                , cLogger = stdLogger }

尽管此处使用的版本略有更新,但几乎不需要更改,并且当代理(此处也包括 mosquitto)出现故障然后恢复时会根据需要进行重新连接。