{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DoAndIfThenElse #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE LambdaCase #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Connection.Tcp
--
-- Maintainer  :  joe@jerkface.net
-- Stability   :  experimental
--
-- A TCP client/server library.
--
-- TODO:
--
--  * interface tweaks
--
module Connection.Tcp
    ( module Connection.Tcp
    , module PingMachine ) where

import Data.ByteString (ByteString,hGetNonBlocking)
import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
import Data.Conduit ( Source, Sink, Flush )
#if MIN_VERSION_containers(0,5,0)
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
#else
import qualified Data.Map as Map
import Data.Map (Map)
#endif
import Data.Monoid ( (<>) )
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif

import Control.Concurrent.STM
-- import Control.Concurrent.STM.TMVar
-- import Control.Concurrent.STM.TChan
-- import Control.Concurrent.STM.Delay
import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException)
import Control.Monad
import Control.Monad.Fix
-- import Control.Monad.STM
-- import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (MonadIO (liftIO))
import System.IO.Error (isDoesNotExistError)
import System.IO
    ( IOMode(..)
    , hSetBuffering
    , BufferMode(..)
    , hWaitForInput
    , hClose
    , hIsEOF
    , stderr
    , Handle
    , hFlush
    , hPutStrLn
    )
import Network.Socket as Socket
import Network.BSD
    ( getProtocolNumber
    )
import Debug.Trace
import Data.Time.Clock (getCurrentTime,diffUTCTime)
-- import SockAddr ()
-- import System.Locale (defaultTimeLocale)

import InterruptibleDelay
import PingMachine
import Network.StreamServer
import Network.SocketLike hiding (sClose)
import qualified Connection as G ;import Connection (Manager (..), Policy(..))

type Microseconds = Int

-- | This object is passed with the 'Listen' and 'Connect'
-- instructions in order to control the behavior of the
-- connections that are established.  It is parameterized
-- by a user-supplied type @conkey@ that is used as a lookup
-- key for connections.
data ConnectionParameters conkey u =
    ConnectionParameters
        { pingInterval :: PingInterval
          -- ^ The milliseconds of idle to allow before a 'RequiresPing'
          -- event is signaled.
        , timeout :: TimeOut
          -- ^ The milliseconds of idle after 'RequiresPing' is signaled
          -- that are necessary for the connection to be considered
          -- lost and signalling 'EOF'.
        , makeConnKey :: RestrictedSocket -> IO (conkey,u)
          -- ^ This action creates a lookup key for a new connection.  If 'duplex'
          -- is 'True' and the result is already associated with an established
          -- connection, then an 'EOF' will be forced before the new
          -- connection becomes active.
          --
        , duplex :: Bool
          -- ^ If True, then the connection will be treated as a normal
          -- two-way socket.  Otherwise, a readable socket is established
          -- with 'Listen' and a writable socket is established with
          -- 'Connect' and they are associated when 'makeConnKey' yields
          -- the same value for each.
        }

-- | Use this function to select appropriate default values for
-- 'ConnectionParameters' other than 'makeConnKey'.
--
-- Current defaults:
--
-- * 'pingInterval' = 28000
--
-- * 'timeout' = 2000
--
-- * 'duplex' = True
--
connectionDefaults
    :: (RestrictedSocket -> IO (conkey,u)) -> ConnectionParameters conkey u
connectionDefaults mkKey =
    ConnectionParameters
        { makeConnKey  = mkKey
        , duplex       = True
        , pingInterval = 28000
        , timeout      = 2000
        }

-- | Commands understood by a 'Server'.
--
-- A command is issued by placing it into the server's 'serverCommand'
-- 'TMVar' (see 'control').
data ServerInstruction conkey u
    = Quit
      -- ^ Shut the server down.  This is issued automatically when the
      -- server is released.
    | Listen PortNumber (ConnectionParameters conkey u)
      -- ^ Start accepting incoming connections on the given port.
    | Connect SockAddr (ConnectionParameters conkey u)
      -- ^ Make a single outgoing connection attempt to the address.
    | ConnectWithEndlessRetry SockAddr
                              (ConnectionParameters conkey u)
                              Miliseconds
      -- ^ Connect to the address and keep retrying; the last field is the
      -- retry interval.
    | Ignore PortNumber
      -- ^ Stop listening on the given port.
    | Send conkey ByteString
      -- ^ Write bytes to the established connection with the given key.

#ifdef TEST
deriving instance Show conkey => Show (ServerInstruction conkey u)
instance Show (a -> b) where show _ = ""
deriving instance Show conkey => Show (ConnectionParameters conkey u)
#endif

