summaryrefslogtreecommitdiff
path: root/Presence/Server.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-10 00:08:23 -0500
committerjoe <joe@jerkface.net>2014-02-10 00:08:23 -0500
commit0e3a39ff7d4c6a2ba5190ddfcc38b4bb8d22c367 (patch)
tree44cbcd17543b310742c250acfb14ec866537a6c5 /Presence/Server.hs
parente710ed2434496f7891363def81c1c8756eadd129 (diff)
moved ping logic into ConnectionState
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r--Presence/Server.hs62
1 files changed, 42 insertions, 20 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index c7162265..530b6669 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -311,7 +311,17 @@ killListener (thread,sock) = do sClose sock
311 311
312newConnection server params conkey h inout = do 312newConnection server params conkey h inout = do
313 hSetBuffering h NoBuffering 313 hSetBuffering h NoBuffering
314 new <- connectionThreads h 314 let (forward,idle_ms,timeout_ms) =
315 case (inout,duplex params) of
316 (Out,True) -> ( const $ return ()
317 , 0
318 , 0 )
319 _ -> ( announce . (conkey,) . Got
320 , pingInterval params
321 , timeout params )
322
323 new <- do pinglogic <- pingMachine idle_ms timeout_ms
324 connectionThreads h pinglogic
315 started <- atomically $ newEmptyTMVar 325 started <- atomically $ newEmptyTMVar
316 kontvar <- atomically newEmptyTMVar 326 kontvar <- atomically newEmptyTMVar
317 forkIO $ do 327 forkIO $ do
@@ -319,7 +329,6 @@ newConnection server params conkey h inout = do
319 kont <- atomically getkont 329 kont <- atomically getkont
320 kont 330 kont
321 331
322 pinglogic <- pingMachine (pingInterval params) (timeout params)
323 atomically $ do 332 atomically $ do
324 current <- fmap (Map.lookup conkey) $ readTVar (conmap server) 333 current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
325 case current of 334 case current of
@@ -335,25 +344,21 @@ newConnection server params conkey h inout = do
335 announce e 344 announce e
336 putTMVar kontvar $ return $ do 345 putTMVar kontvar $ return $ do
337 atomically $ putTMVar started () 346 atomically $ putTMVar started ()
338 handleEOF conkey kontvar newCon pinglogic 347 handleEOF conkey kontvar newCon
339 Just what@(mvar,_) -> do 348 Just what@(mvar,_) -> do
340 putTMVar kontvar $ return $ return () 349 putTMVar kontvar $ return $ return ()
341 putTMVar mvar $ do 350 putTMVar mvar $ do
342 kont <- updateConMap conkey new what pinglogic 351 kont <- updateConMap conkey new what
343 putTMVar started () 352 putTMVar started ()
344 return kont 353 return kont
345 forkIO $ do -- inout==In || duplex params then forkIO $ do 354 forkIO $ do -- inout==In || duplex params then forkIO $ do
346 -- warn $ "waiting read thread: " <> bshow (conkey,inout) 355 -- warn $ "waiting read thread: " <> bshow (conkey,inout)
347 let forward =
348 case (inout,duplex params) of
349 (Out,True) -> const $ return ()
350 _ -> announce . (conkey,) . Got
351 atomically $ takeTMVar started 356 atomically $ takeTMVar started
352 pingBump pinglogic -- start the ping timer 357 -- pingBump pinglogic -- start the ping timer
353 fix $ \loop -> do 358 fix $ \loop -> do
354 -- warn $ "read thread: " <> bshow (conkey,inout) 359 -- warn $ "read thread: " <> bshow (conkey,inout)
355 mb <- threadsRead new 360 mb <- threadsRead new
356 pingBump pinglogic 361 -- pingBump pinglogic
357 -- warn $ "got: " <> bshow (mb,(conkey,inout)) 362 -- warn $ "got: " <> bshow (mb,(conkey,inout))
358 maybe (return ()) 363 maybe (return ())
359 (atomically . forward >=> const loop) 364 (atomically . forward >=> const loop)
@@ -363,17 +368,18 @@ newConnection server params conkey h inout = do
363 368
364 announce e = writeTChan (serverEvent server) e 369 announce e = writeTChan (serverEvent server) e
365 370
366 handleEOF conkey mvar newCon pingTimer = do 371 handleEOF conkey mvar newCon = do
367 action <- atomically . foldr1 orElse $ 372 action <- atomically . foldr1 orElse $
368 [ takeTMVar mvar >>= id -- passed continuation 373 [ takeTMVar mvar >>= id -- passed continuation
369 , connWait newCon >> return eof 374 , connWait newCon >> return eof
370 , pingWait pingTimer >>= return . sendPing 375 , connWaitPing newCon >>= return . sendPing
376 -- , pingWait pingTimer >>= return . sendPing
371 ] 377 ]
372 action :: IO () 378 action :: IO ()
373 where 379 where
374 eof = do 380 eof = do
375 -- warn $ "EOF " <>bshow conkey 381 -- warn $ "EOF " <>bshow conkey
376 pingCancel pingTimer -- force cleanup of ping thread 382 connCancelPing newCon
377 atomically $ do connFlush newCon 383 atomically $ do connFlush newCon
378 announce (conkey,EOF) 384 announce (conkey,EOF)
379 modifyTVar' (conmap server) 385 modifyTVar' (conmap server)
@@ -384,10 +390,10 @@ newConnection server params conkey h inout = do
384 eof 390 eof
385 sendPing PingIdle = do 391 sendPing PingIdle = do
386 atomically . announce $ (conkey,RequiresPing) 392 atomically . announce $ (conkey,RequiresPing)
387 handleEOF conkey mvar newCon pingTimer 393 handleEOF conkey mvar newCon
388 394
389 395
390 updateConMap conkey new (mvar,replaced) pingTimer = do 396 updateConMap conkey new (mvar,replaced) = do
391 new' <- 397 new' <-
392 if duplex params then do 398 if duplex params then do
393 announce (conkey,EOF) 399 announce (conkey,EOF)
@@ -410,7 +416,7 @@ newConnection server params conkey h inout = do
410 In -> ReadOnlyConnection new 416 In -> ReadOnlyConnection new
411 Out -> WriteOnlyConnection new 417 Out -> WriteOnlyConnection new
412 modifyTVar' (conmap server) $ Map.insert conkey (mvar,new') 418 modifyTVar' (conmap server) $ Map.insert conkey (mvar,new')
413 return $ handleEOF conkey mvar new' pingTimer 419 return $ handleEOF conkey mvar new'
414 420
415acceptLoop server params sock = handle (acceptException server params sock) $ do 421acceptLoop server params sock = handle (acceptException server params sock) $ do
416 con <- accept sock 422 con <- accept sock
@@ -445,12 +451,13 @@ data ConnectionThreads = ConnectionThreads
445 { threadsWriter :: TMVar (Maybe ByteString) 451 { threadsWriter :: TMVar (Maybe ByteString)
446 , threadsChannel :: TChan ByteString 452 , threadsChannel :: TChan ByteString
447 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close 453 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
454 , threadsPing :: PingMachine
448 } 455 }
449 456
450-- | This spawns the reader and writer threads and returns a newly 457-- | This spawns the reader and writer threads and returns a newly
451-- constructed 'ConnectionThreads' object. 458-- constructed 'ConnectionThreads' object.
452connectionThreads :: Handle -> IO ConnectionThreads 459connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
453connectionThreads h = do 460connectionThreads h pinglogic = do
454 461
455 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar 462 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
456 writerThread <- forkIO . fix $ \loop -> do 463 writerThread <- forkIO . fix $ \loop -> do
@@ -471,9 +478,12 @@ connectionThreads h = do
471 let _ = fmap ioeGetErrorType e -- type hint 478 let _ = fmap ioeGetErrorType e -- type hint
472 atomically $ do putTMVar outs Nothing -- quit writer 479 atomically $ do putTMVar outs Nothing -- quit writer
473 putTMVar doner () 480 putTMVar doner ()
474 handle (finished . Just) . fix $ \loop -> do 481 handle (finished . Just) $ do
482 pingBump pinglogic -- start the ping timer
483 fix $ \loop -> do
475 packet <- getPacket h 484 packet <- getPacket h
476 atomically $ writeTChan incomming packet 485 atomically $ writeTChan incomming packet
486 pingBump pinglogic
477 isEof <- liftIO $ hIsEOF h 487 isEof <- liftIO $ hIsEOF h
478 if isEof then finished Nothing else loop 488 if isEof then finished Nothing else loop
479 489
@@ -482,7 +492,8 @@ connectionThreads h = do
482 return () 492 return ()
483 return ConnectionThreads { threadsWriter = outs 493 return ConnectionThreads { threadsWriter = outs
484 , threadsChannel = incomming 494 , threadsChannel = incomming
485 , threadsWait = wait } 495 , threadsWait = wait
496 , threadsPing = pinglogic }
486 497
487 498
488-- | 'threadsWrite' writes the given 'ByteString' to the 499-- | 'threadsWrite' writes the given 'ByteString' to the
@@ -555,6 +566,17 @@ connClose c = mapConn True threadsClose c
555connWait :: ConnectionState -> STM () 566connWait :: ConnectionState -> STM ()
556connWait c = mapConn False threadsWait c 567connWait c = mapConn False threadsWait c
557 568
569connPingTimer c =
570 case c of
571 SaneConnection rw -> threadsPing rw
572 ReadOnlyConnection r -> threadsPing r
573 WriteOnlyConnection w -> threadsPing w -- should be disabled.
574 ConnectionPair r w -> threadsPing r
575
576connCancelPing c = pingCancel (connPingTimer c)
577connWaitPing c = pingWait (connPingTimer c)
578
579
558connFlush c = 580connFlush c =
559 case c of 581 case c of
560 SaneConnection rw -> waitChan rw 582 SaneConnection rw -> waitChan rw