summaryrefslogtreecommitdiff
path: root/Presence/Server.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r--Presence/Server.hs826
1 files changed, 826 insertions, 0 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
new file mode 100644
index 00000000..c38aec2a
--- /dev/null
+++ b/Presence/Server.hs
@@ -0,0 +1,826 @@
1{-# OPTIONS_HADDOCK prune #-}
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE DoAndIfThenElse #-}
4{-# LANGUAGE FlexibleInstances #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7{-# LANGUAGE StandaloneDeriving #-}
8{-# LANGUAGE TupleSections #-}
9{-# LANGUAGE LambdaCase #-}
10-----------------------------------------------------------------------------
11-- |
12-- Module : Server
13--
14-- Maintainer : joe@jerkface.net
15-- Stability : experimental
16--
17-- A TCP client/server library.
18--
19-- TODO:
20--
21-- * interface tweaks
22--
23module Server
24 ( module Server
25 , module PingMachine ) where
26
27import Data.ByteString (ByteString,hGetNonBlocking)
28import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
29import Data.Conduit ( Source, Sink, Flush )
30#if MIN_VERSION_containers(0,5,0)
31import qualified Data.Map.Strict as Map
32import Data.Map.Strict (Map)
33#else
34import qualified Data.Map as Map
35import Data.Map (Map)
36#endif
37import Data.Monoid ( (<>) )
38#ifdef THREAD_DEBUG
39import Control.Concurrent.Lifted.Instrument
40#else
41import Control.Concurrent.Lifted
42import GHC.Conc (labelThread)
43#endif
44
45import Control.Concurrent.STM
46-- import Control.Concurrent.STM.TMVar
47-- import Control.Concurrent.STM.TChan
48-- import Control.Concurrent.STM.Delay
49import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException)
50import Control.Monad
51import Control.Monad.Fix
52-- import Control.Monad.STM
53-- import Control.Monad.Trans.Resource
54import Control.Monad.IO.Class (MonadIO (liftIO))
55import System.IO.Error (isDoesNotExistError)
56import System.IO
57 ( IOMode(..)
58 , hSetBuffering
59 , BufferMode(..)
60 , hWaitForInput
61 , hClose
62 , hIsEOF
63 , stderr
64 , Handle
65 , hFlush
66 , hPutStrLn
67 )
68import Network.Socket as Socket
69import Network.BSD
70 ( getProtocolNumber
71 )
72import Debug.Trace
73import Data.Time.Clock (getCurrentTime,diffUTCTime)
74-- import SockAddr ()
75-- import System.Locale (defaultTimeLocale)
76
77import InterruptibleDelay
78import PingMachine
79import Network.StreamServer
80import Network.SocketLike hiding (sClose)
81import qualified Connection as G
82 ;import Connection (Manager (..), Policy(..))
83
84
85type Microseconds = Int
86
87-- | This object is passed with the 'Listen' and 'Connect'
88-- instructions in order to control the behavior of the
89-- connections that are established. It is parameterized
90-- by a user-suplied type @conkey@ that is used as a lookup
91-- key for connections.
92data ConnectionParameters conkey u =
93 ConnectionParameters
94 { pingInterval :: PingInterval
95 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
96 -- event is signaled.
97 , timeout :: TimeOut
98 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
99 -- that are necessary for the connection to be considered
100 -- lost and signalling 'EOF'.
101 , makeConnKey :: RestrictedSocket -> IO (conkey,u)
102 -- ^ This action creates a lookup key for a new connection. If 'duplex'
103 -- is 'True' and the result is already assocatied with an established
104 -- connection, then an 'EOF' will be forced before the the new
105 -- connection becomes active.
106 --
107 , duplex :: Bool
108 -- ^ If True, then the connection will be treated as a normal
109 -- two-way socket. Otherwise, a readable socket is established
110 -- with 'Listen' and a writable socket is established with
111 -- 'Connect' and they are associated when 'makeConnKey' yields
112 -- same value for each.
113 }
114
115-- | Use this function to select appropriate default values for
116-- 'ConnectionParameters' other than 'makeConnKey'.
117--
118-- Current defaults:
119--
120-- * 'pingInterval' = 28000
121--
122-- * 'timeout' = 2000
123--
124-- * 'duplex' = True
125--
126connectionDefaults
127 :: (RestrictedSocket -> IO (conkey,u)) -> ConnectionParameters conkey u
128connectionDefaults f = ConnectionParameters
129 { pingInterval = 28000
130 , timeout = 2000
131 , makeConnKey = f
132 , duplex = True
133 }
134
135-- | Instructions for a 'Server' object
136--
137-- To issue a command, put it into the 'serverCommand' TMVar.
138data ServerInstruction conkey u
139 = Quit
140 -- ^ kill the server. This command is automatically issued when
141 -- the server is released.
142 | Listen PortNumber (ConnectionParameters conkey u)
143 -- ^ listen for incoming connections
144 | Connect SockAddr (ConnectionParameters conkey u)
145 -- ^ connect to addresses
146 | ConnectWithEndlessRetry SockAddr
147 (ConnectionParameters conkey u)
148 Miliseconds
149 -- ^ keep retrying the connection
150 | Ignore PortNumber
151 -- ^ stop listening on specified port
152 | Send conkey ByteString
153 -- ^ send bytes to an established connection
154
155#ifdef TEST
156deriving instance Show conkey => Show (ServerInstruction conkey u)
157instance Show (a -> b) where show _ = "<function>"
158deriving instance Show conkey => Show (ConnectionParameters conkey u)
159#endif
160
161-- | This type specifies which which half of a half-duplex
162-- connection is of interest.
163data InOrOut = In | Out
164 deriving (Enum,Eq,Ord,Show,Read)
165
166-- | These events may be read from 'serverEvent' TChannel.
167--
168data ConnectionEvent b
169 = Connection (STM Bool) (Source IO b) (Sink (Flush b) IO ())
170 -- ^ A new connection was established
171 | ConnectFailure SockAddr
172 -- ^ A 'Connect' command failed.
173 | HalfConnection InOrOut
174 -- ^ Half of a half-duplex connection is avaliable.
175 | EOF
176 -- ^ A connection was terminated
177 | RequiresPing
178 -- ^ 'pingInterval' miliseconds of idle was experienced
179
180#ifdef TEST
181instance Show (IO a) where show _ = "<IO action>"
182instance Show (STM a) where show _ = "<STM action>"
183instance Eq (ByteString -> IO Bool) where (==) _ _ = True
184instance Eq (IO (Maybe ByteString)) where (==) _ _ = True
185instance Eq (STM Bool) where (==) _ _ = True
186deriving instance Show b => Show (ConnectionEvent b)
187deriving instance Eq b => Eq (ConnectionEvent b)
188#endif
189
190-- | This is the per-connection state.
191data ConnectionRecord u
192 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler
193 , cstate :: ConnectionState -- ^ used to send/receive data to the connection
194 , cdata :: u -- ^ user data, stored in the connection map for convenience
195 }
196
197-- | This object accepts commands and signals events and maintains
198-- the list of currently listening ports and established connections.
199data Server a u releaseKey b
200 = Server { serverCommand :: TMVar (ServerInstruction a u)
201 , serverEvent :: TChan ((a,u), ConnectionEvent b)
202 , serverReleaseKey :: releaseKey
203 , conmap :: TVar (Map a (ConnectionRecord u))
204 , listenmap :: TVar (Map PortNumber ServerHandle)
205 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
206 }
207
208control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
209control sv = atomically . putTMVar (serverCommand sv)
210
211type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
212
213noCleanUp :: MonadIO m => Allocate () m
214noCleanUp io _ = ( (,) () ) `liftM` liftIO io
215
216-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
217-- to ensure proper cleanup. For example,
218--
219-- > import Server
220-- > import Control.Monad.Trans.Resource (runResourceT)
221-- > import Control.Monad.IO.Class (liftIO)
222-- > import Control.Monad.STM (atomically)
223-- > import Control.Concurrent.STM.TMVar (putTMVar)
224-- > import Control.Concurrent.STM.TChan (readTChan)
225-- >
226-- > main = runResourceT $ do
227-- > sv <- server allocate
228-- > let params = connectionDefaults (return . snd)
229-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
230-- > let loop = do
231-- > (_,event) <- atomically $ readTChan (serverEvent sv)
232-- > case event of
233-- > Connection getPingFlag readData writeData -> do
234-- > forkIO $ do
235-- > fix $ \readLoop -> do
236-- > readData >>= mapM $ \bytes ->
237-- > putStrLn $ "got: " ++ show bytes
238-- > readLoop
239-- > case event of EOF -> return ()
240-- > _ -> loop
241-- > liftIO loop
242--
243-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp'
244-- to do without automatic cleanup and be sure to remember to write 'Quit' to
245-- the 'serverCommand' variable.
246server ::
247 -- forall (m :: * -> *) a u conkey releaseKey.
248 (Show conkey, MonadIO m, Ord conkey) =>
249 Allocate releaseKey m
250 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
251 -> m (Server conkey u releaseKey x)
252server allocate sessionConduits = do
253 (key,cmds) <- allocate (atomically newEmptyTMVar)
254 (atomically . flip putTMVar Quit)
255 server <- liftIO . atomically $ do
256 tchan <- newTChan
257 conmap <- newTVar Map.empty
258 listenmap<- newTVar Map.empty
259 retrymap <- newTVar Map.empty
260 return Server { serverCommand = cmds
261 , serverEvent = tchan
262 , serverReleaseKey = key
263 , conmap = conmap
264 , listenmap = listenmap
265 , retrymap = retrymap
266 }
267 liftIO $ do
268 tid <- forkIO $ fix $ \loop -> do
269 instr <- atomically $ takeTMVar cmds
270 -- warn $ "instr = " <> bshow instr
271 let again = do doit server instr
272 -- warn $ "finished " <> bshow instr
273 loop
274 case instr of Quit -> closeAll server
275 _ -> again
276 labelThread tid "server"
277 return server
278 where
279 closeAll server = do
280 listening <- atomically . readTVar $ listenmap server
281 mapM_ quitListening (Map.elems listening)
282 let stopRetry (v,d) = do atomically $ writeTVar v False
283 interruptDelay d
284 retriers <- atomically $ do
285 rmap <- readTVar $ retrymap server
286 writeTVar (retrymap server) Map.empty
287 return rmap
288 mapM_ stopRetry (Map.elems retriers)
289 cons <- atomically . readTVar $ conmap server
290 atomically $ mapM_ (connClose . cstate) (Map.elems cons)
291 atomically $ mapM_ (connWait . cstate) (Map.elems cons)
292 atomically $ writeTVar (conmap server) Map.empty
293
294
295 doit server (Listen port params) = do
296
297 listening <- Map.member port
298 `fmap` atomically (readTVar $ listenmap server)
299 when (not listening) $ do
300
301 hPutStrLn stderr $ "Started listening on "++show port
302
303 let family = AF_INET6
304 address = case family of
305 AF_INET -> SockAddrInet port iNADDR_ANY
306 AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0
307
308 sserv <- flip streamServer address ServerConfig
309 { serverWarn = hPutStrLn stderr
310 , serverSession = \sock _ h -> do
311 (conkey,u) <- makeConnKey params sock
312 _ <- newConnection server sessionConduits params conkey u h In
313 return ()
314 }
315
316 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
317
318 doit server (Ignore port) = do
319 hPutStrLn stderr $ "Stopping listen on "++show port
320 mb <- atomically $ do
321 map <- readTVar $ listenmap server
322 modifyTVar' (listenmap server) $ Map.delete port
323 return $ Map.lookup port map
324 maybe (return ()) quitListening $ mb
325
326 doit server (Send con bs) = do -- . void . forkIO $ do
327 map <- atomically $ readTVar (conmap server)
328 let post False = (trace ("cant send: "++show bs) $ return ())
329 post True = return ()
330 maybe (post False)
331 (post <=< flip connWrite bs . cstate)
332 $ Map.lookup con map
333
334 doit server (Connect addr params) = join $ atomically $ do
335 Map.lookup addr <$> readTVar (retrymap server)
336 >>= return . \case
337 Nothing -> forkit
338 Just (v,d) -> do b <- atomically $ readTVar v
339 interruptDelay d
340 when (not b) forkit
341 where
342 forkit = void . forkIO $ do
343 myThreadId >>= flip labelThread ( "Connect." ++ show addr )
344 proto <- getProtocolNumber "tcp"
345 sock <- socket (socketFamily addr) Stream proto
346 handle (\e -> do -- let t = ioeGetErrorType e
347 when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
348 -- warn $ "connect-error: " <> bshow e
349 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ?
350 Socket.close sock
351 atomically
352 $ writeTChan (serverEvent server)
353 $ ((conkey,u),ConnectFailure addr))
354 $ do
355 connect sock addr
356 (conkey,u) <- makeConnKey params (restrictSocket sock)
357 h <- socketToHandle sock ReadWriteMode
358 newConnection server sessionConduits params conkey u h Out
359 return ()
360
361 doit server (ConnectWithEndlessRetry addr params interval) = do
362 proto <- getProtocolNumber "tcp"
363 void . forkIO $ do
364 myThreadId >>= flip labelThread ("ConnectWithEndlessRetry." ++ show addr)
365 timer <- interruptibleDelay
366 (retryVar,action) <- atomically $ do
367 map <- readTVar (retrymap server)
368 action <- case Map.lookup addr map of
369 Nothing -> return $ return ()
370 Just (v,d) -> do writeTVar v False
371 return $ interruptDelay d
372 v <- newTVar True
373 writeTVar (retrymap server) $! Map.insert addr (v,timer) map
374 return (v,action :: IO ())
375 action
376 fix $ \retryLoop -> do
377 utc <- getCurrentTime
378 shouldRetry <- do
379 handle (\(SomeException e) -> do
380 -- Exceptions thrown by 'socket' need to be handled specially
381 -- since we don't have enough information to broadcast a ConnectFailure
382 -- on serverEvent.
383 warn $ "Failed to create socket: " <> bshow e
384 atomically $ readTVar retryVar) $ do
385 sock <- socket (socketFamily addr) Stream proto
386 handle (\(SomeException e) -> do
387 -- Any thing else goes wrong and we broadcast ConnectFailure.
388 do (conkey,u) <- makeConnKey params (restrictSocket sock)
389 Socket.close sock
390 atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr)
391 `onException` return ()
392 atomically $ readTVar retryVar) $ do
393 connect sock addr
394 (conkey,u) <- makeConnKey params (restrictSocket sock)
395 h <- socketToHandle sock ReadWriteMode
396 threads <- newConnection server sessionConduits params conkey u h Out
397 atomically $ do threadsWait threads
398 readTVar retryVar
399 fin_utc <- getCurrentTime
400 when shouldRetry $ do
401 let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
402 expected = fromIntegral interval
403 when (shouldRetry && elapsed < expected) $ do
404 debugNoise $ "Waiting to retry " <> bshow addr
405 void $ startDelay timer (round $ 1000 * (expected-elapsed))
406 debugNoise $ "retry " <> bshow (shouldRetry,addr)
407 when shouldRetry $ retryLoop
408
409
410-- INTERNAL ----------------------------------------------------------
411
412{-
413hWriteUntilNothing h outs =
414 fix $ \loop -> do
415 mb <- atomically $ takeTMVar outs
416 case mb of Just bs -> do S.hPutStrLn h bs
417 warn $ "wrote " <> bs
418 loop
419 Nothing -> do warn $ "wrote Nothing"
420 hClose h
421
422-}
423connRead :: ConnectionState -> IO (Maybe ByteString)
424connRead (WriteOnlyConnection w) = do
425 -- atomically $ discardContents (threadsChannel w)
426 return Nothing
427connRead conn = do
428 c <- atomically $ getThreads
429 threadsRead c
430 where
431 getThreads =
432 case conn of SaneConnection c -> return c
433 ReadOnlyConnection c -> return c
434 ConnectionPair c w -> do
435 -- discardContents (threadsChannel w)
436 return c
437
438socketFamily :: SockAddr -> Family
439socketFamily (SockAddrInet _ _) = AF_INET
440socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
441socketFamily (SockAddrUnix _) = AF_UNIX
442
443
444conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
445 -> ConnectionState
446 -> ConnectionEvent x
447conevent sessionConduits con = Connection pingflag read write
448 where
449 pingflag = swapTVar (pingFlag (connPingTimer con)) False
450 (read,write) = sessionConduits (connRead con) (connWrite con)
451
452newConnection :: Ord a
453 => Server a u1 releaseKey x
454 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
455 -> ConnectionParameters conkey u
456 -> a
457 -> u1
458 -> Handle
459 -> InOrOut
460 -> IO ConnectionThreads
461newConnection server sessionConduits params conkey u h inout = do
462 hSetBuffering h NoBuffering
463 let (idle_ms,timeout_ms) =
464 case (inout,duplex params) of
465 (Out,False) -> ( 0, 0 )
466 _ -> ( pingInterval params
467 , timeout params )
468
469 new <- do pinglogic <- forkPingMachine idle_ms timeout_ms
470 connectionThreads h pinglogic
471 started <- atomically $ newEmptyTMVar
472 kontvar <- atomically newEmptyTMVar
473 -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ?
474 let _ = kontvar :: TMVar (STM (IO ()))
475 forkIO $ do
476 myThreadId >>= flip labelThread ("connecting...")
477 getkont <- atomically $ takeTMVar kontvar
478 kont <- atomically getkont
479 kont
480
481 atomically $ do
482 current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
483 case current of
484 Nothing -> do
485 (newCon,e) <- return $
486 if duplex params
487 then let newcon = SaneConnection new
488 in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
489 else ( case inout of
490 In -> ReadOnlyConnection new
491 Out -> WriteOnlyConnection new
492 , ((conkey,u), HalfConnection inout) )
493 modifyTVar' (conmap server) $ Map.insert conkey
494 ConnectionRecord { ckont = kontvar
495 , cstate = newCon
496 , cdata = u }
497 announce e
498 putTMVar kontvar $ return $ do
499 myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice.
500 atomically $ putTMVar started ()
501 -- Wait for something interesting.
502 handleEOF conkey u kontvar newCon
503 Just what@ConnectionRecord { ckont =mvar }-> do
504 putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread.
505 putTMVar mvar $ do
506 -- The action returned by updateConMap, eventually invokes handleEOF,
507 -- so the sequencer thread will not be terminated.
508 kont <- updateConMap conkey u new what
509 putTMVar started ()
510 return kont
511 return new
512 where
513
514 announce e = writeTChan (serverEvent server) e
515
516 -- This function loops and will not quit unless an action is posted to the
517 -- mvar that does not in turn invoke this function, or if an EOF occurs.
518 handleEOF conkey u mvar newCon = do
519 action <- atomically . foldr1 orElse $
520 [ takeTMVar mvar >>= id -- passed continuation
521 , connWait newCon >> return eof
522 , connWaitPing newCon >>= return . sendPing
523 -- , pingWait pingTimer >>= return . sendPing
524 ]
525 action :: IO ()
526 where
527 eof = do
528 -- warn $ "EOF " <>bshow conkey
529 connCancelPing newCon
530 atomically $ do connFlush newCon
531 announce ((conkey,u),EOF)
532 modifyTVar' (conmap server)
533 $ Map.delete conkey
534 -- warn $ "fin-EOF "<>bshow conkey
535
536 sendPing PingTimeOut = do
537 {-
538 utc <- getCurrentTime
539 let utc' = formatTime defaultTimeLocale "%s" utc
540 warn $ "ping:TIMEOUT " <> bshow utc'
541 -}
542 atomically (connClose newCon)
543 eof
544
545 sendPing PingIdle = do
546 {-
547 utc <- getCurrentTime
548 let utc' = formatTime defaultTimeLocale "%s" utc
549 -- warn $ "ping:IDLE " <> bshow utc'
550 -}
551 atomically $ announce ((conkey,u),RequiresPing)
552 handleEOF conkey u mvar newCon
553
554
555 updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do
556 new' <-
557 if duplex params then do
558 announce ((conkey,u),EOF)
559 connClose replaced
560 let newcon = SaneConnection new
561 announce $ ((conkey,u),conevent sessionConduits newcon)
562 return $ newcon
563 else
564 case replaced of
565 WriteOnlyConnection w | inout==In ->
566 do let newcon = ConnectionPair new w
567 announce ((conkey,u),conevent sessionConduits newcon)
568 return newcon
569 ReadOnlyConnection r | inout==Out ->
570 do let newcon = ConnectionPair r new
571 announce ((conkey,u),conevent sessionConduits newcon)
572 return newcon
573 _ -> do -- connFlush todo
574 announce ((conkey,u0), EOF)
575 connClose replaced
576 announce ((conkey,u), HalfConnection inout)
577 return $ case inout of
578 In -> ReadOnlyConnection new
579 Out -> WriteOnlyConnection new
580 modifyTVar' (conmap server) $ Map.insert conkey
581 ConnectionRecord { ckont = mvar
582 , cstate = new'
583 , cdata = u }
584 return $ handleEOF conkey u mvar new'
585
586
587getPacket :: Handle -> IO ByteString
588getPacket h = do hWaitForInput h (-1)
589 hGetNonBlocking h 1024
590
591
592
593-- | 'ConnectionThreads' is an interface to a pair of threads
594-- that are reading and writing a 'Handle'.
595data ConnectionThreads = ConnectionThreads
596 { threadsWriter :: TMVar (Maybe ByteString)
597 , threadsChannel :: TChan ByteString
598 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
599 , threadsPing :: PingMachine
600 }
601
602-- | This spawns the reader and writer threads and returns a newly
603-- constructed 'ConnectionThreads' object.
604connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
605connectionThreads h pinglogic = do
606
607 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
608
609 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
610 readerThread <- forkIO $ do
611 myThreadId >>= flip labelThread ("readerThread")
612 let finished e = do
613 hClose h
614 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
615 -- let _ = fmap ioeGetErrorType e -- type hint
616 let _ = fmap what e where what (SomeException _) = undefined
617 atomically $ do tryTakeTMVar outs
618 putTMVar outs Nothing -- quit writer
619 putTMVar doner ()
620 handle (finished . Just) $ do
621 pingBump pinglogic -- start the ping timer
622 fix $ \loop -> do
623 packet <- getPacket h
624 -- warn $ "read: " <> S.take 60 packet
625 atomically $ writeTChan incomming packet
626 pingBump pinglogic
627 -- warn $ "bumped: " <> S.take 60 packet
628 isEof <- hIsEOF h
629 if isEof then finished Nothing else loop
630
631 writerThread <- forkIO . fix $ \loop -> do
632 myThreadId >>= flip labelThread ("writerThread")
633 let finished = do -- warn $ "finished write"
634 -- hClose h -- quit reader
635 throwTo readerThread (ErrorCall "EOF")
636 atomically $ putTMVar donew ()
637 mb <- atomically $ readTMVar outs
638 case mb of Just bs -> handle (\(SomeException e)->finished)
639 (do -- warn $ "writing: " <> S.take 60 bs
640 S.hPutStr h bs
641 -- warn $ "wrote: " <> S.take 60 bs
642 atomically $ takeTMVar outs
643 loop)
644 Nothing -> finished
645
646 let wait = do readTMVar donew
647 readTMVar doner
648 return ()
649 return ConnectionThreads { threadsWriter = outs
650 , threadsChannel = incomming
651 , threadsWait = wait
652 , threadsPing = pinglogic }
653
654
655-- | 'threadsWrite' writes the given 'ByteString' to the
656-- 'ConnectionThreads' object. It blocks until the ByteString
657-- is written and 'True' is returned, or the connection is
658-- interrupted and 'False' is returned.
659threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
660threadsWrite c bs = atomically $
661 orElse (const False `fmap` threadsWait c)
662 (const True `fmap` putTMVar (threadsWriter c) (Just bs))
663
664-- | 'threadsClose' signals for the 'ConnectionThreads' object
665-- to quit and close the associated 'Handle'. This operation
666-- is non-blocking, follow it with 'threadsWait' if you want
667-- to wait for the operation to complete.
668threadsClose :: ConnectionThreads -> STM ()
669threadsClose c = do
670 let mvar = threadsWriter c
671 v <- tryReadTMVar mvar
672 case v of
673 Just Nothing -> return () -- already closed
674 _ -> putTMVar mvar Nothing
675
676-- | 'threadsRead' blocks until a 'ByteString' is available which
677-- is returned to the caller, or the connection is interrupted and
678-- 'Nothing' is returned.
679threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
680threadsRead c = atomically $
681 orElse (const Nothing `fmap` threadsWait c)
682 (Just `fmap` readTChan (threadsChannel c))
683
684-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
685-- or to a pair of 'ConnectionThreads' objects that are considered as one
686-- connection.
687data ConnectionState =
688 SaneConnection ConnectionThreads
689 -- ^ ordinary read/write connection
690 | WriteOnlyConnection ConnectionThreads
691 | ReadOnlyConnection ConnectionThreads
692 | ConnectionPair ConnectionThreads ConnectionThreads
693 -- ^ Two 'ConnectionThreads' objects, read operations use the
694 -- first, write operations use the second.
695
696
697
698connWrite :: ConnectionState -> ByteString -> IO Bool
699connWrite (ReadOnlyConnection _) bs = return False
700connWrite conn bs = threadsWrite c bs
701 where
702 c = case conn of SaneConnection c -> c
703 WriteOnlyConnection c -> c
704 ConnectionPair _ c -> c
705
706
707mapConn :: Bool ->
708 (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
709mapConn both action c =
710 case c of
711 SaneConnection rw -> action rw
712 ReadOnlyConnection r -> action r
713 WriteOnlyConnection w -> action w
714 ConnectionPair r w -> do
715 rem <- orElse (const w `fmap` action r)
716 (const r `fmap` action w)
717 when both $ action rem
718
719connClose :: ConnectionState -> STM ()
720connClose c = mapConn True threadsClose c
721
722connWait :: ConnectionState -> STM ()
723connWait c = doit -- mapConn False threadsWait c
724 where
725 action = threadsWait
726 doit =
727 case c of
728 SaneConnection rw -> action rw
729 ReadOnlyConnection r -> action r
730 WriteOnlyConnection w -> action w
731 ConnectionPair r w -> do
732 rem <- orElse (const w `fmap` action r)
733 (const r `fmap` action w)
734 threadsClose rem
735
736connPingTimer :: ConnectionState -> PingMachine
737connPingTimer c =
738 case c of
739 SaneConnection rw -> threadsPing rw
740 ReadOnlyConnection r -> threadsPing r
741 WriteOnlyConnection w -> threadsPing w -- should be disabled.
742 ConnectionPair r w -> threadsPing r
743
744connCancelPing :: ConnectionState -> IO ()
745connCancelPing c = pingCancel (connPingTimer c)
746
747connWaitPing :: ConnectionState -> STM PingEvent
748connWaitPing c = pingWait (connPingTimer c)
749
750connFlush :: ConnectionState -> STM ()
751connFlush c =
752 case c of
753 SaneConnection rw -> waitChan rw
754 ReadOnlyConnection r -> waitChan r
755 WriteOnlyConnection w -> return ()
756 ConnectionPair r w -> waitChan r
757 where
758 waitChan t = do
759 b <- isEmptyTChan (threadsChannel t)
760 when (not b) retry
761
762bshow :: Show a => a -> ByteString
763bshow e = S.pack . show $ e
764
765warn :: ByteString -> IO ()
766warn str = S.hPutStrLn stderr str >> hFlush stderr
767
768debugNoise :: Monad m => t -> m ()
769debugNoise str = return ()
770
771data TCPStatus = Resolving | AwaitingRead | AwaitingWrite
772
773tcpManager :: ( Show k, Ord k, Ord conkey ) =>
774 (conkey -> (SockAddr, ConnectionParameters conkey u, Miliseconds))
775 -> (String -> Maybe k)
776 -> (k -> IO (Maybe conkey))
777 -> Server conkey u releaseKey x
778 -> IO (Manager TCPStatus k)
779tcpManager grokKey s2k resolvKey sv = do
780 rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey)
781 nullping <- forkPingMachine 0 0
782 return Manager {
783 setPolicy = \k -> \case
784 TryingToConnect -> join $ atomically $ do
785 r <- readTVar rmap
786 case Map.lookup k r of
787 Just {} -> return $ return () -- Connection already in progress.
788 Nothing -> do
789 modifyTVar' rmap $ Map.insert k Nothing
790 return $ void $ forkIO $ do
791 myThreadId >>= flip labelThread ("resolve."++show k)
792 mconkey <- resolvKey k
793 case mconkey of
794 Nothing -> atomically $ modifyTVar' rmap $ Map.delete k
795 Just conkey -> do
796 control sv $ case grokKey conkey of
797 (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms
798 OpenToConnect -> hPutStrLn stderr "TODO: TCP OpenToConnect"
799 RefusingToConnect -> hPutStrLn stderr "TODO: TCP RefusingToConnect"
800 , connections = do
801 c <- readTVar $ conmap sv
802 fmap (exportConnection nullping c) <$> readTVar rmap
803 , stringToKey = s2k
804 , showProgress = \case
805 Resolving -> "resolving"
806 AwaitingRead -> "awaiting inbound"
807 AwaitingWrite -> "awaiting outbound"
808 , showKey = show
809 }
810
811exportConnection :: Ord conkey => PingMachine -> Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus
812exportConnection nullping conmap mkey = G.Connection
813 { G.connStatus = case mkey of
814 Nothing -> return $ G.Dormant
815 Just conkey -> case Map.lookup conkey conmap of
816 Nothing -> return $ G.InProgress Resolving
817 Just (ConnectionRecord ckont cstate cdata) -> return $ case cstate of
818 SaneConnection {} -> G.Established
819 ConnectionPair {} -> G.Established
820 ReadOnlyConnection {} -> G.InProgress AwaitingWrite
821 WriteOnlyConnection {} -> G.InProgress AwaitingRead
822 , G.connPolicy = return TryingToConnect
823 , G.connPingLogic = case mkey >>= flip Map.lookup conmap of
824 Nothing -> nullping
825 Just (ConnectionRecord _ cstate _) -> connPingTimer cstate
826 }