-- | Selects one half of a half-duplex connection.
data InOrOut
    = In
    | Out
    deriving (Enum,Eq,Ord,Show,Read)

-- | Events that can be read from the 'serverEvent' channel.
data ConnectionEvent b
    = Connection (STM Bool) (Source IO b) (Sink (Flush b) IO ())
      -- ^ A new connection was established.
    | ConnectFailure SockAddr
      -- ^ A 'Connect' command failed.
    | HalfConnection InOrOut
      -- ^ One half of a half-duplex connection is available.
    | EOF
      -- ^ A connection was terminated.
    | RequiresPing
      -- ^ The connection was idle for 'pingInterval' milliseconds.

#ifdef TEST
instance Show (IO a) where show _ = ""
instance Show (STM a) where show _ = ""
instance Eq (ByteString -> IO Bool) where (==) _ _ = True
instance Eq (IO (Maybe ByteString)) where (==) _ _ = True
instance Eq (STM Bool) where (==) _ _ = True
deriving instance Show b => Show (ConnectionEvent b)
deriving instance Eq b => Eq (ConnectionEvent b)
#endif

-- | Per-connection state kept in the server's connection map.
data ConnectionRecord u = ConnectionRecord
    { ckont :: TMVar (STM (IO ()))
      -- ^ Mailbox used to hand a continuation to the thread running the
      -- connection's EOF handler.
    , cstate :: ConnectionState
      -- ^ Handle used to send data to and receive data from the connection.
    , cdata :: u
      -- ^ User data, kept in the connection map for convenience.
    }

-- | A server accepts commands, signals events, and tracks the ports it is
-- listening on, the connections it has established, and the addresses it is
-- retrying.
data Server a u releaseKey b = Server
    { serverCommand :: TMVar (ServerInstruction a u)
    , serverEvent :: TChan ((a,u), ConnectionEvent b)
    , serverReleaseKey :: releaseKey
    , conmap :: TVar (Map a (ConnectionRecord u))
    , listenmap :: TVar (Map PortNumber ServerHandle)
    , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
    }

-- | Submit an instruction to the server.  Blocks while a previously
-- submitted instruction is still sitting in 'serverCommand'.
control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
control sv instr = atomically (putTMVar (serverCommand sv) instr)

-- | The shape of a resource allocator: given an acquire action and a release
-- action, produce a release key together with the acquired resource.
type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)

-- | An 'Allocate' that runs the acquire action and ignores the release
-- action; the release key is @()@.
noCleanUp :: MonadIO m => Allocate () m
noCleanUp acquire _release = do
    resource <- liftIO acquire
    return ((), resource)

-- | Construct a 'Server' object.  Use 'Control.Monad.Trans.Resource.ResourceT'
-- to ensure proper cleanup.
-- For example,
--
-- > import Connection.Tcp
-- > import Control.Monad.Trans.Resource (runResourceT)
-- > import Control.Monad.IO.Class (liftIO)
-- > import Control.Monad.STM (atomically)
-- > import Control.Concurrent.STM.TMVar (putTMVar)
-- > import Control.Concurrent.STM.TChan (readTChan)
-- >
-- > main = runResourceT $ do
-- >     sv <- server allocate
-- >     let params = connectionDefaults (return . snd)
-- >     liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
-- >     let loop = do
-- >            (_,event) <- atomically $ readTChan (serverEvent sv)
-- >            case event of
-- >              Connection getPingFlag readData writeData -> do
-- >                forkIO $ do
-- >                  fix $ \readLoop -> do
-- >                      readData >>= mapM $ \bytes ->
-- >                         putStrLn $ "got: " ++ show bytes
-- >                         readLoop
-- >              case event of EOF -> return ()
-- >                            _   -> loop
-- >     liftIO loop
--
-- NOTE(review): the example above calls 'server' with a single argument,
-- but the function below also takes a @sessionConduits@ argument; the
-- example looks out of date and should be confirmed before copying.
--
-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional.  Pass 'noCleanUp'
-- to do without automatic cleanup and be sure to remember to write 'Quit' to
-- the 'serverCommand' variable.
server ::
    -- forall (m :: * -> *) a u conkey releaseKey.
    (Show conkey, MonadIO m, Ord conkey) =>
    Allocate releaseKey m
    -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
    -> m (Server conkey u releaseKey x)
