summaryrefslogtreecommitdiff
path: root/Presence
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-16 22:32:18 -0500
committerjoe <joe@jerkface.net>2014-02-16 22:32:18 -0500
commit934f42f8b547ee59da7066168e34258996c48881 (patch)
tree341c3a78f00b3eb2c1df9af9e7141b422489ec79 /Presence
parent1ca1927a27ab4200c9a71f2e6ab4fd6be860e92f (diff)
Added user-data to connection map.
Diffstat (limited to 'Presence')
-rw-r--r--Presence/Server.hs105
1 files changed, 59 insertions, 46 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index baf5a1a8..87644946 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -82,7 +82,7 @@ type PingInterval = Miliseconds
82-- connections that are established. It is parameterized 82-- connections that are established. It is parameterized
83-- by a user-suplied type @conkey@ that is used as a lookup 83-- by a user-suplied type @conkey@ that is used as a lookup
84-- key for connections. 84-- key for connections.
85data ConnectionParameters conkey = 85data ConnectionParameters conkey u =
86 ConnectionParameters 86 ConnectionParameters
87 { pingInterval :: PingInterval 87 { pingInterval :: PingInterval
88 -- ^ The miliseconds of idle to allow before a 'RequiresPing' 88 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
@@ -91,7 +91,7 @@ data ConnectionParameters conkey =
91 -- ^ The miliseconds of idle after 'RequiresPing' is signaled 91 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
92 -- that are necessary for the connection to be considered 92 -- that are necessary for the connection to be considered
93 -- lost and signalling 'EOF'. 93 -- lost and signalling 'EOF'.
94 , makeConnKey :: (Socket,SockAddr) -> IO conkey 94 , makeConnKey :: (Socket,SockAddr) -> IO (conkey,u)
95 -- ^ This action creates a lookup key for a new connection. If 'duplex' 95 -- ^ This action creates a lookup key for a new connection. If 'duplex'
96 -- is 'True' and the result is already assocatied with an established 96 -- is 'True' and the result is already assocatied with an established
97 -- connection, then an 'EOF' will be forced before the the new 97 -- connection, then an 'EOF' will be forced before the the new
@@ -117,7 +117,7 @@ data ConnectionParameters conkey =
117-- * 'duplex' = True 117-- * 'duplex' = True
118-- 118--
119connectionDefaults 119connectionDefaults
120 :: ((Socket, SockAddr) -> IO conkey) -> ConnectionParameters conkey 120 :: ((Socket, SockAddr) -> IO (conkey,u)) -> ConnectionParameters conkey u
121connectionDefaults f = ConnectionParameters 121connectionDefaults f = ConnectionParameters
122 { pingInterval = 28000 122 { pingInterval = 28000
123 , timeout = 2000 123 , timeout = 2000
@@ -128,16 +128,16 @@ connectionDefaults f = ConnectionParameters
128-- | Instructions for a 'Server' object 128-- | Instructions for a 'Server' object
129-- 129--
130-- To issue a command, put it into the 'serverCommand' TMVar. 130-- To issue a command, put it into the 'serverCommand' TMVar.
131data ServerInstruction conkey 131data ServerInstruction conkey u
132 = Quit 132 = Quit
133 -- ^ kill the server. This command is automatically issued when 133 -- ^ kill the server. This command is automatically issued when
134 -- the server is released. 134 -- the server is released.
135 | Listen PortNumber (ConnectionParameters conkey) 135 | Listen PortNumber (ConnectionParameters conkey u)
136 -- ^ listen for incomming connections 136 -- ^ listen for incomming connections
137 | Connect SockAddr (ConnectionParameters conkey) 137 | Connect SockAddr (ConnectionParameters conkey u)
138 -- ^ connect to addresses 138 -- ^ connect to addresses
139 | ConnectWithEndlessRetry SockAddr 139 | ConnectWithEndlessRetry SockAddr
140 (ConnectionParameters conkey) 140 (ConnectionParameters conkey u)
141 Miliseconds 141 Miliseconds
142 -- ^ keep retrying the connection 142 -- ^ keep retrying the connection
143 | Ignore PortNumber 143 | Ignore PortNumber
@@ -182,13 +182,20 @@ deriving instance Show b => Show (ConnectionEvent b)
182deriving instance Eq b => Eq (ConnectionEvent b) 182deriving instance Eq b => Eq (ConnectionEvent b)
183#endif 183#endif
184 184
185-- | This is the per-connection state.
186data ConnectionRecord u
187 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler
188 , cstate :: ConnectionState -- ^ used to send/receive data to the connection
189 , cdata :: u -- ^ user data, stored in the connection map for convenience
190 }
191
185-- | This object accepts commands and signals events and maintains 192-- | This object accepts commands and signals events and maintains
186-- the list of currently listening ports and established connections. 193-- the list of currently listening ports and established connections.
187data Server a 194data Server a u
188 = Server { serverCommand :: TMVar (ServerInstruction a) 195 = Server { serverCommand :: TMVar (ServerInstruction a u)
189 , serverEvent :: TChan (a, ConnectionEvent ByteString) 196 , serverEvent :: TChan ((a,u), ConnectionEvent ByteString)
190 , serverReleaseKey :: ReleaseKey 197 , serverReleaseKey :: ReleaseKey
191 , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) 198 , conmap :: TVar (Map a (ConnectionRecord u))
192 , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) 199 , listenmap :: TVar (Map PortNumber (ThreadId,Socket))
193 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) 200 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay))
194 } 201 }
@@ -217,7 +224,7 @@ control sv = atomically . putTMVar (serverCommand sv)
217-- > case event of EOF -> return () 224-- > case event of EOF -> return ()
218-- > _ -> loop 225-- > _ -> loop
219-- > liftIO loop 226-- > liftIO loop
220server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a) 227server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a u)
221server = do 228server = do
222 (key,cmds) <- allocate (atomically newEmptyTMVar) 229 (key,cmds) <- allocate (atomically newEmptyTMVar)
223 (atomically . flip putTMVar Quit) 230 (atomically . flip putTMVar Quit)
@@ -255,8 +262,8 @@ server = do
255 return rmap 262 return rmap
256 mapM_ stopRetry (Map.elems retriers) 263 mapM_ stopRetry (Map.elems retriers)
257 cons <- atomically . readTVar $ conmap server 264 cons <- atomically . readTVar $ conmap server
258 atomically $ mapM_ (connClose . snd) (Map.elems cons) 265 atomically $ mapM_ (connClose . cstate) (Map.elems cons)
259 atomically $ mapM_ (connWait . snd) (Map.elems cons) 266 atomically $ mapM_ (connWait . cstate) (Map.elems cons)
260 atomically $ writeTVar (conmap server) Map.empty 267 atomically $ writeTVar (conmap server) Map.empty
261 268
262 269
@@ -296,7 +303,7 @@ server = do
296 let post False = (trace ("cant send: "++show bs) $ return ()) 303 let post False = (trace ("cant send: "++show bs) $ return ())
297 post True = return () 304 post True = return ()
298 maybe (post False) 305 maybe (post False)
299 (post <=< flip connWrite bs . snd) 306 (post <=< flip connWrite bs . cstate)
300 $ Map.lookup con map 307 $ Map.lookup con map
301 308
302 doit server (Connect addr params) = liftIO $ do 309 doit server (Connect addr params) = liftIO $ do
@@ -315,17 +322,17 @@ server = do
315 handle (\e -> do -- let t = ioeGetErrorType e 322 handle (\e -> do -- let t = ioeGetErrorType e
316 when (isDoesNotExistError e) $ return () -- warn "GOTCHA" 323 when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
317 -- warn $ "connect-error: " <> bshow e 324 -- warn $ "connect-error: " <> bshow e
318 conkey <- makeConnKey params (sock,addr) -- XXX: ? 325 (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ?
319 sClose sock 326 sClose sock
320 atomically 327 atomically
321 $ writeTChan (serverEvent server) 328 $ writeTChan (serverEvent server)
322 $ (conkey,ConnectFailure addr)) 329 $ ((conkey,u),ConnectFailure addr))
323 $ do 330 $ do
324 connect sock addr 331 connect sock addr
325 me <- getSocketName sock 332 me <- getSocketName sock
326 conkey <- makeConnKey params (sock,me) 333 (conkey,u) <- makeConnKey params (sock,me)
327 h <- socketToHandle sock ReadWriteMode 334 h <- socketToHandle sock ReadWriteMode
328 newConnection server params conkey h Out 335 newConnection server params conkey u h Out
329 return () 336 return ()
330 337
331 doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do 338 doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do
@@ -355,19 +362,19 @@ server = do
355 -- warn $ "connect-error: " <> bshow e 362 -- warn $ "connect-error: " <> bshow e
356 -- Weird hack: puting the would-be peer address 363 -- Weird hack: puting the would-be peer address
357 -- instead of local socketName 364 -- instead of local socketName
358 conkey <- makeConnKey params (sock,addr) -- XXX: ? 365 (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ?
359 sClose sock 366 sClose sock
360 atomically $ do 367 atomically $ do
361 writeTChan (serverEvent server) 368 writeTChan (serverEvent server)
362 $ (conkey,ConnectFailure addr) 369 $ ((conkey,u),ConnectFailure addr)
363 retry <- readTVar retryVar 370 retry <- readTVar retryVar
364 putTMVar resultVar retry) 371 putTMVar resultVar retry)
365 $ do 372 $ do
366 connect sock addr 373 connect sock addr
367 me <- getSocketName sock 374 me <- getSocketName sock
368 conkey <- makeConnKey params (sock,me) 375 (conkey,u) <- makeConnKey params (sock,me)
369 h <- socketToHandle sock ReadWriteMode 376 h <- socketToHandle sock ReadWriteMode
370 threads <- newConnection server params conkey h Out 377 threads <- newConnection server params conkey u h Out
371 atomically $ do threadsWait threads 378 atomically $ do threadsWait threads
372 retry <- readTVar retryVar 379 retry <- readTVar retryVar
373 putTMVar resultVar retry 380 putTMVar resultVar retry
@@ -425,14 +432,14 @@ conevent con = Connection pingflag read write
425 read = connRead con 432 read = connRead con
426 write = connWrite con 433 write = connWrite con
427 434
428newConnection server params conkey h inout = do 435newConnection server params conkey u h inout = do
429 hSetBuffering h NoBuffering 436 hSetBuffering h NoBuffering
430 let (forward,idle_ms,timeout_ms) = 437 let (forward,idle_ms,timeout_ms) =
431 case (inout,duplex params) of 438 case (inout,duplex params) of
432 (Out,False) -> ( const $ return () 439 (Out,False) -> ( const $ return ()
433 , 0 440 , 0
434 , 0 ) 441 , 0 )
435 _ -> ( announce . (conkey,) . Got 442 _ -> ( announce . ((conkey,u),) . Got
436 , pingInterval params 443 , pingInterval params
437 , timeout params ) 444 , timeout params )
438 445
@@ -452,20 +459,23 @@ newConnection server params conkey h inout = do
452 (newCon,e) <- return $ 459 (newCon,e) <- return $
453 if duplex params 460 if duplex params
454 then let newcon = SaneConnection new 461 then let newcon = SaneConnection new
455 in ( newcon, (conkey, conevent newcon) ) 462 in ( newcon, ((conkey,u), conevent newcon) )
456 else ( case inout of 463 else ( case inout of
457 In -> ReadOnlyConnection new 464 In -> ReadOnlyConnection new
458 Out -> WriteOnlyConnection new 465 Out -> WriteOnlyConnection new
459 , (conkey, HalfConnection inout) ) 466 , ((conkey,u), HalfConnection inout) )
460 modifyTVar' (conmap server) $ Map.insert conkey (kontvar,newCon) 467 modifyTVar' (conmap server) $ Map.insert conkey
468 ConnectionRecord { ckont = kontvar
469 , cstate = newCon
470 , cdata = u }
461 announce e 471 announce e
462 putTMVar kontvar $ return $ do 472 putTMVar kontvar $ return $ do
463 atomically $ putTMVar started () 473 atomically $ putTMVar started ()
464 handleEOF conkey kontvar newCon 474 handleEOF conkey u kontvar newCon
465 Just what@(mvar,_) -> do 475 Just what@ConnectionRecord { ckont =mvar }-> do
466 putTMVar kontvar $ return $ return () 476 putTMVar kontvar $ return $ return ()
467 putTMVar mvar $ do 477 putTMVar mvar $ do
468 kont <- updateConMap conkey new what 478 kont <- updateConMap conkey u new what
469 putTMVar started () 479 putTMVar started ()
470 return kont 480 return kont
471#ifdef TEST 481#ifdef TEST
@@ -489,7 +499,7 @@ newConnection server params conkey h inout = do
489 499
490 announce e = writeTChan (serverEvent server) e 500 announce e = writeTChan (serverEvent server) e
491 501
492 handleEOF conkey mvar newCon = do 502 handleEOF conkey u mvar newCon = do
493 action <- atomically . foldr1 orElse $ 503 action <- atomically . foldr1 orElse $
494 [ takeTMVar mvar >>= id -- passed continuation 504 [ takeTMVar mvar >>= id -- passed continuation
495 , connWait newCon >> return eof 505 , connWait newCon >> return eof
@@ -502,7 +512,7 @@ newConnection server params conkey h inout = do
502 -- warn $ "EOF " <>bshow conkey 512 -- warn $ "EOF " <>bshow conkey
503 connCancelPing newCon 513 connCancelPing newCon
504 atomically $ do connFlush newCon 514 atomically $ do connFlush newCon
505 announce (conkey,EOF) 515 announce ((conkey,u),EOF)
506 modifyTVar' (conmap server) 516 modifyTVar' (conmap server)
507 $ Map.delete conkey 517 $ Map.delete conkey
508 -- warn $ "fin-EOF "<>bshow conkey 518 -- warn $ "fin-EOF "<>bshow conkey
@@ -524,43 +534,46 @@ newConnection server params conkey h inout = do
524 let utc' = formatTime defaultTimeLocale "%s" utc 534 let utc' = formatTime defaultTimeLocale "%s" utc
525 warn $ "IDLE" <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me) 535 warn $ "IDLE" <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me)
526 -} 536 -}
527 atomically $ announce (conkey,RequiresPing) 537 atomically $ announce ((conkey,u),RequiresPing)
528 handleEOF conkey mvar newCon 538 handleEOF conkey u mvar newCon
529 539
530 540
531 updateConMap conkey new (mvar,replaced) = do 541 updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do
532 new' <- 542 new' <-
533 if duplex params then do 543 if duplex params then do
534 announce (conkey,EOF) 544 announce ((conkey,u),EOF)
535 connClose replaced 545 connClose replaced
536 let newcon = SaneConnection new 546 let newcon = SaneConnection new
537 announce $ (conkey,conevent newcon) 547 announce $ ((conkey,u),conevent newcon)
538 return $ newcon 548 return $ newcon
539 else 549 else
540 case replaced of 550 case replaced of
541 WriteOnlyConnection w | inout==In -> 551 WriteOnlyConnection w | inout==In ->
542 do let newcon = ConnectionPair new w 552 do let newcon = ConnectionPair new w
543 announce (conkey,conevent newcon) 553 announce ((conkey,u),conevent newcon)
544 return newcon 554 return newcon
545 ReadOnlyConnection r | inout==Out -> 555 ReadOnlyConnection r | inout==Out ->
546 do let newcon = ConnectionPair r new 556 do let newcon = ConnectionPair r new
547 announce (conkey,conevent newcon) 557 announce ((conkey,u),conevent newcon)
548 return newcon 558 return newcon
549 _ -> do -- connFlush todo 559 _ -> do -- connFlush todo
550 announce (conkey, EOF) 560 announce ((conkey,u0), EOF)
551 connClose replaced 561 connClose replaced
552 announce (conkey, HalfConnection inout) 562 announce ((conkey,u), HalfConnection inout)
553 return $ case inout of 563 return $ case inout of
554 In -> ReadOnlyConnection new 564 In -> ReadOnlyConnection new
555 Out -> WriteOnlyConnection new 565 Out -> WriteOnlyConnection new
556 modifyTVar' (conmap server) $ Map.insert conkey (mvar,new') 566 modifyTVar' (conmap server) $ Map.insert conkey
557 return $ handleEOF conkey mvar new' 567 ConnectionRecord { ckont = mvar
568 , cstate = new'
569 , cdata = u }
570 return $ handleEOF conkey u mvar new'
558 571
559acceptLoop server params sock = handle (acceptException server params sock) $ do 572acceptLoop server params sock = handle (acceptException server params sock) $ do
560 con <- accept sock 573 con <- accept sock
561 conkey <- makeConnKey params con 574 (conkey,u) <- makeConnKey params con
562 h <- socketToHandle (fst con) ReadWriteMode 575 h <- socketToHandle (fst con) ReadWriteMode
563 newConnection server params conkey h In 576 newConnection server params conkey u h In
564 acceptLoop server params sock 577 acceptLoop server params sock
565 578
566acceptException server params sock ioerror = do 579acceptException server params sock ioerror = do