From 934f42f8b547ee59da7066168e34258996c48881 Mon Sep 17 00:00:00 2001 From: joe Date: Sun, 16 Feb 2014 22:32:18 -0500 Subject: Added user-data to connection map. --- Presence/Server.hs | 105 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 46 deletions(-) (limited to 'Presence/Server.hs') diff --git a/Presence/Server.hs b/Presence/Server.hs index baf5a1a8..87644946 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs @@ -82,7 +82,7 @@ type PingInterval = Miliseconds -- connections that are established. It is parameterized -- by a user-suplied type @conkey@ that is used as a lookup -- key for connections. -data ConnectionParameters conkey = +data ConnectionParameters conkey u = ConnectionParameters { pingInterval :: PingInterval -- ^ The miliseconds of idle to allow before a 'RequiresPing' @@ -91,7 +91,7 @@ data ConnectionParameters conkey = -- ^ The miliseconds of idle after 'RequiresPing' is signaled -- that are necessary for the connection to be considered -- lost and signalling 'EOF'. - , makeConnKey :: (Socket,SockAddr) -> IO conkey + , makeConnKey :: (Socket,SockAddr) -> IO (conkey,u) -- ^ This action creates a lookup key for a new connection. If 'duplex' -- is 'True' and the result is already assocatied with an established -- connection, then an 'EOF' will be forced before the the new @@ -117,7 +117,7 @@ data ConnectionParameters conkey = -- * 'duplex' = True -- connectionDefaults - :: ((Socket, SockAddr) -> IO conkey) -> ConnectionParameters conkey + :: ((Socket, SockAddr) -> IO (conkey,u)) -> ConnectionParameters conkey u connectionDefaults f = ConnectionParameters { pingInterval = 28000 , timeout = 2000 @@ -128,16 +128,16 @@ connectionDefaults f = ConnectionParameters -- | Instructions for a 'Server' object -- -- To issue a command, put it into the 'serverCommand' TMVar. -data ServerInstruction conkey +data ServerInstruction conkey u = Quit -- ^ kill the server. This command is automatically issued when -- the server is released. - | Listen PortNumber (ConnectionParameters conkey) + | Listen PortNumber (ConnectionParameters conkey u) -- ^ listen for incomming connections - | Connect SockAddr (ConnectionParameters conkey) + | Connect SockAddr (ConnectionParameters conkey u) -- ^ connect to addresses | ConnectWithEndlessRetry SockAddr - (ConnectionParameters conkey) + (ConnectionParameters conkey u) Miliseconds -- ^ keep retrying the connection | Ignore PortNumber @@ -182,13 +182,20 @@ deriving instance Show b => Show (ConnectionEvent b) deriving instance Eq b => Eq (ConnectionEvent b) #endif +-- | This is the per-connection state. +data ConnectionRecord u + = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler + , cstate :: ConnectionState -- ^ used to send/receive data to the connection + , cdata :: u -- ^ user data, stored in the connection map for convenience + } + -- | This object accepts commands and signals events and maintains -- the list of currently listening ports and established connections. -data Server a - = Server { serverCommand :: TMVar (ServerInstruction a) - , serverEvent :: TChan (a, ConnectionEvent ByteString) +data Server a u + = Server { serverCommand :: TMVar (ServerInstruction a u) + , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) , serverReleaseKey :: ReleaseKey - , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) + , conmap :: TVar (Map a (ConnectionRecord u)) , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) } @@ -217,7 +224,7 @@ control sv = atomically . putTMVar (serverCommand sv) -- > case event of EOF -> return () -- > _ -> loop -- > liftIO loop -server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a) +server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a u) server = do (key,cmds) <- allocate (atomically newEmptyTMVar) (atomically . flip putTMVar Quit) @@ -255,8 +262,8 @@ server = do return rmap mapM_ stopRetry (Map.elems retriers) cons <- atomically . readTVar $ conmap server - atomically $ mapM_ (connClose . snd) (Map.elems cons) - atomically $ mapM_ (connWait . snd) (Map.elems cons) + atomically $ mapM_ (connClose . cstate) (Map.elems cons) + atomically $ mapM_ (connWait . cstate) (Map.elems cons) atomically $ writeTVar (conmap server) Map.empty @@ -296,7 +303,7 @@ server = do let post False = (trace ("cant send: "++show bs) $ return ()) post True = return () maybe (post False) - (post <=< flip connWrite bs . snd) + (post <=< flip connWrite bs . cstate) $ Map.lookup con map doit server (Connect addr params) = liftIO $ do @@ -315,17 +322,17 @@ server = do handle (\e -> do -- let t = ioeGetErrorType e when (isDoesNotExistError e) $ return () -- warn "GOTCHA" -- warn $ "connect-error: " <> bshow e - conkey <- makeConnKey params (sock,addr) -- XXX: ? + (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? sClose sock atomically $ writeTChan (serverEvent server) - $ (conkey,ConnectFailure addr)) + $ ((conkey,u),ConnectFailure addr)) $ do connect sock addr me <- getSocketName sock - conkey <- makeConnKey params (sock,me) + (conkey,u) <- makeConnKey params (sock,me) h <- socketToHandle sock ReadWriteMode - newConnection server params conkey h Out + newConnection server params conkey u h Out return () doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do @@ -355,19 +362,19 @@ server = do -- warn $ "connect-error: " <> bshow e -- Weird hack: puting the would-be peer address -- instead of local socketName - conkey <- makeConnKey params (sock,addr) -- XXX: ? + (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? sClose sock atomically $ do writeTChan (serverEvent server) - $ (conkey,ConnectFailure addr) + $ ((conkey,u),ConnectFailure addr) retry <- readTVar retryVar putTMVar resultVar retry) $ do connect sock addr me <- getSocketName sock - conkey <- makeConnKey params (sock,me) + (conkey,u) <- makeConnKey params (sock,me) h <- socketToHandle sock ReadWriteMode - threads <- newConnection server params conkey h Out + threads <- newConnection server params conkey u h Out atomically $ do threadsWait threads retry <- readTVar retryVar putTMVar resultVar retry @@ -425,14 +432,14 @@ conevent con = Connection pingflag read write read = connRead con write = connWrite con -newConnection server params conkey h inout = do +newConnection server params conkey u h inout = do hSetBuffering h NoBuffering let (forward,idle_ms,timeout_ms) = case (inout,duplex params) of (Out,False) -> ( const $ return () , 0 , 0 ) - _ -> ( announce . (conkey,) . Got + _ -> ( announce . ((conkey,u),) . Got , pingInterval params , timeout params ) @@ -452,20 +459,23 @@ newConnection server params conkey h inout = do (newCon,e) <- return $ if duplex params then let newcon = SaneConnection new - in ( newcon, (conkey, conevent newcon) ) + in ( newcon, ((conkey,u), conevent newcon) ) else ( case inout of In -> ReadOnlyConnection new Out -> WriteOnlyConnection new - , (conkey, HalfConnection inout) ) - modifyTVar' (conmap server) $ Map.insert conkey (kontvar,newCon) + , ((conkey,u), HalfConnection inout) ) + modifyTVar' (conmap server) $ Map.insert conkey + ConnectionRecord { ckont = kontvar + , cstate = newCon + , cdata = u } announce e putTMVar kontvar $ return $ do atomically $ putTMVar started () - handleEOF conkey kontvar newCon - Just what@(mvar,_) -> do + handleEOF conkey u kontvar newCon + Just what@ConnectionRecord { ckont =mvar }-> do putTMVar kontvar $ return $ return () putTMVar mvar $ do - kont <- updateConMap conkey new what + kont <- updateConMap conkey u new what putTMVar started () return kont #ifdef TEST @@ -489,7 +499,7 @@ newConnection server params conkey h inout = do announce e = writeTChan (serverEvent server) e - handleEOF conkey mvar newCon = do + handleEOF conkey u mvar newCon = do action <- atomically . foldr1 orElse $ [ takeTMVar mvar >>= id -- passed continuation , connWait newCon >> return eof @@ -502,7 +512,7 @@ newConnection server params conkey h inout = do -- warn $ "EOF " <>bshow conkey connCancelPing newCon atomically $ do connFlush newCon - announce (conkey,EOF) + announce ((conkey,u),EOF) modifyTVar' (conmap server) $ Map.delete conkey -- warn $ "fin-EOF "<>bshow conkey @@ -524,43 +534,46 @@ newConnection server params conkey h inout = do let utc' = formatTime defaultTimeLocale "%s" utc warn $ "IDLE" <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me) -} - atomically $ announce (conkey,RequiresPing) - handleEOF conkey mvar newCon + atomically $ announce ((conkey,u),RequiresPing) + handleEOF conkey u mvar newCon - updateConMap conkey new (mvar,replaced) = do + updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do new' <- if duplex params then do - announce (conkey,EOF) + announce ((conkey,u),EOF) connClose replaced let newcon = SaneConnection new - announce $ (conkey,conevent newcon) + announce $ ((conkey,u),conevent newcon) return $ newcon else case replaced of WriteOnlyConnection w | inout==In -> do let newcon = ConnectionPair new w - announce (conkey,conevent newcon) + announce ((conkey,u),conevent newcon) return newcon ReadOnlyConnection r | inout==Out -> do let newcon = ConnectionPair r new - announce (conkey,conevent newcon) + announce ((conkey,u),conevent newcon) return newcon _ -> do -- connFlush todo - announce (conkey, EOF) + announce ((conkey,u0), EOF) connClose replaced - announce (conkey, HalfConnection inout) + announce ((conkey,u), HalfConnection inout) return $ case inout of In -> ReadOnlyConnection new Out -> WriteOnlyConnection new - modifyTVar' (conmap server) $ Map.insert conkey (mvar,new') - return $ handleEOF conkey mvar new' + modifyTVar' (conmap server) $ Map.insert conkey + ConnectionRecord { ckont = mvar + , cstate = new' + , cdata = u } + return $ handleEOF conkey u mvar new' acceptLoop server params sock = handle (acceptException server params sock) $ do con <- accept sock - conkey <- makeConnKey params con + (conkey,u) <- makeConnKey params con h <- socketToHandle (fst con) ReadWriteMode - newConnection server params conkey h In + newConnection server params conkey u h In acceptLoop server params sock acceptException server params sock ioerror = do -- cgit v1.2.3