server allocate sessionConduits = do
    -- The command variable is the allocated resource; releasing it posts 'Quit'.
    (key,cmds) <- allocate (atomically newEmptyTMVar)
                           (atomically . flip putTMVar Quit)
    server <- liftIO . atomically $ do
        tchan <- newTChan
        conmap <- newTVar Map.empty
        listenmap<- newTVar Map.empty
        retrymap <- newTVar Map.empty
        return Server { serverCommand = cmds
                      , serverEvent = tchan
                      , serverReleaseKey = key
                      , conmap = conmap
                      , listenmap = listenmap
                      , retrymap = retrymap
                      }
    liftIO $ do
        -- Command loop: take one instruction at a time; 'Quit' ends the loop
        -- after closing everything, any other instruction is executed and the
        -- loop continues.
        tid <- forkIO $ fix $ \loop -> do
            instr <- atomically $ takeTMVar cmds
            -- warn $ "instr = " <> bshow instr
            let again = do doit server instr
                           -- warn $ "finished " <> bshow instr
                           loop
            case instr of
                Quit -> closeAll server
                _ -> again
        labelThread tid "server"
    return server
 where
    -- Stop every listener, cancel every retry loop (flag set to False and
    -- its delay interrupted), then close all connections, wait for them to
    -- finish, and empty the connection map.
    closeAll server = do
        listening <- atomically . readTVar $ listenmap server
        mapM_ quitListening (Map.elems listening)
        let stopRetry (v,d) = do atomically $ writeTVar v False
                                 interruptDelay d
        retriers <- atomically $ do
            rmap <- readTVar $ retrymap server
            writeTVar (retrymap server) Map.empty
            return rmap
        mapM_ stopRetry (Map.elems retriers)
        cons <- atomically . readTVar $ conmap server
        atomically $ mapM_ (connClose . cstate) (Map.elems cons)
        atomically $ mapM_ (connWait . cstate) (Map.elems cons)
        atomically $ writeTVar (conmap server) Map.empty

    -- 'Listen': unless the port is already present in 'listenmap', start a
    -- stream server on the IPv6 wildcard address and register its handle.
    -- Each accepted session becomes an inbound ('In') connection.
    doit server (Listen port params) = do

        listening <- Map.member port
                        `fmap` atomically (readTVar $ listenmap server)
        when (not listening) $ do

            hPutStrLn stderr $ "Started listening on "++show port

            let family = AF_INET6
                address = case family of
                            AF_INET -> SockAddrInet port iNADDR_ANY
                            AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0

            sserv <- flip streamServer address ServerConfig
                        { serverWarn = hPutStrLn stderr
                        , serverSession = \sock _ h -> do
                            (conkey,u) <- makeConnKey params sock
                            _ <- newConnection server sessionConduits params conkey u h In
                            return ()
                        }

            atomically $ listenmap server `modifyTVar'` Map.insert port sserv

    -- 'Ignore': remove the port from 'listenmap' and, if a listener was
    -- registered for it, stop that listener.
    doit server (Ignore port) = do
        hPutStrLn stderr $ "Stopping listen on "++show port
        mb <- atomically $ do
            map <- readTVar $ listenmap server
            modifyTVar' (listenmap server) $ Map.delete port
            return $ Map.lookup port map
        maybe (return ()) quitListening $ mb

    -- 'Send': write the bytes to the connection registered under the key.
    -- An unknown key or a failed write only produces a trace message.
    doit server (Send con bs) = do -- . void . forkIO $ do
        map <- atomically $ readTVar (conmap server)
        let post False = (trace ("cant send: "++show bs) $ return ())
            post True = return ()
        maybe (post False)
              (post <=< flip connWrite bs . cstate)
              $ Map.lookup con map

    -- 'Connect': the STM transaction only chooses which IO action to run;
    -- 'join' then runs it.  When a retry loop is registered for the address
    -- its delay is interrupted, and a one-shot attempt is forked only if
    -- that loop's flag is False.  Otherwise a one-shot attempt is forked.
    doit server (Connect addr params) = join $ atomically $ do
        Map.lookup addr <$> readTVar (retrymap server)
            >>= return . \case
                    Nothing -> forkit
                    Just (v,d) -> do b <- atomically $ readTVar v
                                     interruptDelay d
                                     when (not b) forkit
     where
        -- One connection attempt on its own thread.  On an IO error the
        -- socket is closed and 'ConnectFailure' is announced.
        forkit = void . forkIO $ do
            myThreadId >>= flip labelThread ( "Connect." ++ show addr )
            proto <- getProtocolNumber "tcp"
            sock <- socket (socketFamily addr) Stream proto
            handle (\e -> do
                        -- let t = ioeGetErrorType e
                        when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
                        -- warn $ "connect-error: " <> bshow e
                        (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ?
                        Socket.close sock
                        atomically
                            $ writeTChan (serverEvent server)
                                $ ((conkey,u),ConnectFailure addr))
                $ do
                    connect sock addr
                    (conkey,u) <- makeConnKey params (restrictSocket sock)
                    h <- socketToHandle sock ReadWriteMode
                    newConnection server sessionConduits params conkey u h Out
                    return ()

    -- 'ConnectWithEndlessRetry': register a fresh (flag, timer) pair for the
    -- address, cancelling any retry loop previously registered for it, then
    -- repeatedly connect, wait for the connection to end, and retry for as
    -- long as the flag stays True.
    doit server (ConnectWithEndlessRetry addr params interval) = do
        proto <- getProtocolNumber "tcp"
        void . forkIO $ do
            myThreadId >>= flip labelThread ("ConnectWithEndlessRetry." ++ show addr)
            timer <- interruptibleDelay
            (retryVar,action) <- atomically $ do
                map <- readTVar (retrymap server)
                -- 'action' wakes the loop being replaced so it can see that
                -- its flag is now False.
                action <- case Map.lookup addr map of
                            Nothing -> return $ return ()
                            Just (v,d) -> do writeTVar v False
                                             return $ interruptDelay d
                v <- newTVar True
                writeTVar (retrymap server) $! Map.insert addr (v,timer) map
                return (v,action :: IO ())
            action
            fix $ \retryLoop -> do
                utc <- getCurrentTime
                shouldRetry <- do
                    handle (\(SomeException e) -> do
                                -- Exceptions thrown by 'socket' need to be handled specially
                                -- since we don't have enough information to broadcast a ConnectFailure
                                -- on serverEvent.
                                warn $ "Failed to create socket: " <> bshow e
                                atomically $ readTVar retryVar) $ do
                        sock <- socket (socketFamily addr) Stream proto
                        handle (\(SomeException e) -> do
                                    -- Any thing else goes wrong and we broadcast ConnectFailure.
                                    -- The `onException` below applies to the whole inner
                                    -- do-block, not just to its last statement.
                                    do (conkey,u) <- makeConnKey params (restrictSocket sock)
                                       Socket.close sock
                                       atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr)
                                     `onException` return ()
                                    atomically $ readTVar retryVar) $ do
                            connect sock addr
                            (conkey,u) <- makeConnKey params (restrictSocket sock)
                            h <- socketToHandle sock ReadWriteMode
                            threads <- newConnection server sessionConduits params conkey u h Out
                            -- Block until the connection's threads finish,
                            -- then report whether retrying is still wanted.
                            atomically $ do threadsWait threads
                                            readTVar retryVar
                fin_utc <- getCurrentTime
                when shouldRetry $ do
                    -- 'elapsed' and 'expected' are both in milliseconds.
                    let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
                        expected = fromIntegral interval
                    when (shouldRetry && elapsed < expected) $ do
                        debugNoise $ "Waiting to retry " <> bshow addr
                        -- presumably 'startDelay' takes microseconds, hence
                        -- the factor of 1000 -- TODO confirm
                        void $ startDelay timer (round $ 1000 * (expected-elapsed))
                    debugNoise $ "retry " <> bshow (shouldRetry,addr)
                    when shouldRetry $ retryLoop


