From 31b799222cb76cd0002d9a3cc5b340a7b6fed139 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 3 Jan 2020 15:35:23 -0500 Subject: server library. --- dht/Connection/Tcp.hs | 824 -------------------------------------------------- 1 file changed, 824 deletions(-) delete mode 100644 dht/Connection/Tcp.hs (limited to 'dht/Connection/Tcp.hs') diff --git a/dht/Connection/Tcp.hs b/dht/Connection/Tcp.hs deleted file mode 100644 index 4d50d47f..00000000 --- a/dht/Connection/Tcp.hs +++ /dev/null @@ -1,824 +0,0 @@ -{-# OPTIONS_HADDOCK prune #-} -{-# LANGUAGE CPP #-} -{-# LANGUAGE DoAndIfThenElse #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE StandaloneDeriving #-} -{-# LANGUAGE TupleSections #-} -{-# LANGUAGE LambdaCase #-} ------------------------------------------------------------------------------ --- | --- Module : Connection.Tcp --- --- Maintainer : joe@jerkface.net --- Stability : experimental --- --- A TCP client/server library. --- --- TODO: --- --- * interface tweaks --- -module Connection.Tcp - ( module Connection.Tcp - , module Control.Concurrent.PingMachine ) where - -import Data.ByteString (ByteString,hGetNonBlocking) -import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) -import Data.Conduit ( ConduitT, Void, Flush ) -#if MIN_VERSION_containers(0,5,0) -import qualified Data.Map.Strict as Map -import Data.Map.Strict (Map) -#else -import qualified Data.Map as Map -import Data.Map (Map) -#endif -import Data.Monoid ( (<>) ) -import Control.Concurrent.ThreadUtil - -import Control.Arrow -import Control.Concurrent.STM --- import Control.Concurrent.STM.TMVar --- import Control.Concurrent.STM.TChan --- import Control.Concurrent.STM.Delay -import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException) -import Control.Monad -import Control.Monad.Fix --- import Control.Monad.STM --- import Control.Monad.Trans.Resource -import Control.Monad.IO.Class (MonadIO (liftIO)) -import Data.Maybe -import System.IO.Error (isDoesNotExistError) -import System.IO - ( IOMode(..) - , hSetBuffering - , BufferMode(..) - , hWaitForInput - , hClose - , hIsEOF - , Handle - ) -import Network.Socket as Socket -import Network.BSD - ( getProtocolNumber - ) -import Debug.Trace -import Data.Time.Clock (getCurrentTime,diffUTCTime) --- import SockAddr () --- import System.Locale (defaultTimeLocale) - -import qualified Data.Text as Text - ;import Data.Text (Text) -import DNSCache -import Control.Concurrent.Delay -import Control.Concurrent.PingMachine -import Network.StreamServer -import Network.SocketLike hiding (sClose) -import qualified Connection as G - ;import Connection (Manager (..), PeerAddress (..), Policy (..)) -import Network.Address (localhost4) -import DPut -import DebugTag - - -type Microseconds = Int - --- | This object is passed with the 'Listen' and 'Connect' --- instructions in order to control the behavior of the --- connections that are established. It is parameterized --- by a user-suplied type @conkey@ that is used as a lookup --- key for connections. -data ConnectionParameters conkey u = - ConnectionParameters - { pingInterval :: PingInterval - -- ^ The miliseconds of idle to allow before a 'RequiresPing' - -- event is signaled. - , timeout :: TimeOut - -- ^ The miliseconds of idle after 'RequiresPing' is signaled - -- that are necessary for the connection to be considered - -- lost and signalling 'EOF'. - , makeConnKey :: (RestrictedSocket,(Local SockAddr, Remote SockAddr)) -> IO (conkey,u) - -- ^ This action creates a lookup key for a new connection. If 'duplex' - -- is 'True' and the result is already assocatied with an established - -- connection, then an 'EOF' will be forced before the the new - -- connection becomes active. - -- - , duplex :: Bool - -- ^ If True, then the connection will be treated as a normal - -- two-way socket. Otherwise, a readable socket is established - -- with 'Listen' and a writable socket is established with - -- 'Connect' and they are associated when 'makeConnKey' yields - -- same value for each. - } - --- | Use this function to select appropriate default values for --- 'ConnectionParameters' other than 'makeConnKey'. --- --- Current defaults: --- --- * 'pingInterval' = 28000 --- --- * 'timeout' = 2000 --- --- * 'duplex' = True --- -connectionDefaults - :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> IO (conkey,u)) -> ConnectionParameters conkey u -connectionDefaults f = ConnectionParameters - { pingInterval = 28000 - , timeout = 2000 - , makeConnKey = f - , duplex = True - } - --- | Instructions for a 'Server' object --- --- To issue a command, put it into the 'serverCommand' TMVar. -data ServerInstruction conkey u - = Quit - -- ^ kill the server. This command is automatically issued when - -- the server is released. - | Listen SockAddr (ConnectionParameters conkey u) - -- ^ listen for incoming connections on the given bind address. - | Connect SockAddr (ConnectionParameters conkey u) - -- ^ connect to addresses - | ConnectWithEndlessRetry SockAddr - (ConnectionParameters conkey u) - Miliseconds - -- ^ keep retrying the connection - | Ignore SockAddr - -- ^ stop listening on specified bind address - | Send conkey ByteString - -- ^ send bytes to an established connection - -#ifdef TEST -deriving instance Show conkey => Show (ServerInstruction conkey u) -instance Show (a -> b) where show _ = "" -deriving instance Show conkey => Show (ConnectionParameters conkey u) -#endif - --- | This type specifies which which half of a half-duplex --- connection is of interest. -data InOrOut = In | Out - deriving (Enum,Eq,Ord,Show,Read) - --- | These events may be read from 'serverEvent' TChannel. --- -data ConnectionEvent b - = Connection (STM Bool) (ConduitT () b IO ()) (ConduitT (Flush b) Void IO ()) - -- ^ A new connection was established - | ConnectFailure SockAddr - -- ^ A 'Connect' command failed. - | HalfConnection InOrOut - -- ^ Half of a half-duplex connection is avaliable. - | EOF - -- ^ A connection was terminated - | RequiresPing - -- ^ 'pingInterval' miliseconds of idle was experienced - -#ifdef TEST -instance Show (IO a) where show _ = "" -instance Show (STM a) where show _ = "" -instance Eq (ByteString -> IO Bool) where (==) _ _ = True -instance Eq (IO (Maybe ByteString)) where (==) _ _ = True -instance Eq (STM Bool) where (==) _ _ = True -deriving instance Show b => Show (ConnectionEvent b) -deriving instance Eq b => Eq (ConnectionEvent b) -#endif - --- | This is the per-connection state. -data ConnectionRecord u - = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler - , cstate :: ConnectionState -- ^ used to send/receive data to the connection - , cdata :: u -- ^ user data, stored in the connection map for convenience - } - --- | This object accepts commands and signals events and maintains --- the list of currently listening ports and established connections. -data Server a u releaseKey b - = Server { serverCommand :: TMVar (ServerInstruction a u) - , serverEvent :: TChan ((a,u), ConnectionEvent b) - , serverReleaseKey :: releaseKey - , conmap :: TVar (Map a (ConnectionRecord u)) - , listenmap :: TVar (Map SockAddr ServerHandle) - , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) - } - -control :: Server a u releaseKey b -> ServerInstruction a u -> IO () -control sv = atomically . putTMVar (serverCommand sv) - -type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) - -noCleanUp :: MonadIO m => Allocate () m -noCleanUp io _ = ( (,) () ) `liftM` liftIO io - --- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' --- to ensure proper cleanup. For example, --- --- > import Connection.Tcp --- > import Control.Monad.Trans.Resource (runResourceT) --- > import Control.Monad.IO.Class (liftIO) --- > import Control.Monad.STM (atomically) --- > import Control.Concurrent.STM.TMVar (putTMVar) --- > import Control.Concurrent.STM.TChan (readTChan) --- > --- > main = runResourceT $ do --- > sv <- server allocate --- > let params = connectionDefaults (return . snd) --- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) --- > let loop = do --- > (_,event) <- atomically $ readTChan (serverEvent sv) --- > case event of --- > Connection getPingFlag readData writeData -> do --- > forkIO $ do --- > fix $ \readLoop -> do --- > readData >>= mapM $ \bytes -> --- > putStrLn $ "got: " ++ show bytes --- > readLoop --- > case event of EOF -> return () --- > _ -> loop --- > liftIO loop --- --- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp' --- to do without automatic cleanup and be sure to remember to write 'Quit' to --- the 'serverCommand' variable. -server :: - -- forall (m :: * -> *) a u conkey releaseKey. - (Show conkey, MonadIO m, Ord conkey) => - Allocate releaseKey m - -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) - -> m (Server conkey u releaseKey x) -server allocate sessionConduits = do - (key,cmds) <- allocate (atomically newEmptyTMVar) - (atomically . flip putTMVar Quit) - server <- liftIO . atomically $ do - tchan <- newTChan - conmap <- newTVar Map.empty - listenmap<- newTVar Map.empty - retrymap <- newTVar Map.empty - return Server { serverCommand = cmds - , serverEvent = tchan - , serverReleaseKey = key - , conmap = conmap - , listenmap = listenmap - , retrymap = retrymap - } - liftIO $ do - forkLabeled "server" $ fix $ \loop -> do - instr <- atomically $ takeTMVar cmds - -- warn $ "instr = " <> bshow instr - let again = do doit server instr - -- warn $ "finished " <> bshow instr - loop - case instr of Quit -> closeAll server - _ -> again - return server - where - closeAll server = do - listening <- atomically . readTVar $ listenmap server - mapM_ quitListening (Map.elems listening) - let stopRetry (v,d) = do atomically $ writeTVar v False - interruptDelay d - retriers <- atomically $ do - rmap <- readTVar $ retrymap server - writeTVar (retrymap server) Map.empty - return rmap - mapM_ stopRetry (Map.elems retriers) - cons <- atomically . readTVar $ conmap server - atomically $ mapM_ (connClose . cstate) (Map.elems cons) - atomically $ mapM_ (connWait . cstate) (Map.elems cons) - atomically $ writeTVar (conmap server) Map.empty - - - doit server (Listen port params) = do - - listening <- Map.member port - `fmap` atomically (readTVar $ listenmap server) - when (not listening) $ do - - dput XMisc $ "Started listening on "++show port - - sserv <- flip streamServer [port] ServerConfig - { serverWarn = dput XMisc - , serverSession = \sock _ h -> do - (conkey,u) <- makeConnKey params sock - _ <- newConnection server sessionConduits params conkey u h In - return () - } - - atomically $ listenmap server `modifyTVar'` Map.insert port sserv - - doit server (Ignore port) = do - dput XMisc $ "Stopping listen on "++show port - mb <- atomically $ do - map <- readTVar $ listenmap server - modifyTVar' (listenmap server) $ Map.delete port - return $ Map.lookup port map - maybe (return ()) quitListening $ mb - - doit server (Send con bs) = do -- . void . forkIO $ do - map <- atomically $ readTVar (conmap server) - let post False = (trace ("cant send: "++show bs) $ return ()) - post True = return () - maybe (post False) - (post <=< flip connWrite bs . cstate) - $ Map.lookup con map - - doit server (Connect addr params) = join $ atomically $ do - Map.lookup addr <$> readTVar (retrymap server) - >>= return . \case - Nothing -> forkit - Just (v,d) -> do b <- atomically $ readTVar v - interruptDelay d - when (not b) forkit - where - forkit = void . forkLabeled ( "Connect." ++ show addr ) $ do - proto <- getProtocolNumber "tcp" - sock <- socket (socketFamily addr) Stream proto - handle (\e -> do -- let t = ioeGetErrorType e - when (isDoesNotExistError e) $ return () -- warn "GOTCHA" - -- warn $ "connect-error: " <> bshow e - (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) -- XXX: ? - Socket.close sock - atomically - $ writeTChan (serverEvent server) - $ ((conkey,u),ConnectFailure addr)) - $ do - connect sock addr - laddr <- Socket.getSocketName sock - (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) - h <- socketToHandle sock ReadWriteMode - newConnection server sessionConduits params conkey u h Out - return () - - doit server (ConnectWithEndlessRetry addr params interval) = do - proto <- getProtocolNumber "tcp" - void . forkLabeled ("ConnectWithEndlessRetry." ++ show addr) $ do - timer <- interruptibleDelay - (retryVar,action) <- atomically $ do - map <- readTVar (retrymap server) - action <- case Map.lookup addr map of - Nothing -> return $ return () - Just (v,d) -> do writeTVar v False - return $ interruptDelay d - v <- newTVar True - writeTVar (retrymap server) $! Map.insert addr (v,timer) map - return (v,action :: IO ()) - action - fix $ \retryLoop -> do - utc <- getCurrentTime - shouldRetry <- do - handle (\(SomeException e) -> do - -- Exceptions thrown by 'socket' need to be handled specially - -- since we don't have enough information to broadcast a ConnectFailure - -- on serverEvent. - warn $ "Failed to create socket: " <> bshow e - atomically $ readTVar retryVar) $ do - sock <- socket (socketFamily addr) Stream proto - handle (\(SomeException e) -> do - -- Any thing else goes wrong and we broadcast ConnectFailure. - do (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) - Socket.close sock - atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr) - `onException` return () - atomically $ readTVar retryVar) $ do - connect sock addr - laddr <- Socket.getSocketName sock - (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) - h <- socketToHandle sock ReadWriteMode - threads <- newConnection server sessionConduits params conkey u h Out - atomically $ do threadsWait threads - readTVar retryVar - fin_utc <- getCurrentTime - when shouldRetry $ do - let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc) - expected = fromIntegral interval - when (shouldRetry && elapsed < expected) $ do - debugNoise $ "Waiting to retry " <> bshow addr - void $ startDelay timer (round $ 1000 * (expected-elapsed)) - debugNoise $ "retry " <> bshow (shouldRetry,addr) - when shouldRetry $ retryLoop - - --- INTERNAL ---------------------------------------------------------- - -{- -hWriteUntilNothing h outs = - fix $ \loop -> do - mb <- atomically $ takeTMVar outs - case mb of Just bs -> do S.hPutStrLn h bs - warn $ "wrote " <> bs - loop - Nothing -> do warn $ "wrote Nothing" - hClose h - --} -connRead :: ConnectionState -> IO (Maybe ByteString) -connRead (WriteOnlyConnection w) = do - -- atomically $ discardContents (threadsChannel w) - return Nothing -connRead conn = do - c <- atomically $ getThreads - threadsRead c - where - getThreads = - case conn of SaneConnection c -> return c - ReadOnlyConnection c -> return c - ConnectionPair c w -> do - -- discardContents (threadsChannel w) - return c - -socketFamily :: SockAddr -> Family -socketFamily (SockAddrInet _ _) = AF_INET -socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 -socketFamily (SockAddrUnix _) = AF_UNIX - - -conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) - -> ConnectionState - -> ConnectionEvent x -conevent sessionConduits con = Connection pingflag read write - where - pingflag = swapTVar (pingFlag (connPingTimer con)) False - (read,write) = sessionConduits (connRead con) (connWrite con) - -newConnection :: Ord a - => Server a u1 releaseKey x - -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) - -> ConnectionParameters conkey u - -> a - -> u1 - -> Handle - -> InOrOut - -> IO ConnectionThreads -newConnection server sessionConduits params conkey u h inout = do - hSetBuffering h NoBuffering - let (idle_ms,timeout_ms) = - case (inout,duplex params) of - (Out,False) -> ( 0, 0 ) - _ -> ( pingInterval params - , timeout params ) - - new <- do pinglogic <- forkPingMachine "newConnection" idle_ms timeout_ms - connectionThreads h pinglogic - started <- atomically $ newEmptyTMVar - kontvar <- atomically newEmptyTMVar - -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ? - let _ = kontvar :: TMVar (STM (IO ())) - forkLabeled ("connecting...") $ do - getkont <- atomically $ takeTMVar kontvar - kont <- atomically getkont - kont - - atomically $ do - current <- fmap (Map.lookup conkey) $ readTVar (conmap server) - case current of - Nothing -> do - (newCon,e) <- return $ - if duplex params - then let newcon = SaneConnection new - in ( newcon, ((conkey,u), conevent sessionConduits newcon) ) - else ( case inout of - In -> ReadOnlyConnection new - Out -> WriteOnlyConnection new - , ((conkey,u), HalfConnection inout) ) - modifyTVar' (conmap server) $ Map.insert conkey - ConnectionRecord { ckont = kontvar - , cstate = newCon - , cdata = u } - announce e - putTMVar kontvar $ return $ do - myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice. - atomically $ putTMVar started () - -- Wait for something interesting. - handleEOF conkey u kontvar newCon - Just what@ConnectionRecord { ckont =mvar }-> do - putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread. - putTMVar mvar $ do - -- The action returned by updateConMap, eventually invokes handleEOF, - -- so the sequencer thread will not be terminated. - kont <- updateConMap conkey u new what - putTMVar started () - return kont - return new - where - - announce e = writeTChan (serverEvent server) e - - -- This function loops and will not quit unless an action is posted to the - -- mvar that does not in turn invoke this function, or if an EOF occurs. - handleEOF conkey u mvar newCon = do - action <- atomically . foldr1 orElse $ - [ takeTMVar mvar >>= id -- passed continuation - , connWait newCon >> return eof - , connWaitPing newCon >>= return . sendPing - -- , pingWait pingTimer >>= return . sendPing - ] - action :: IO () - where - eof = do - -- warn $ "EOF " <>bshow conkey - connCancelPing newCon - atomically $ do connFlush newCon - announce ((conkey,u),EOF) - modifyTVar' (conmap server) - $ Map.delete conkey - -- warn $ "fin-EOF "<>bshow conkey - - sendPing PingTimeOut = do - {- - utc <- getCurrentTime - let utc' = formatTime defaultTimeLocale "%s" utc - warn $ "ping:TIMEOUT " <> bshow utc' - -} - atomically (connClose newCon) - eof - - sendPing PingIdle = do - {- - utc <- getCurrentTime - let utc' = formatTime defaultTimeLocale "%s" utc - -- warn $ "ping:IDLE " <> bshow utc' - -} - atomically $ announce ((conkey,u),RequiresPing) - handleEOF conkey u mvar newCon - - - updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do - new' <- - if duplex params then do - announce ((conkey,u),EOF) - connClose replaced - let newcon = SaneConnection new - announce $ ((conkey,u),conevent sessionConduits newcon) - return $ newcon - else - case replaced of - WriteOnlyConnection w | inout==In -> - do let newcon = ConnectionPair new w - announce ((conkey,u),conevent sessionConduits newcon) - return newcon - ReadOnlyConnection r | inout==Out -> - do let newcon = ConnectionPair r new - announce ((conkey,u),conevent sessionConduits newcon) - return newcon - _ -> do -- connFlush todo - announce ((conkey,u0), EOF) - connClose replaced - announce ((conkey,u), HalfConnection inout) - return $ case inout of - In -> ReadOnlyConnection new - Out -> WriteOnlyConnection new - modifyTVar' (conmap server) $ Map.insert conkey - ConnectionRecord { ckont = mvar - , cstate = new' - , cdata = u } - return $ handleEOF conkey u mvar new' - - -getPacket :: Handle -> IO ByteString -getPacket h = do hWaitForInput h (-1) - hGetNonBlocking h 1024 - - - --- | 'ConnectionThreads' is an interface to a pair of threads --- that are reading and writing a 'Handle'. -data ConnectionThreads = ConnectionThreads - { threadsWriter :: TMVar (Maybe ByteString) - , threadsChannel :: TChan ByteString - , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close - , threadsPing :: PingMachine - } - --- | This spawns the reader and writer threads and returns a newly --- constructed 'ConnectionThreads' object. -connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads -connectionThreads h pinglogic = do - - (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar - - (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan - readerThread <- forkLabeled "readerThread" $ do - let finished e = do - hClose h - -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) - -- let _ = fmap ioeGetErrorType e -- type hint - let _ = fmap what e where what (SomeException _) = undefined - atomically $ do tryTakeTMVar outs - putTMVar outs Nothing -- quit writer - putTMVar doner () - handle (finished . Just) $ do - pingBump pinglogic -- start the ping timer - fix $ \loop -> do - packet <- getPacket h - -- warn $ "read: " <> S.take 60 packet - atomically $ writeTChan incomming packet - pingBump pinglogic - -- warn $ "bumped: " <> S.take 60 packet - isEof <- hIsEOF h - if isEof then finished Nothing else loop - - writerThread <- forkLabeled "writerThread" . fix $ \loop -> do - let finished = do -- warn $ "finished write" - -- hClose h -- quit reader - throwTo readerThread (ErrorCall "EOF") - atomically $ putTMVar donew () - mb <- atomically $ readTMVar outs - case mb of Just bs -> handle (\(SomeException e)->finished) - (do -- warn $ "writing: " <> S.take 60 bs - S.hPutStr h bs - -- warn $ "wrote: " <> S.take 60 bs - atomically $ takeTMVar outs - loop) - Nothing -> finished - - let wait = do readTMVar donew - readTMVar doner - return () - return ConnectionThreads { threadsWriter = outs - , threadsChannel = incomming - , threadsWait = wait - , threadsPing = pinglogic } - - --- | 'threadsWrite' writes the given 'ByteString' to the --- 'ConnectionThreads' object. It blocks until the ByteString --- is written and 'True' is returned, or the connection is --- interrupted and 'False' is returned. -threadsWrite :: ConnectionThreads -> ByteString -> IO Bool -threadsWrite c bs = atomically $ - orElse (const False `fmap` threadsWait c) - (const True `fmap` putTMVar (threadsWriter c) (Just bs)) - --- | 'threadsClose' signals for the 'ConnectionThreads' object --- to quit and close the associated 'Handle'. This operation --- is non-blocking, follow it with 'threadsWait' if you want --- to wait for the operation to complete. -threadsClose :: ConnectionThreads -> STM () -threadsClose c = do - let mvar = threadsWriter c - v <- tryReadTMVar mvar - case v of - Just Nothing -> return () -- already closed - _ -> putTMVar mvar Nothing - --- | 'threadsRead' blocks until a 'ByteString' is available which --- is returned to the caller, or the connection is interrupted and --- 'Nothing' is returned. -threadsRead :: ConnectionThreads -> IO (Maybe ByteString) -threadsRead c = atomically $ - orElse (const Nothing `fmap` threadsWait c) - (Just `fmap` readTChan (threadsChannel c)) - --- | A 'ConnectionState' is an interface to a single 'ConnectionThreads' --- or to a pair of 'ConnectionThreads' objects that are considered as one --- connection. -data ConnectionState = - SaneConnection ConnectionThreads - -- ^ ordinary read/write connection - | WriteOnlyConnection ConnectionThreads - | ReadOnlyConnection ConnectionThreads - | ConnectionPair ConnectionThreads ConnectionThreads - -- ^ Two 'ConnectionThreads' objects, read operations use the - -- first, write operations use the second. - - - -connWrite :: ConnectionState -> ByteString -> IO Bool -connWrite (ReadOnlyConnection _) bs = return False -connWrite conn bs = threadsWrite c bs - where - c = case conn of SaneConnection c -> c - WriteOnlyConnection c -> c - ConnectionPair _ c -> c - - -mapConn :: Bool -> - (ConnectionThreads -> STM ()) -> ConnectionState -> STM () -mapConn both action c = - case c of - SaneConnection rw -> action rw - ReadOnlyConnection r -> action r - WriteOnlyConnection w -> action w - ConnectionPair r w -> do - rem <- orElse (const w `fmap` action r) - (const r `fmap` action w) - when both $ action rem - -connClose :: ConnectionState -> STM () -connClose c = mapConn True threadsClose c - -connWait :: ConnectionState -> STM () -connWait c = doit -- mapConn False threadsWait c - where - action = threadsWait - doit = - case c of - SaneConnection rw -> action rw - ReadOnlyConnection r -> action r - WriteOnlyConnection w -> action w - ConnectionPair r w -> do - rem <- orElse (const w `fmap` action r) - (const r `fmap` action w) - threadsClose rem - -connPingTimer :: ConnectionState -> PingMachine -connPingTimer c = - case c of - SaneConnection rw -> threadsPing rw - ReadOnlyConnection r -> threadsPing r - WriteOnlyConnection w -> threadsPing w -- should be disabled. - ConnectionPair r w -> threadsPing r - -connCancelPing :: ConnectionState -> IO () -connCancelPing c = pingCancel (connPingTimer c) - -connWaitPing :: ConnectionState -> STM PingEvent -connWaitPing c = pingWait (connPingTimer c) - -connFlush :: ConnectionState -> STM () -connFlush c = - case c of - SaneConnection rw -> waitChan rw - ReadOnlyConnection r -> waitChan r - WriteOnlyConnection w -> return () - ConnectionPair r w -> waitChan r - where - waitChan t = do - b <- isEmptyTChan (threadsChannel t) - when (not b) retry - -bshow :: Show a => a -> ByteString -bshow e = S.pack . show $ e - -warn :: ByteString -> IO () -warn str =dputB XMisc str - -debugNoise :: Monad m => t -> m () -debugNoise str = return () - -data TCPStatus = Resolving | AwaitingRead | AwaitingWrite - --- SockAddr -> (SockAddr, ConnectionParameters SockAddr ConnectionData, Miliseconds) - - -tcpManager :: (PeerAddress -> (SockAddr, ConnectionParameters PeerAddress u, Miliseconds)) - -- -> (String -> Maybe Text) - -- -> (Text -> IO (Maybe PeerAddress)) - -> Server PeerAddress u releaseKey x - -> IO (Manager TCPStatus Text) -tcpManager grokKey sv = do - rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey) - nullping <- forkPingMachine "tcpManager" 0 0 - (rslv,rev) <- do - dns <- newDNSCache - let rslv k = map PeerAddress <$> forwardResolve dns k - rev (PeerAddress addr) = reverseResolve dns addr - return (rslv,rev) - return Manager { - setPolicy = \k -> \case - TryingToConnect -> join $ atomically $ do - r <- readTVar rmap - case Map.lookup k r of - Just {} -> return $ return () -- Connection already in progress. - Nothing -> do - modifyTVar' rmap $ Map.insert k Nothing - return $ void $ forkLabeled ("resolve."++show k) $ do - mconkey <- listToMaybe <$> rslv k - case mconkey of - Nothing -> atomically $ modifyTVar' rmap $ Map.delete k - Just conkey -> do - control sv $ case grokKey conkey of - (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms - OpenToConnect -> dput XMisc "TODO: TCP OpenToConnect" - RefusingToConnect -> dput XMisc "TODO: TCP RefusingToConnect" - , status = \k -> do - c <- readTVar (conmap sv) - ck <- Map.lookup k <$> readTVar rmap - return $ exportConnection c (join ck) - , connections = Map.keys <$> readTVar rmap - , stringToKey = Just . Text.pack - , showProgress = \case - Resolving -> "resolving" - AwaitingRead -> "awaiting inbound" - AwaitingWrite -> "awaiting outbound" - , showKey = show - , resolvePeer = rslv - , reverseAddress = rev - } - -exportConnection :: Ord conkey => Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus -exportConnection conmap mkey = G.Connection - { G.connStatus = case mkey of - Nothing -> G.Dormant - Just conkey -> case Map.lookup conkey conmap of - Nothing -> G.InProgress Resolving - Just (ConnectionRecord ckont cstate cdata) -> case cstate of - SaneConnection {} -> G.Established - ConnectionPair {} -> G.Established - ReadOnlyConnection {} -> G.InProgress AwaitingWrite - WriteOnlyConnection {} -> G.InProgress AwaitingRead - , G.connPolicy = TryingToConnect - } -- cgit v1.2.3