为什么此 Haskell TCP 服务器会在接受时生成“无效参数”错误?
Why does this Haskell TCP server generate an `invalid argument` error with accept?
当 运行 这个 TCP 服务器,使用 Network.Simple.TCP 的 serve
命令时,我从 accept
得到一个无效参数;这两个例子都发生在这个例子中,这个例子被削减并从一个程序中稍微提取出来,而不是通过 TCP 接收 protobuf 消息(而不是文本消息)。但是,错误是相同的:
#!/usr/bin/env stack
{- stack script --nix --resolver lts-14.27
--nix-packages zlib
--no-nix-pure
--package bytestring
--package classy-prelude
--package conduit
--package exceptions
--package mtl
--package network
--package network-simple
--package stm
--package stm-conduit
--package text
--package unliftio
--ghc-options -Wall
-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Main where
import ClassyPrelude hiding (hClose)
import Conduit
import Control.Concurrent.STM.TBQueue (TBQueue, writeTBQueue)
import Control.Monad.Catch (MonadMask)
import Control.Monad.Writer
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Char8 as B
import Data.Conduit.Async (gatherFrom)
import qualified Data.Conduit.List as CL
import Data.Function ((&))
import qualified Data.Text as T
import GHC.IO.Handle (Handle, hClose)
import qualified Network.Simple.TCP as TCP
import qualified Network.Socket as NS
import UnliftIO.Concurrent (ThreadId, forkIO, threadDelay)
type Error = [String]
type Result r = Writer Error r
runResult :: Result r -> (r, Error)
runResult = runWriter
getPort :: NS.ServiceName
getPort = "29876"
waitForever :: IO ()
waitForever = do
threadDelay 10000
waitForever
-- | This signature is meant to simulate the same function from the proto-lens library,
-- | but without dealing with protobus for binary data.
decodeMessageDelimitedH :: Handle -> IO (Either String String)
decodeMessageDelimitedH h = do
sOut <- B.hGetLine h
pure $ Right $ B.unpack sOut
protoServe :: forall m. (MonadMask m, MonadResource m, MonadUnliftIO m) =>
(String -> Result [String])
-> ConduitT () [String] m ()
protoServe fromProto = start .| mapMC logFilterRead
.| CL.catMaybes .| mapMC msgToRecs
where
port = trace "getting protobuf port" getPort
start = do
let enQserver = serveTBQ (TCP.HostIPv4) port (decodeProto . fst)
gatherFrom 10000 enQserver
decodeProto :: NS.Socket -> m (Either String String)
decodeProto sock = bracket
connHandleIO
(liftIO . hClose)
(liftIO . decodeMessageDelimitedH)
where
connHandleIO :: m Handle
connHandleIO = liftIO $ sockToHandle sock
logFilterRead :: Either String String -> m (Maybe String)
logFilterRead pEi = case pEi of
Right p -> pure $ Just p
Left err -> trace err $ pure Nothing
msgToRecs :: String -> m [String]
msgToRecs p = case runResult $ fromProto p of
(rs, rErr) -> do
when (not $ null rErr) $ pure $ trace (intercalate "\n" rErr) ()
pure $ trace "completed msgToRecs" rs
-- | The handle only needs a read-view of the socket. Note that a TBQeueue is
-- | mutable but has STM's runtime safety checks in place.
sockToHandle :: NS.Socket -> IO Handle
sockToHandle sock = NS.socketToHandle sock ReadMode
-- | Based on serve and listen from Network.Simple.TCP
-- | Unlike `serve`, which never returns, `serveTBQ` immediately returns
-- | a `TBQueue` of results.
serveTBQ :: forall a m. (MonadMask m, MonadUnliftIO m)
=> TCP.HostPreference -- ^ Host to bind.
-> NS.ServiceName -- ^ Server service port name or number to bind.
-> ((NS.Socket, NS.SockAddr) -> m a)
-- ^ Computation to run in a different thread once an incoming connection is
-- accepted. Takes the connection socket and remote end address.
-> TBQueue a -- ^ enqueue computation results to this queue
-> m ()
-- ^ Returns a FIFO (queue) of results from concurrent requests
serveTBQ hp port rFun tbq = do
_ <- async $ withRunInIO $ \run -> TCP.serve hp port $ \(lsock, _) -> do
run $ void $ acceptTBQ lsock rFun tbq
putStrLn $ T.pack "exiting serveTBQ"
-- | Based on acceptFork from Network.Simple.TCP.
acceptTBQ :: forall a m.
MonadUnliftIO m
=> NS.Socket -- ^ Listening and bound socket.
-> ((NS.Socket, NS.SockAddr) -> m a)
-- ^ Computation to run in a different thread once an incoming connection is
-- accepted. Takes the connection socket and remote end address.
-> TBQueue a
-> m ThreadId
acceptTBQ lsock rFun tbq = mask $ \restore -> do
(csock, addr) <- trace ("running restore-accept on lsock: " <> (show lsock)) $ restore (liftIO $ NS.accept lsock)
onException (forkIO $ finally
(restore $ do
rVal <- trace "retrieved rVal in finally-restore" rFun (csock, addr)
atomically $ writeTBQueue tbq rVal)
(TCP.closeSock csock))
(TCP.closeSock csock)
retryForever :: forall m a. MonadUnliftIO m => m a -> m a
retryForever prog = catchAny prog progRetry
where
progRetry :: SomeException -> m a
progRetry ex = do
putStrLn $ pack $ show ex
threadDelay 4000000
retryForever prog
-- | Safer interface to sinkNull
sinkUnits :: MonadResource m => ConduitT () Void m ()
sinkUnits = sinkNull
main :: IO ()
main = retryForever $ do
putStrLn $ T.pack "starting tcp server"
let myProtoServe = protoServe (pure . words)
myProtoServe .| mapMC (putStrLn . T.pack . intercalate "_") .| sinkUnits & runConduitRes
putStrLn $ T.pack "tcp server exited"
waitForever
当 运行 上述服务器并执行 netcat
命令时,通过 TCP 输入一些文本,例如netcat 127.0.0.1 29876 < .bashrc
(用任何文本文件替换 .bashrc
),我看到如下输出:
starting tcp server
exiting serveTBQ
getting protobuf port
tcp server exited
running restore-accept on lsock: <socket: 16>
tcpConduitServer-exe: Network.Socket.accept: invalid argument (Invalid argument)
虽然我没有使用 TCP 的经验,但我不确定是什么导致了这种行为。因为我使用的是 serve
而不是编写自己的 TCP 逻辑,所以我对收到 TCP 错误感到有点惊讶。
这是一个包含脚本(以及非脚本版本)的存储库:
https://github.com/bbarker/tcpConduitServer
这是因为您在连接到客户端的套接字上调用了 NS.accept
。
如 Network.Simple.TCP.serve 中所述,TCP.serve
接受套接字并通过在不同线程中传递接受的套接字来调用其第三个参数。 serveTBQ
中的lsock
是连接到客户端的套接字,不是服务端接受新连接的套接字。
编写 TCP 服务器时,需要 1) 创建套接字,2) 将其绑定到地址和端口,3) 监听它,然后 4) 接受它。 TCP.serve
为您完成了所有这些任务,因此您需要做的就是 read/write 在套接字上 TCP.serve
提供给您的回调函数。
当 运行 这个 TCP 服务器,使用 Network.Simple.TCP 的 serve
命令时,我从 accept
得到一个无效参数;这两个例子都发生在这个例子中,这个例子被削减并从一个程序中稍微提取出来,而不是通过 TCP 接收 protobuf 消息(而不是文本消息)。但是,错误是相同的:
#!/usr/bin/env stack
{- stack script --nix --resolver lts-14.27
--nix-packages zlib
--no-nix-pure
--package bytestring
--package classy-prelude
--package conduit
--package exceptions
--package mtl
--package network
--package network-simple
--package stm
--package stm-conduit
--package text
--package unliftio
--ghc-options -Wall
-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Main where
import ClassyPrelude hiding (hClose)
import Conduit
import Control.Concurrent.STM.TBQueue (TBQueue, writeTBQueue)
import Control.Monad.Catch (MonadMask)
import Control.Monad.Writer
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Char8 as B
import Data.Conduit.Async (gatherFrom)
import qualified Data.Conduit.List as CL
import Data.Function ((&))
import qualified Data.Text as T
import GHC.IO.Handle (Handle, hClose)
import qualified Network.Simple.TCP as TCP
import qualified Network.Socket as NS
import UnliftIO.Concurrent (ThreadId, forkIO, threadDelay)
type Error = [String]
type Result r = Writer Error r
runResult :: Result r -> (r, Error)
runResult = runWriter
getPort :: NS.ServiceName
getPort = "29876"
waitForever :: IO ()
waitForever = do
threadDelay 10000
waitForever
-- | This signature is meant to simulate the same function from the proto-lens library,
-- | but without dealing with protobus for binary data.
decodeMessageDelimitedH :: Handle -> IO (Either String String)
decodeMessageDelimitedH h = do
sOut <- B.hGetLine h
pure $ Right $ B.unpack sOut
protoServe :: forall m. (MonadMask m, MonadResource m, MonadUnliftIO m) =>
(String -> Result [String])
-> ConduitT () [String] m ()
protoServe fromProto = start .| mapMC logFilterRead
.| CL.catMaybes .| mapMC msgToRecs
where
port = trace "getting protobuf port" getPort
start = do
let enQserver = serveTBQ (TCP.HostIPv4) port (decodeProto . fst)
gatherFrom 10000 enQserver
decodeProto :: NS.Socket -> m (Either String String)
decodeProto sock = bracket
connHandleIO
(liftIO . hClose)
(liftIO . decodeMessageDelimitedH)
where
connHandleIO :: m Handle
connHandleIO = liftIO $ sockToHandle sock
logFilterRead :: Either String String -> m (Maybe String)
logFilterRead pEi = case pEi of
Right p -> pure $ Just p
Left err -> trace err $ pure Nothing
msgToRecs :: String -> m [String]
msgToRecs p = case runResult $ fromProto p of
(rs, rErr) -> do
when (not $ null rErr) $ pure $ trace (intercalate "\n" rErr) ()
pure $ trace "completed msgToRecs" rs
-- | The handle only needs a read-view of the socket. Note that a TBQeueue is
-- | mutable but has STM's runtime safety checks in place.
sockToHandle :: NS.Socket -> IO Handle
sockToHandle sock = NS.socketToHandle sock ReadMode
-- | Based on serve and listen from Network.Simple.TCP
-- | Unlike `serve`, which never returns, `serveTBQ` immediately returns
-- | a `TBQueue` of results.
serveTBQ :: forall a m. (MonadMask m, MonadUnliftIO m)
=> TCP.HostPreference -- ^ Host to bind.
-> NS.ServiceName -- ^ Server service port name or number to bind.
-> ((NS.Socket, NS.SockAddr) -> m a)
-- ^ Computation to run in a different thread once an incoming connection is
-- accepted. Takes the connection socket and remote end address.
-> TBQueue a -- ^ enqueue computation results to this queue
-> m ()
-- ^ Returns a FIFO (queue) of results from concurrent requests
serveTBQ hp port rFun tbq = do
_ <- async $ withRunInIO $ \run -> TCP.serve hp port $ \(lsock, _) -> do
run $ void $ acceptTBQ lsock rFun tbq
putStrLn $ T.pack "exiting serveTBQ"
-- | Based on acceptFork from Network.Simple.TCP.
acceptTBQ :: forall a m.
MonadUnliftIO m
=> NS.Socket -- ^ Listening and bound socket.
-> ((NS.Socket, NS.SockAddr) -> m a)
-- ^ Computation to run in a different thread once an incoming connection is
-- accepted. Takes the connection socket and remote end address.
-> TBQueue a
-> m ThreadId
acceptTBQ lsock rFun tbq = mask $ \restore -> do
(csock, addr) <- trace ("running restore-accept on lsock: " <> (show lsock)) $ restore (liftIO $ NS.accept lsock)
onException (forkIO $ finally
(restore $ do
rVal <- trace "retrieved rVal in finally-restore" rFun (csock, addr)
atomically $ writeTBQueue tbq rVal)
(TCP.closeSock csock))
(TCP.closeSock csock)
retryForever :: forall m a. MonadUnliftIO m => m a -> m a
retryForever prog = catchAny prog progRetry
where
progRetry :: SomeException -> m a
progRetry ex = do
putStrLn $ pack $ show ex
threadDelay 4000000
retryForever prog
-- | Safer interface to sinkNull
sinkUnits :: MonadResource m => ConduitT () Void m ()
sinkUnits = sinkNull
main :: IO ()
main = retryForever $ do
putStrLn $ T.pack "starting tcp server"
let myProtoServe = protoServe (pure . words)
myProtoServe .| mapMC (putStrLn . T.pack . intercalate "_") .| sinkUnits & runConduitRes
putStrLn $ T.pack "tcp server exited"
waitForever
当 运行 上述服务器并执行 netcat
命令时,通过 TCP 输入一些文本,例如netcat 127.0.0.1 29876 < .bashrc
(用任何文本文件替换 .bashrc
),我看到如下输出:
starting tcp server
exiting serveTBQ
getting protobuf port
tcp server exited
running restore-accept on lsock: <socket: 16>
tcpConduitServer-exe: Network.Socket.accept: invalid argument (Invalid argument)
虽然我没有使用 TCP 的经验,但我不确定是什么导致了这种行为。因为我使用的是 serve
而不是编写自己的 TCP 逻辑,所以我对收到 TCP 错误感到有点惊讶。
这是一个包含脚本(以及非脚本版本)的存储库: https://github.com/bbarker/tcpConduitServer
这是因为您在连接到客户端的套接字上调用了 NS.accept
。
如 Network.Simple.TCP.serve 中所述,TCP.serve
接受套接字并通过在不同线程中传递接受的套接字来调用其第三个参数。 serveTBQ
中的lsock
是连接到客户端的套接字,不是服务端接受新连接的套接字。
编写 TCP 服务器时,需要 1) 创建套接字,2) 将其绑定到地址和端口,3) 监听它,然后 4) 接受它。 TCP.serve
为您完成了所有这些任务,因此您需要做的就是 read/write 在套接字上 TCP.serve
提供给您的回调函数。