-- INTERNAL ----------------------------------------------------------

{-
hWriteUntilNothing h outs =
    fix $ \loop -> do
        mb <- atomically $ takeTMVar outs
        case mb of
            Just bs -> do S.hPutStrLn h bs
                          warn $ "wrote " <> bs
                          loop
            Nothing -> do warn $ "wrote Nothing"
                          hClose h

-}

-- | Read the next chunk from the readable side of a connection.  A
-- write-only connection always yields 'Nothing'; otherwise the result is
-- that of 'threadsRead' on the reading 'ConnectionThreads'.
connRead :: ConnectionState -> IO (Maybe ByteString)
connRead (WriteOnlyConnection w) = do
    -- atomically $ discardContents (threadsChannel w)
    return Nothing
connRead conn = do
    c <- atomically $ getThreads
    threadsRead c
 where
    getThreads =
        case conn of SaneConnection c -> return c
                     ReadOnlyConnection c -> return c
                     ConnectionPair c w -> do
                        -- discardContents (threadsChannel w)
                        return c

-- | The socket address family matching the constructor of a 'SockAddr'.
socketFamily :: SockAddr -> Family
socketFamily (SockAddrInet _ _) = AF_INET
socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
socketFamily (SockAddrUnix _) = AF_UNIX

-- | Build the 'Connection' event for a connection: the ping flag action
-- (which reads the flag and resets it to False) plus the source and sink
-- obtained by handing 'connRead' and 'connWrite' to @sessionConduits@.
conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
            -> ConnectionState
            -> ConnectionEvent x
