diff options
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r-- | Presence/Server.hs | 105 |
1 files changed, 59 insertions, 46 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index baf5a1a8..87644946 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -82,7 +82,7 @@ type PingInterval = Miliseconds | |||
82 | -- connections that are established. It is parameterized | 82 | -- connections that are established. It is parameterized |
83 | -- by a user-suplied type @conkey@ that is used as a lookup | 83 | -- by a user-suplied type @conkey@ that is used as a lookup |
84 | -- key for connections. | 84 | -- key for connections. |
85 | data ConnectionParameters conkey = | 85 | data ConnectionParameters conkey u = |
86 | ConnectionParameters | 86 | ConnectionParameters |
87 | { pingInterval :: PingInterval | 87 | { pingInterval :: PingInterval |
88 | -- ^ The miliseconds of idle to allow before a 'RequiresPing' | 88 | -- ^ The miliseconds of idle to allow before a 'RequiresPing' |
@@ -91,7 +91,7 @@ data ConnectionParameters conkey = | |||
91 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled | 91 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled |
92 | -- that are necessary for the connection to be considered | 92 | -- that are necessary for the connection to be considered |
93 | -- lost and signalling 'EOF'. | 93 | -- lost and signalling 'EOF'. |
94 | , makeConnKey :: (Socket,SockAddr) -> IO conkey | 94 | , makeConnKey :: (Socket,SockAddr) -> IO (conkey,u) |
95 | -- ^ This action creates a lookup key for a new connection. If 'duplex' | 95 | -- ^ This action creates a lookup key for a new connection. If 'duplex' |
96 | -- is 'True' and the result is already assocatied with an established | 96 | -- is 'True' and the result is already assocatied with an established |
97 | -- connection, then an 'EOF' will be forced before the the new | 97 | -- connection, then an 'EOF' will be forced before the the new |
@@ -117,7 +117,7 @@ data ConnectionParameters conkey = | |||
117 | -- * 'duplex' = True | 117 | -- * 'duplex' = True |
118 | -- | 118 | -- |
119 | connectionDefaults | 119 | connectionDefaults |
120 | :: ((Socket, SockAddr) -> IO conkey) -> ConnectionParameters conkey | 120 | :: ((Socket, SockAddr) -> IO (conkey,u)) -> ConnectionParameters conkey u |
121 | connectionDefaults f = ConnectionParameters | 121 | connectionDefaults f = ConnectionParameters |
122 | { pingInterval = 28000 | 122 | { pingInterval = 28000 |
123 | , timeout = 2000 | 123 | , timeout = 2000 |
@@ -128,16 +128,16 @@ connectionDefaults f = ConnectionParameters | |||
128 | -- | Instructions for a 'Server' object | 128 | -- | Instructions for a 'Server' object |
129 | -- | 129 | -- |
130 | -- To issue a command, put it into the 'serverCommand' TMVar. | 130 | -- To issue a command, put it into the 'serverCommand' TMVar. |
131 | data ServerInstruction conkey | 131 | data ServerInstruction conkey u |
132 | = Quit | 132 | = Quit |
133 | -- ^ kill the server. This command is automatically issued when | 133 | -- ^ kill the server. This command is automatically issued when |
134 | -- the server is released. | 134 | -- the server is released. |
135 | | Listen PortNumber (ConnectionParameters conkey) | 135 | | Listen PortNumber (ConnectionParameters conkey u) |
136 | -- ^ listen for incomming connections | 136 | -- ^ listen for incomming connections |
137 | | Connect SockAddr (ConnectionParameters conkey) | 137 | | Connect SockAddr (ConnectionParameters conkey u) |
138 | -- ^ connect to addresses | 138 | -- ^ connect to addresses |
139 | | ConnectWithEndlessRetry SockAddr | 139 | | ConnectWithEndlessRetry SockAddr |
140 | (ConnectionParameters conkey) | 140 | (ConnectionParameters conkey u) |
141 | Miliseconds | 141 | Miliseconds |
142 | -- ^ keep retrying the connection | 142 | -- ^ keep retrying the connection |
143 | | Ignore PortNumber | 143 | | Ignore PortNumber |
@@ -182,13 +182,20 @@ deriving instance Show b => Show (ConnectionEvent b) | |||
182 | deriving instance Eq b => Eq (ConnectionEvent b) | 182 | deriving instance Eq b => Eq (ConnectionEvent b) |
183 | #endif | 183 | #endif |
184 | 184 | ||
185 | -- | This is the per-connection state. | ||
186 | data ConnectionRecord u | ||
187 | = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler | ||
188 | , cstate :: ConnectionState -- ^ used to send/receive data to the connection | ||
189 | , cdata :: u -- ^ user data, stored in the connection map for convenience | ||
190 | } | ||
191 | |||
185 | -- | This object accepts commands and signals events and maintains | 192 | -- | This object accepts commands and signals events and maintains |
186 | -- the list of currently listening ports and established connections. | 193 | -- the list of currently listening ports and established connections. |
187 | data Server a | 194 | data Server a u |
188 | = Server { serverCommand :: TMVar (ServerInstruction a) | 195 | = Server { serverCommand :: TMVar (ServerInstruction a u) |
189 | , serverEvent :: TChan (a, ConnectionEvent ByteString) | 196 | , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) |
190 | , serverReleaseKey :: ReleaseKey | 197 | , serverReleaseKey :: ReleaseKey |
191 | , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) | 198 | , conmap :: TVar (Map a (ConnectionRecord u)) |
192 | , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) | 199 | , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) |
193 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) | 200 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) |
194 | } | 201 | } |
@@ -217,7 +224,7 @@ control sv = atomically . putTMVar (serverCommand sv) | |||
217 | -- > case event of EOF -> return () | 224 | -- > case event of EOF -> return () |
218 | -- > _ -> loop | 225 | -- > _ -> loop |
219 | -- > liftIO loop | 226 | -- > liftIO loop |
220 | server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a) | 227 | server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a u) |
221 | server = do | 228 | server = do |
222 | (key,cmds) <- allocate (atomically newEmptyTMVar) | 229 | (key,cmds) <- allocate (atomically newEmptyTMVar) |
223 | (atomically . flip putTMVar Quit) | 230 | (atomically . flip putTMVar Quit) |
@@ -255,8 +262,8 @@ server = do | |||
255 | return rmap | 262 | return rmap |
256 | mapM_ stopRetry (Map.elems retriers) | 263 | mapM_ stopRetry (Map.elems retriers) |
257 | cons <- atomically . readTVar $ conmap server | 264 | cons <- atomically . readTVar $ conmap server |
258 | atomically $ mapM_ (connClose . snd) (Map.elems cons) | 265 | atomically $ mapM_ (connClose . cstate) (Map.elems cons) |
259 | atomically $ mapM_ (connWait . snd) (Map.elems cons) | 266 | atomically $ mapM_ (connWait . cstate) (Map.elems cons) |
260 | atomically $ writeTVar (conmap server) Map.empty | 267 | atomically $ writeTVar (conmap server) Map.empty |
261 | 268 | ||
262 | 269 | ||
@@ -296,7 +303,7 @@ server = do | |||
296 | let post False = (trace ("cant send: "++show bs) $ return ()) | 303 | let post False = (trace ("cant send: "++show bs) $ return ()) |
297 | post True = return () | 304 | post True = return () |
298 | maybe (post False) | 305 | maybe (post False) |
299 | (post <=< flip connWrite bs . snd) | 306 | (post <=< flip connWrite bs . cstate) |
300 | $ Map.lookup con map | 307 | $ Map.lookup con map |
301 | 308 | ||
302 | doit server (Connect addr params) = liftIO $ do | 309 | doit server (Connect addr params) = liftIO $ do |
@@ -315,17 +322,17 @@ server = do | |||
315 | handle (\e -> do -- let t = ioeGetErrorType e | 322 | handle (\e -> do -- let t = ioeGetErrorType e |
316 | when (isDoesNotExistError e) $ return () -- warn "GOTCHA" | 323 | when (isDoesNotExistError e) $ return () -- warn "GOTCHA" |
317 | -- warn $ "connect-error: " <> bshow e | 324 | -- warn $ "connect-error: " <> bshow e |
318 | conkey <- makeConnKey params (sock,addr) -- XXX: ? | 325 | (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? |
319 | sClose sock | 326 | sClose sock |
320 | atomically | 327 | atomically |
321 | $ writeTChan (serverEvent server) | 328 | $ writeTChan (serverEvent server) |
322 | $ (conkey,ConnectFailure addr)) | 329 | $ ((conkey,u),ConnectFailure addr)) |
323 | $ do | 330 | $ do |
324 | connect sock addr | 331 | connect sock addr |
325 | me <- getSocketName sock | 332 | me <- getSocketName sock |
326 | conkey <- makeConnKey params (sock,me) | 333 | (conkey,u) <- makeConnKey params (sock,me) |
327 | h <- socketToHandle sock ReadWriteMode | 334 | h <- socketToHandle sock ReadWriteMode |
328 | newConnection server params conkey h Out | 335 | newConnection server params conkey u h Out |
329 | return () | 336 | return () |
330 | 337 | ||
331 | doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do | 338 | doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do |
@@ -355,19 +362,19 @@ server = do | |||
355 | -- warn $ "connect-error: " <> bshow e | 362 | -- warn $ "connect-error: " <> bshow e |
356 | -- Weird hack: puting the would-be peer address | 363 | -- Weird hack: puting the would-be peer address |
357 | -- instead of local socketName | 364 | -- instead of local socketName |
358 | conkey <- makeConnKey params (sock,addr) -- XXX: ? | 365 | (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? |
359 | sClose sock | 366 | sClose sock |
360 | atomically $ do | 367 | atomically $ do |
361 | writeTChan (serverEvent server) | 368 | writeTChan (serverEvent server) |
362 | $ (conkey,ConnectFailure addr) | 369 | $ ((conkey,u),ConnectFailure addr) |
363 | retry <- readTVar retryVar | 370 | retry <- readTVar retryVar |
364 | putTMVar resultVar retry) | 371 | putTMVar resultVar retry) |
365 | $ do | 372 | $ do |
366 | connect sock addr | 373 | connect sock addr |
367 | me <- getSocketName sock | 374 | me <- getSocketName sock |
368 | conkey <- makeConnKey params (sock,me) | 375 | (conkey,u) <- makeConnKey params (sock,me) |
369 | h <- socketToHandle sock ReadWriteMode | 376 | h <- socketToHandle sock ReadWriteMode |
370 | threads <- newConnection server params conkey h Out | 377 | threads <- newConnection server params conkey u h Out |
371 | atomically $ do threadsWait threads | 378 | atomically $ do threadsWait threads |
372 | retry <- readTVar retryVar | 379 | retry <- readTVar retryVar |
373 | putTMVar resultVar retry | 380 | putTMVar resultVar retry |
@@ -425,14 +432,14 @@ conevent con = Connection pingflag read write | |||
425 | read = connRead con | 432 | read = connRead con |
426 | write = connWrite con | 433 | write = connWrite con |
427 | 434 | ||
428 | newConnection server params conkey h inout = do | 435 | newConnection server params conkey u h inout = do |
429 | hSetBuffering h NoBuffering | 436 | hSetBuffering h NoBuffering |
430 | let (forward,idle_ms,timeout_ms) = | 437 | let (forward,idle_ms,timeout_ms) = |
431 | case (inout,duplex params) of | 438 | case (inout,duplex params) of |
432 | (Out,False) -> ( const $ return () | 439 | (Out,False) -> ( const $ return () |
433 | , 0 | 440 | , 0 |
434 | , 0 ) | 441 | , 0 ) |
435 | _ -> ( announce . (conkey,) . Got | 442 | _ -> ( announce . ((conkey,u),) . Got |
436 | , pingInterval params | 443 | , pingInterval params |
437 | , timeout params ) | 444 | , timeout params ) |
438 | 445 | ||
@@ -452,20 +459,23 @@ newConnection server params conkey h inout = do | |||
452 | (newCon,e) <- return $ | 459 | (newCon,e) <- return $ |
453 | if duplex params | 460 | if duplex params |
454 | then let newcon = SaneConnection new | 461 | then let newcon = SaneConnection new |
455 | in ( newcon, (conkey, conevent newcon) ) | 462 | in ( newcon, ((conkey,u), conevent newcon) ) |
456 | else ( case inout of | 463 | else ( case inout of |
457 | In -> ReadOnlyConnection new | 464 | In -> ReadOnlyConnection new |
458 | Out -> WriteOnlyConnection new | 465 | Out -> WriteOnlyConnection new |
459 | , (conkey, HalfConnection inout) ) | 466 | , ((conkey,u), HalfConnection inout) ) |
460 | modifyTVar' (conmap server) $ Map.insert conkey (kontvar,newCon) | 467 | modifyTVar' (conmap server) $ Map.insert conkey |
468 | ConnectionRecord { ckont = kontvar | ||
469 | , cstate = newCon | ||
470 | , cdata = u } | ||
461 | announce e | 471 | announce e |
462 | putTMVar kontvar $ return $ do | 472 | putTMVar kontvar $ return $ do |
463 | atomically $ putTMVar started () | 473 | atomically $ putTMVar started () |
464 | handleEOF conkey kontvar newCon | 474 | handleEOF conkey u kontvar newCon |
465 | Just what@(mvar,_) -> do | 475 | Just what@ConnectionRecord { ckont =mvar }-> do |
466 | putTMVar kontvar $ return $ return () | 476 | putTMVar kontvar $ return $ return () |
467 | putTMVar mvar $ do | 477 | putTMVar mvar $ do |
468 | kont <- updateConMap conkey new what | 478 | kont <- updateConMap conkey u new what |
469 | putTMVar started () | 479 | putTMVar started () |
470 | return kont | 480 | return kont |
471 | #ifdef TEST | 481 | #ifdef TEST |
@@ -489,7 +499,7 @@ newConnection server params conkey h inout = do | |||
489 | 499 | ||
490 | announce e = writeTChan (serverEvent server) e | 500 | announce e = writeTChan (serverEvent server) e |
491 | 501 | ||
492 | handleEOF conkey mvar newCon = do | 502 | handleEOF conkey u mvar newCon = do |
493 | action <- atomically . foldr1 orElse $ | 503 | action <- atomically . foldr1 orElse $ |
494 | [ takeTMVar mvar >>= id -- passed continuation | 504 | [ takeTMVar mvar >>= id -- passed continuation |
495 | , connWait newCon >> return eof | 505 | , connWait newCon >> return eof |
@@ -502,7 +512,7 @@ newConnection server params conkey h inout = do | |||
502 | -- warn $ "EOF " <>bshow conkey | 512 | -- warn $ "EOF " <>bshow conkey |
503 | connCancelPing newCon | 513 | connCancelPing newCon |
504 | atomically $ do connFlush newCon | 514 | atomically $ do connFlush newCon |
505 | announce (conkey,EOF) | 515 | announce ((conkey,u),EOF) |
506 | modifyTVar' (conmap server) | 516 | modifyTVar' (conmap server) |
507 | $ Map.delete conkey | 517 | $ Map.delete conkey |
508 | -- warn $ "fin-EOF "<>bshow conkey | 518 | -- warn $ "fin-EOF "<>bshow conkey |
@@ -524,43 +534,46 @@ newConnection server params conkey h inout = do | |||
524 | let utc' = formatTime defaultTimeLocale "%s" utc | 534 | let utc' = formatTime defaultTimeLocale "%s" utc |
525 | warn $ "IDLE" <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me) | 535 | warn $ "IDLE" <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me) |
526 | -} | 536 | -} |
527 | atomically $ announce (conkey,RequiresPing) | 537 | atomically $ announce ((conkey,u),RequiresPing) |
528 | handleEOF conkey mvar newCon | 538 | handleEOF conkey u mvar newCon |
529 | 539 | ||
530 | 540 | ||
531 | updateConMap conkey new (mvar,replaced) = do | 541 | updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do |
532 | new' <- | 542 | new' <- |
533 | if duplex params then do | 543 | if duplex params then do |
534 | announce (conkey,EOF) | 544 | announce ((conkey,u),EOF) |
535 | connClose replaced | 545 | connClose replaced |
536 | let newcon = SaneConnection new | 546 | let newcon = SaneConnection new |
537 | announce $ (conkey,conevent newcon) | 547 | announce $ ((conkey,u),conevent newcon) |
538 | return $ newcon | 548 | return $ newcon |
539 | else | 549 | else |
540 | case replaced of | 550 | case replaced of |
541 | WriteOnlyConnection w | inout==In -> | 551 | WriteOnlyConnection w | inout==In -> |
542 | do let newcon = ConnectionPair new w | 552 | do let newcon = ConnectionPair new w |
543 | announce (conkey,conevent newcon) | 553 | announce ((conkey,u),conevent newcon) |
544 | return newcon | 554 | return newcon |
545 | ReadOnlyConnection r | inout==Out -> | 555 | ReadOnlyConnection r | inout==Out -> |
546 | do let newcon = ConnectionPair r new | 556 | do let newcon = ConnectionPair r new |
547 | announce (conkey,conevent newcon) | 557 | announce ((conkey,u),conevent newcon) |
548 | return newcon | 558 | return newcon |
549 | _ -> do -- connFlush todo | 559 | _ -> do -- connFlush todo |
550 | announce (conkey, EOF) | 560 | announce ((conkey,u0), EOF) |
551 | connClose replaced | 561 | connClose replaced |
552 | announce (conkey, HalfConnection inout) | 562 | announce ((conkey,u), HalfConnection inout) |
553 | return $ case inout of | 563 | return $ case inout of |
554 | In -> ReadOnlyConnection new | 564 | In -> ReadOnlyConnection new |
555 | Out -> WriteOnlyConnection new | 565 | Out -> WriteOnlyConnection new |
556 | modifyTVar' (conmap server) $ Map.insert conkey (mvar,new') | 566 | modifyTVar' (conmap server) $ Map.insert conkey |
557 | return $ handleEOF conkey mvar new' | 567 | ConnectionRecord { ckont = mvar |
568 | , cstate = new' | ||
569 | , cdata = u } | ||
570 | return $ handleEOF conkey u mvar new' | ||
558 | 571 | ||
559 | acceptLoop server params sock = handle (acceptException server params sock) $ do | 572 | acceptLoop server params sock = handle (acceptException server params sock) $ do |
560 | con <- accept sock | 573 | con <- accept sock |
561 | conkey <- makeConnKey params con | 574 | (conkey,u) <- makeConnKey params con |
562 | h <- socketToHandle (fst con) ReadWriteMode | 575 | h <- socketToHandle (fst con) ReadWriteMode |
563 | newConnection server params conkey h In | 576 | newConnection server params conkey u h In |
564 | acceptLoop server params sock | 577 | acceptLoop server params sock |
565 | 578 | ||
566 | acceptException server params sock ioerror = do | 579 | acceptException server params sock ioerror = do |