conevent sessionConduits con = Connection pingflag read write
 where
    pingflag = swapTVar (pingFlag (connPingTimer con)) False
    (read,write) = sessionConduits (connRead con) (connWrite con)

-- | Register a freshly opened 'Handle' as a connection of the server.
--
-- Reader and writer threads are spawned for the handle, the connection is
-- recorded in 'conmap' under @conkey@ (replacing or pairing with an existing
-- record for the same key), and the matching event is announced on
-- 'serverEvent'.  The new 'ConnectionThreads' are returned.
newConnection :: Ord a
              => Server a u1 releaseKey x
              -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
              -> ConnectionParameters conkey u
              -> a
              -> u1
              -> Handle
              -> InOrOut
              -> IO ConnectionThreads
newConnection server sessionConduits params conkey u h inout = do
    hSetBuffering h NoBuffering
    -- The outgoing half of a half-duplex pair gets zeroed ping settings.
    let (idle_ms,timeout_ms) =
            case (inout,duplex params) of
                (Out,False) -> ( 0, 0 )
                _ -> ( pingInterval params
                     , timeout params )

    new <- do pinglogic <- forkPingMachine idle_ms timeout_ms
              connectionThreads h pinglogic
    started <- atomically $ newEmptyTMVar
    kontvar <- atomically newEmptyTMVar
    -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ?
    let _ = kontvar :: TMVar (STM (IO ()))
    -- This thread runs whatever continuation gets posted to 'kontvar' by
    -- the transaction below.
    forkIO $ do
        myThreadId >>= flip labelThread ("connecting...")
        getkont <- atomically $ takeTMVar kontvar
        kont <- atomically getkont
        kont

    atomically $ do
        current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
        case current of
            -- No record for this key yet: insert one and let the forked
            -- thread become this connection's EOF handler.
            Nothing -> do
                (newCon,e) <- return $
                    if duplex params
                        then let newcon = SaneConnection new
                             in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
                        else ( case inout of
                                    In -> ReadOnlyConnection new
                                    Out -> WriteOnlyConnection new
                             , ((conkey,u), HalfConnection inout) )
                modifyTVar' (conmap server) $ Map.insert conkey
                    ConnectionRecord { ckont = kontvar
                                     , cstate = newCon
                                     , cdata = u }
                announce e
                putTMVar kontvar $ return $ do
                    myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice.
                    atomically $ putTMVar started ()
                    -- Wait for something interesting.
                    handleEOF conkey u kontvar newCon
            -- A record already exists: hand the update to the thread that
            -- is already handling that record.
            Just what@ConnectionRecord { ckont =mvar }-> do
                putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread.
                putTMVar mvar $ do
                    -- The action returned by updateConMap, eventually invokes handleEOF,
                    -- so the sequencer thread will not be terminated.
                    kont <- updateConMap conkey u new what
                    putTMVar started ()
                    return kont
    return new
 where

    -- Publish an event on the server's event channel.
    announce e = writeTChan (serverEvent server) e

    -- This function loops and will not quit unless an action is posted to the
    -- mvar that does not in turn invoke this function, or if an EOF occurs.
    handleEOF conkey u mvar newCon = do
        -- Whichever alternative becomes ready first decides the next action.
        action <- atomically . foldr1 orElse $
                    [ takeTMVar mvar >>= id -- passed continuation
                    , connWait newCon >> return eof
                    , connWaitPing newCon >>= return . sendPing
                    -- , pingWait pingTimer >>= return . sendPing
                    ]
        action :: IO ()
     where
        -- Cancel the ping timer, wait until the inbound channel has been
        -- drained, announce 'EOF', and drop the record from 'conmap'.
        eof = do
            -- warn $ "EOF " <>bshow conkey
            connCancelPing newCon
            atomically $ do connFlush newCon
                            announce ((conkey,u),EOF)
                            modifyTVar' (conmap server)
                                $ Map.delete conkey
            -- warn $ "fin-EOF "<>bshow conkey

        -- A ping timeout closes the connection and is then handled as EOF.
        sendPing PingTimeOut = do
            {-
            utc <- getCurrentTime
            let utc' = formatTime defaultTimeLocale "%s" utc
            warn $ "ping:TIMEOUT " <> bshow utc'
            -}
            atomically (connClose newCon)
            eof

        -- An idle notification is announced and the handler keeps looping.
        sendPing PingIdle = do
            {-
            utc <- getCurrentTime
            let utc' = formatTime defaultTimeLocale "%s" utc
            -- warn $ "ping:IDLE " <> bshow utc'
            -}
            atomically $ announce ((conkey,u),RequiresPing)
            handleEOF conkey u mvar newCon


    -- Replace or complete the existing record for @conkey@ and return the
    -- IO action that continues handling the resulting connection state.
    updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do
        new' <-
            if duplex params then do
                -- Full duplex: the old connection is closed and replaced.
                announce ((conkey,u),EOF)
                connClose replaced
                let newcon = SaneConnection new
                announce $ ((conkey,u),conevent sessionConduits newcon)
                return $ newcon
            else
                case replaced of
                    -- Half duplex: the opposite half is already present, so
                    -- the two halves are joined into a pair.
                    WriteOnlyConnection w | inout==In ->
                        do let newcon = ConnectionPair new w
                           announce ((conkey,u),conevent sessionConduits newcon)
                           return newcon
                    ReadOnlyConnection r | inout==Out ->
                        do let newcon = ConnectionPair r new
                           announce ((conkey,u),conevent sessionConduits newcon)
                           return newcon
                    -- Anything else is replaced by the new half.
                    _ -> do -- connFlush todo
                            announce ((conkey,u0), EOF)
                            connClose replaced
                            announce ((conkey,u), HalfConnection inout)
                            return $ case inout of
                                In -> ReadOnlyConnection new
                                Out -> WriteOnlyConnection new
        modifyTVar' (conmap server) $ Map.insert conkey
            ConnectionRecord { ckont = mvar
                             , cstate = new'
                             , cdata = u }
        return $ handleEOF conkey u mvar new'

-- | Wait (with no timeout) until input is available on the handle, then
-- return whatever can be read without blocking, at most 1024 bytes.
getPacket :: Handle -> IO ByteString
getPacket h = do hWaitForInput h (-1)
                 hGetNonBlocking h 1024



-- | 'ConnectionThreads' is an interface to a pair of threads
-- that are reading and writing a 'Handle'.
data ConnectionThreads = ConnectionThreads
    { threadsWriter :: TMVar (Maybe ByteString)
      -- ^ outbox for the writer thread; 'Nothing' tells it to quit
    , threadsChannel :: TChan ByteString
      -- ^ packets received by the reader thread
    , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
    , threadsPing :: PingMachine
    }

-- | This spawns the reader and writer threads and returns a newly
-- constructed 'ConnectionThreads' object.
connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
connectionThreads h pinglogic = do

    -- 'donew' / 'doner' are filled when the writer / reader has finished.
    (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar

    (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
    readerThread <- forkIO $ do
        myThreadId >>= flip labelThread ("readerThread")
        -- Close the handle, replace any pending outbound value with the
        -- quit signal for the writer, and mark the reader as done.
        let finished e = do
                hClose h
                -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
                -- let _ = fmap ioeGetErrorType e -- type hint
                -- Type hint only: fixes the handled exception type to
                -- 'SomeException'; 'what' is never evaluated.
                let _ = fmap what e where what (SomeException _) = undefined
                atomically $ do tryTakeTMVar outs
                                putTMVar outs Nothing -- quit writer
                                putTMVar doner ()
        handle (finished . Just) $ do
            pingBump pinglogic -- start the ping timer
            fix $ \loop -> do
                packet <- getPacket h
                -- warn $ "read: " <> S.take 60 packet
                atomically $ writeTChan incomming packet
                pingBump pinglogic
                -- warn $ "bumped: " <> S.take 60 packet
                isEof <- hIsEOF h
                if isEof then finished Nothing else loop

    writerThread <- forkIO . fix $ \loop -> do
        myThreadId >>= flip labelThread ("writerThread")
        -- Stop the reader by throwing to it, then mark the writer as done.
        let finished = do -- warn $ "finished write"
                          -- hClose h -- quit reader
                          throwTo readerThread (ErrorCall "EOF")
                          atomically $ putTMVar donew ()
        -- The value is only read here and taken after the write succeeds,
        -- so 'outs' stays full while a write is in progress.
        mb <- atomically $ readTMVar outs
        case mb of Just bs -> handle (\(SomeException e)->finished)
                                     (do -- warn $ "writing: " <> S.take 60 bs
                                         S.hPutStr h bs
                                         -- warn $ "wrote: " <> S.take 60 bs
                                         atomically $ takeTMVar outs
                                         loop)
                   Nothing -> finished

    -- Closed means both the writer and the reader have finished.
    let wait = do readTMVar donew
                  readTMVar doner
                  return ()
    return ConnectionThreads { threadsWriter = outs
                             , threadsChannel = incomming
                             , threadsWait = wait
                             , threadsPing = pinglogic
                             }


-- | 'threadsWrite' hands the given 'ByteString' to the writer thread of the
-- 'ConnectionThreads' object.  It blocks until the outbox accepts the
-- 'ByteString' and 'True' is returned, or the connection has closed
-- and 'False' is returned.
threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
threadsWrite c bs = atomically $
    orElse (const False `fmap` threadsWait c)
           (const True `fmap` putTMVar (threadsWriter c) (Just bs))

-- | 'threadsClose' signals for the 'ConnectionThreads' object
-- to quit and close the associated 'Handle'.  Follow it with
-- 'threadsWait' if you want to wait for the operation to complete.
--
-- NOTE(review): when the outbox still holds an unwritten 'ByteString' the
-- 'putTMVar' below retries until the writer has taken it, so the enclosing
-- transaction can block in that case.
threadsClose :: ConnectionThreads -> STM ()
threadsClose c = do
    let mvar = threadsWriter c
    v <- tryReadTMVar mvar
    case v of
        Just Nothing -> return () -- already closed
        _ -> putTMVar mvar Nothing

-- | 'threadsRead' blocks until a 'ByteString' is available which
-- is returned to the caller, or the connection is interrupted and
-- 'Nothing' is returned.
threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
threadsRead c = atomically $
    orElse (const Nothing `fmap` threadsWait c)
           (Just `fmap` readTChan (threadsChannel c))

-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
-- or to a pair of 'ConnectionThreads' objects that are considered as one
-- connection.
data ConnectionState =
      SaneConnection ConnectionThreads
      -- ^ ordinary read/write connection
    | WriteOnlyConnection ConnectionThreads
      -- ^ the outgoing half of a half-duplex connection
    | ReadOnlyConnection ConnectionThreads
      -- ^ the incoming half of a half-duplex connection
    | ConnectionPair ConnectionThreads ConnectionThreads
      -- ^ Two 'ConnectionThreads' objects, read operations use the
      -- first, write operations use the second.


-- | Write bytes to the writable side of a connection.  A read-only
-- connection cannot be written to and yields 'False'; otherwise the result
-- is that of 'threadsWrite'.
connWrite :: ConnectionState -> ByteString -> IO Bool
connWrite conn bs =
    case conn of
        ReadOnlyConnection _  -> return False
        SaneConnection c      -> threadsWrite c bs
        WriteOnlyConnection c -> threadsWrite c bs
        ConnectionPair _ c    -> threadsWrite c bs


-- | Apply an STM action to the 'ConnectionThreads' of a connection.
--
-- For a 'ConnectionPair' the action is tried on the reading half first and,
-- if that retries, on the writing half.  When @both@ is 'True' the action is
-- then also applied to the other half.
mapConn :: Bool ->
    (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
mapConn both action c =
    case c of
        SaneConnection rw     -> action rw
        ReadOnlyConnection r  -> action r
        WriteOnlyConnection w -> action w
        ConnectionPair r w    -> do
            -- 'other' is the half the action has not been applied to yet.
            other <- orElse (const w `fmap` action r)
                            (const r `fmap` action w)
            when both $ action other

-- | Signal every 'ConnectionThreads' of the connection to close.
connClose :: ConnectionState -> STM ()
connClose c = mapConn True threadsClose c

-- | Wait for a connection to close.  For a 'ConnectionPair', as soon as
-- either half has closed the other half is told to close as well.
connWait :: ConnectionState -> STM ()
connWait c =
    case c of
        SaneConnection rw     -> threadsWait rw
        ReadOnlyConnection r  -> threadsWait r
        WriteOnlyConnection w -> threadsWait w
        ConnectionPair r w    -> do
            other <- orElse (const w `fmap` threadsWait r)
                            (const r `fmap` threadsWait w)
            threadsClose other

-- | The 'PingMachine' that governs a connection.  For a 'ConnectionPair'
-- this is the one belonging to the reading half.
connPingTimer :: ConnectionState -> PingMachine
connPingTimer c =
    case c of
        SaneConnection rw     -> threadsPing rw
        ReadOnlyConnection r  -> threadsPing r
        WriteOnlyConnection w -> threadsPing w -- should be disabled.
        ConnectionPair r _    -> threadsPing r

-- | Cancel the connection's ping machine.
connCancelPing :: ConnectionState -> IO ()
connCancelPing c = pingCancel (connPingTimer c)

-- | Wait for the next event of the connection's ping machine.
connWaitPing :: ConnectionState -> STM PingEvent
connWaitPing c = pingWait (connPingTimer c)

-- | Retry until the inbound channel of the connection is empty.  A
-- write-only connection has nothing to drain.
connFlush :: ConnectionState -> STM ()
connFlush c =
    case c of
        SaneConnection rw     -> waitChan rw
        ReadOnlyConnection r  -> waitChan r
        WriteOnlyConnection _ -> return ()
        ConnectionPair r _    -> waitChan r
 where
    waitChan t = do
        drained <- isEmptyTChan (threadsChannel t)
        unless drained retry

-- | 'show' a value and pack the result into a 'ByteString'.
bshow :: Show a => a -> ByteString
bshow e = S.pack . show $ e

-- | Print a line on 'stderr' and flush it.
warn :: ByteString -> IO ()
warn str = S.hPutStrLn stderr str >> hFlush stderr

-- | Disabled debug logging: the message is discarded.
debugNoise :: Monad m => t -> m ()
debugNoise _ = return ()

-- | Progress of a TCP connection that is not yet fully established.
data TCPStatus = Resolving | AwaitingRead | AwaitingWrite

-- | Build a connection 'Manager' on top of a 'Server'.
--
-- * @grokKey@ turns a resolved connection key into the address, parameters
--   and retry interval used for 'ConnectWithEndlessRetry'.
--
-- * @s2k@ becomes the manager's 'stringToKey'.
--
-- * @resolvKey@ resolves a manager key to a connection key, or 'Nothing'
--   when resolution fails.
tcpManager :: ( Show k, Ord k, Ord conkey ) =>
              (conkey -> (SockAddr, ConnectionParameters conkey u, Miliseconds))
              -> (String -> Maybe k)
              -> (k -> IO (Maybe conkey))
              -> Server conkey u releaseKey x
              -> IO (Manager TCPStatus k)
tcpManager grokKey s2k resolvKey sv = do
    -- Map k (Maybe conkey): 'Nothing' while resolution is pending, 'Just'
    -- once the connection key is known.
    rmap <- atomically $ newTVar Map.empty
    nullping <- forkPingMachine 0 0
    return Manager {
        setPolicy = \k -> \case
            TryingToConnect -> join $ atomically $ do
                r <- readTVar rmap
                case Map.lookup k r of
                    Just {} -> return $ return () -- Connection already in progress.
                    Nothing -> do
                        modifyTVar' rmap $ Map.insert k Nothing
                        return $ void $ forkIO $ do
                            myThreadId >>= flip labelThread ("resolve."++show k)
                            mconkey <- resolvKey k
                            case mconkey of
                                -- Resolution failed: forget the key so a
                                -- later request can try again.
                                Nothing -> atomically $ modifyTVar' rmap $ Map.delete k
                                Just conkey -> do
                                    -- Record the resolved key.  Without this
                                    -- the entry stays 'Nothing' forever and
                                    -- 'connections' can never match it with
                                    -- the server's connection map.
                                    atomically $ modifyTVar' rmap $ Map.insert k (Just conkey)
                                    control sv $ case grokKey conkey of
                                        (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms
            OpenToConnect -> hPutStrLn stderr "TODO: TCP OpenToConnect"
            RefusingToConnect -> hPutStrLn stderr "TODO: TCP RefusingToConnect"
        , connections = do
            c <- readTVar $ conmap sv
            fmap (exportConnection nullping c) <$> readTVar rmap
        , stringToKey = s2k
        , showProgress = \case
            Resolving -> "resolving"
            AwaitingRead -> "awaiting inbound"
            AwaitingWrite -> "awaiting outbound"
        , showKey = show
        }

-- | Describe one managed key as a generic 'G.Connection'.
--
-- @nullping@ is used as the ping logic when no connection record exists for
-- the key.  The status is 'G.Dormant' while the key is unresolved,
-- @'G.InProgress' 'Resolving'@ when it is resolved but absent from the
-- connection map, and otherwise derived from the record's 'ConnectionState'.
exportConnection :: Ord conkey => PingMachine -> Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus
exportConnection nullping conns mkey = G.Connection
    { G.connStatus    = return status
    , G.connPolicy    = return TryingToConnect
    , G.connPingLogic = maybe nullping (connPingTimer . cstate) mrecord
    }
 where
    -- The connection record for the key, if the key is resolved and known.
    mrecord = mkey >>= flip Map.lookup conns

    status = case (mkey, mrecord) of
        (Nothing, _)          -> G.Dormant
        (Just _, Nothing)     -> G.InProgress Resolving
        (Just _, Just record) -> case cstate record of
            SaneConnection {}      -> G.Established
            ConnectionPair {}      -> G.Established
            ReadOnlyConnection {}  -> G.InProgress AwaitingWrite
            WriteOnlyConnection {} -> G.InProgress AwaitingRead