summaryrefslogtreecommitdiff
path: root/server/src/Connection/Tcp.hs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/Connection/Tcp.hs')
-rw-r--r--server/src/Connection/Tcp.hs825
1 files changed, 825 insertions, 0 deletions
diff --git a/server/src/Connection/Tcp.hs b/server/src/Connection/Tcp.hs
new file mode 100644
index 00000000..7d93e7de
--- /dev/null
+++ b/server/src/Connection/Tcp.hs
@@ -0,0 +1,825 @@
1{-# OPTIONS_HADDOCK prune #-}
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE DoAndIfThenElse #-}
4{-# LANGUAGE FlexibleInstances #-}
5{-# LANGUAGE LambdaCase #-}
6{-# LANGUAGE NondecreasingIndentation #-}
7{-# LANGUAGE OverloadedStrings #-}
8{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE StandaloneDeriving #-}
10{-# LANGUAGE TupleSections #-}
11-----------------------------------------------------------------------------
12-- |
13-- Module : Connection.Tcp
14--
15-- Maintainer : joe@jerkface.net
16-- Stability : experimental
17--
18-- A TCP client/server library.
19--
20-- TODO:
21--
22-- * interface tweaks
23--
24module Connection.Tcp
25 ( module Connection.Tcp
26 , module Control.Concurrent.PingMachine ) where
27
28import Data.ByteString (ByteString,hGetNonBlocking)
29import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
30import Data.Conduit ( ConduitT, Void, Flush )
31#if MIN_VERSION_containers(0,5,0)
32import qualified Data.Map.Strict as Map
33import Data.Map.Strict (Map)
34#else
35import qualified Data.Map as Map
36import Data.Map (Map)
37#endif
38import Data.Monoid ( (<>) )
39import Control.Concurrent.ThreadUtil
40
41import Control.Arrow
42import Control.Concurrent.STM
43-- import Control.Concurrent.STM.TMVar
44-- import Control.Concurrent.STM.TChan
45-- import Control.Concurrent.STM.Delay
46import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException)
47import Control.Monad
48import Control.Monad.Fix
49-- import Control.Monad.STM
50-- import Control.Monad.Trans.Resource
51import Control.Monad.IO.Class (MonadIO (liftIO))
52import Data.Maybe
53import System.IO.Error (isDoesNotExistError)
54import System.IO
55 ( IOMode(..)
56 , hSetBuffering
57 , BufferMode(..)
58 , hWaitForInput
59 , hClose
60 , hIsEOF
61 , Handle
62 )
63import Network.Socket as Socket
64import Network.BSD
65 ( getProtocolNumber
66 )
67import Debug.Trace
68import Data.Time.Clock (getCurrentTime,diffUTCTime)
69-- import SockAddr ()
70-- import System.Locale (defaultTimeLocale)
71
72import qualified Data.Text as Text
73 ;import Data.Text (Text)
74import DNSCache
75import Control.Concurrent.Delay
76import Control.Concurrent.PingMachine
77import Network.StreamServer
78import Network.SocketLike hiding (sClose)
79import qualified Connection as G
80 ;import Connection (Manager (..), PeerAddress (..), Policy (..))
81import Network.Address (localhost4)
82import DPut
83import DebugTag
84
85
86type Microseconds = Int
87
88-- | This object is passed with the 'Listen' and 'Connect'
89-- instructions in order to control the behavior of the
90-- connections that are established. It is parameterized
91-- by a user-suplied type @conkey@ that is used as a lookup
92-- key for connections.
93data ConnectionParameters conkey u =
94 ConnectionParameters
95 { pingInterval :: PingInterval
96 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
97 -- event is signaled.
98 , timeout :: TimeOut
99 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
100 -- that are necessary for the connection to be considered
101 -- lost and signalling 'EOF'.
102 , makeConnKey :: (RestrictedSocket,(Local SockAddr, Remote SockAddr)) -> IO (conkey,u)
103 -- ^ This action creates a lookup key for a new connection. If 'duplex'
104 -- is 'True' and the result is already assocatied with an established
105 -- connection, then an 'EOF' will be forced before the the new
106 -- connection becomes active.
107 --
108 , duplex :: Bool
109 -- ^ If True, then the connection will be treated as a normal
110 -- two-way socket. Otherwise, a readable socket is established
111 -- with 'Listen' and a writable socket is established with
112 -- 'Connect' and they are associated when 'makeConnKey' yields
113 -- same value for each.
114 }
115
116-- | Use this function to select appropriate default values for
117-- 'ConnectionParameters' other than 'makeConnKey'.
118--
119-- Current defaults:
120--
121-- * 'pingInterval' = 28000
122--
123-- * 'timeout' = 2000
124--
125-- * 'duplex' = True
126--
127connectionDefaults
128 :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> IO (conkey,u)) -> ConnectionParameters conkey u
129connectionDefaults f = ConnectionParameters
130 { pingInterval = 28000
131 , timeout = 2000
132 , makeConnKey = f
133 , duplex = True
134 }
135
136-- | Instructions for a 'Server' object
137--
138-- To issue a command, put it into the 'serverCommand' TMVar.
139data ServerInstruction conkey u
140 = Quit
141 -- ^ kill the server. This command is automatically issued when
142 -- the server is released.
143 | Listen SockAddr (ConnectionParameters conkey u)
144 -- ^ listen for incoming connections on the given bind address.
145 | Connect SockAddr (ConnectionParameters conkey u)
146 -- ^ connect to addresses
147 | ConnectWithEndlessRetry SockAddr
148 (ConnectionParameters conkey u)
149 Miliseconds
150 -- ^ keep retrying the connection
151 | Ignore SockAddr
152 -- ^ stop listening on specified bind address
153 | Send conkey ByteString
154 -- ^ send bytes to an established connection
155
156#ifdef TEST
157deriving instance Show conkey => Show (ServerInstruction conkey u)
158instance Show (a -> b) where show _ = "<function>"
159deriving instance Show conkey => Show (ConnectionParameters conkey u)
160#endif
161
162-- | This type specifies which which half of a half-duplex
163-- connection is of interest.
164data InOrOut = In | Out
165 deriving (Enum,Eq,Ord,Show,Read)
166
167-- | These events may be read from 'serverEvent' TChannel.
168--
169data ConnectionEvent b
170 = Connection (STM Bool) (ConduitT () b IO ()) (ConduitT (Flush b) Void IO ())
171 -- ^ A new connection was established
172 | ConnectFailure SockAddr
173 -- ^ A 'Connect' command failed.
174 | HalfConnection InOrOut
175 -- ^ Half of a half-duplex connection is avaliable.
176 | EOF
177 -- ^ A connection was terminated
178 | RequiresPing
179 -- ^ 'pingInterval' miliseconds of idle was experienced
180
181#ifdef TEST
182instance Show (IO a) where show _ = "<IO action>"
183instance Show (STM a) where show _ = "<STM action>"
184instance Eq (ByteString -> IO Bool) where (==) _ _ = True
185instance Eq (IO (Maybe ByteString)) where (==) _ _ = True
186instance Eq (STM Bool) where (==) _ _ = True
187deriving instance Show b => Show (ConnectionEvent b)
188deriving instance Eq b => Eq (ConnectionEvent b)
189#endif
190
191-- | This is the per-connection state.
192data ConnectionRecord u
193 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler
194 , cstate :: ConnectionState -- ^ used to send/receive data to the connection
195 , cdata :: u -- ^ user data, stored in the connection map for convenience
196 }
197
198-- | This object accepts commands and signals events and maintains
199-- the list of currently listening ports and established connections.
200data Server a u releaseKey b
201 = Server { serverCommand :: TMVar (ServerInstruction a u)
202 , serverEvent :: TChan ((a,u), ConnectionEvent b)
203 , serverReleaseKey :: releaseKey
204 , conmap :: TVar (Map a (ConnectionRecord u))
205 , listenmap :: TVar (Map SockAddr ServerHandle)
206 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
207 }
208
209control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
210control sv = atomically . putTMVar (serverCommand sv)
211
212type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
213
214noCleanUp :: MonadIO m => Allocate () m
215noCleanUp io _ = ( (,) () ) `liftM` liftIO io
216
217-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
218-- to ensure proper cleanup. For example,
219--
220-- > import Connection.Tcp
221-- > import Control.Monad.Trans.Resource (runResourceT)
222-- > import Control.Monad.IO.Class (liftIO)
223-- > import Control.Monad.STM (atomically)
224-- > import Control.Concurrent.STM.TMVar (putTMVar)
225-- > import Control.Concurrent.STM.TChan (readTChan)
226-- >
227-- > main = runResourceT $ do
228-- > sv <- server allocate
229-- > let params = connectionDefaults (return . snd)
230-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
231-- > let loop = do
232-- > (_,event) <- atomically $ readTChan (serverEvent sv)
233-- > case event of
234-- > Connection getPingFlag readData writeData -> do
235-- > forkIO $ do
236-- > fix $ \readLoop -> do
237-- > readData >>= mapM $ \bytes ->
238-- > putStrLn $ "got: " ++ show bytes
239-- > readLoop
240-- > case event of EOF -> return ()
241-- > _ -> loop
242-- > liftIO loop
243--
244-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp'
245-- to do without automatic cleanup and be sure to remember to write 'Quit' to
246-- the 'serverCommand' variable.
247server ::
248 -- forall (m :: * -> *) a u conkey releaseKey.
249 (Show conkey, MonadIO m, Ord conkey) =>
250 Allocate releaseKey m
251 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
252 -> m (Server conkey u releaseKey x)
253server allocate sessionConduits = do
254 (key,cmds) <- allocate (atomically newEmptyTMVar)
255 (atomically . flip putTMVar Quit)
256 server <- liftIO . atomically $ do
257 tchan <- newTChan
258 conmap <- newTVar Map.empty
259 listenmap<- newTVar Map.empty
260 retrymap <- newTVar Map.empty
261 return Server { serverCommand = cmds
262 , serverEvent = tchan
263 , serverReleaseKey = key
264 , conmap = conmap
265 , listenmap = listenmap
266 , retrymap = retrymap
267 }
268 liftIO $ do
269 forkLabeled "server" $ fix $ \loop -> do
270 instr <- atomically $ takeTMVar cmds
271 -- warn $ "instr = " <> bshow instr
272 let again = do doit server instr
273 -- warn $ "finished " <> bshow instr
274 loop
275 case instr of Quit -> closeAll server
276 _ -> again
277 return server
278 where
279 closeAll server = do
280 listening <- atomically . readTVar $ listenmap server
281 mapM_ quitListening (Map.elems listening)
282 let stopRetry (v,d) = do atomically $ writeTVar v False
283 interruptDelay d
284 retriers <- atomically $ do
285 rmap <- readTVar $ retrymap server
286 writeTVar (retrymap server) Map.empty
287 return rmap
288 mapM_ stopRetry (Map.elems retriers)
289 cons <- atomically . readTVar $ conmap server
290 atomically $ mapM_ (connClose . cstate) (Map.elems cons)
291 atomically $ mapM_ (connWait . cstate) (Map.elems cons)
292 atomically $ writeTVar (conmap server) Map.empty
293
294
295 doit server (Listen port params) = do
296
297 listening <- Map.member port
298 `fmap` atomically (readTVar $ listenmap server)
299 when (not listening) $ do
300
301 dput XMisc $ "Started listening on "++show port
302
303 sserv <- flip streamServer [port] ServerConfig
304 { serverWarn = dput XMisc
305 , serverSession = \sock _ h -> do
306 (conkey,u) <- makeConnKey params sock
307 _ <- newConnection server sessionConduits params conkey u h In
308 return ()
309 }
310
311 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
312
313 doit server (Ignore port) = do
314 dput XMisc $ "Stopping listen on "++show port
315 mb <- atomically $ do
316 map <- readTVar $ listenmap server
317 modifyTVar' (listenmap server) $ Map.delete port
318 return $ Map.lookup port map
319 maybe (return ()) quitListening $ mb
320
321 doit server (Send con bs) = do -- . void . forkIO $ do
322 map <- atomically $ readTVar (conmap server)
323 let post False = (trace ("cant send: "++show bs) $ return ())
324 post True = return ()
325 maybe (post False)
326 (post <=< flip connWrite bs . cstate)
327 $ Map.lookup con map
328
329 doit server (Connect addr params) = join $ atomically $ do
330 Map.lookup addr <$> readTVar (retrymap server)
331 >>= return . \case
332 Nothing -> forkit
333 Just (v,d) -> do b <- atomically $ readTVar v
334 interruptDelay d
335 when (not b) forkit
336 where
337 forkit = void . forkLabeled ( "Connect." ++ show addr ) $ do
338 proto <- getProtocolNumber "tcp"
339 sock <- socket (socketFamily addr) Stream proto
340 handle (\e -> do -- let t = ioeGetErrorType e
341 when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
342 -- warn $ "connect-error: " <> bshow e
343 (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) -- XXX: ?
344 Socket.close sock
345 atomically
346 $ writeTChan (serverEvent server)
347 $ ((conkey,u),ConnectFailure addr))
348 $ do
349 connect sock addr
350 laddr <- Socket.getSocketName sock
351 (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr))
352 h <- socketToHandle sock ReadWriteMode
353 newConnection server sessionConduits params conkey u h Out
354 return ()
355
356 doit server (ConnectWithEndlessRetry addr params interval) = do
357 proto <- getProtocolNumber "tcp"
358 void . forkLabeled ("ConnectWithEndlessRetry." ++ show addr) $ do
359 timer <- interruptibleDelay
360 (retryVar,action) <- atomically $ do
361 map <- readTVar (retrymap server)
362 action <- case Map.lookup addr map of
363 Nothing -> return $ return ()
364 Just (v,d) -> do writeTVar v False
365 return $ interruptDelay d
366 v <- newTVar True
367 writeTVar (retrymap server) $! Map.insert addr (v,timer) map
368 return (v,action :: IO ())
369 action
370 fix $ \retryLoop -> do
371 utc <- getCurrentTime
372 shouldRetry <- do
373 handle (\(SomeException e) -> do
374 -- Exceptions thrown by 'socket' need to be handled specially
375 -- since we don't have enough information to broadcast a ConnectFailure
376 -- on serverEvent.
377 warn $ "Failed to create socket: " <> bshow e
378 atomically $ readTVar retryVar) $ do
379 sock <- socket (socketFamily addr) Stream proto
380 handle (\(SomeException e) -> do
381 -- Any thing else goes wrong and we broadcast ConnectFailure.
382 do (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr))
383 Socket.close sock
384 atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr)
385 `onException` return ()
386 atomically $ readTVar retryVar) $ do
387 connect sock addr
388 laddr <- Socket.getSocketName sock
389 (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr))
390 h <- socketToHandle sock ReadWriteMode
391 threads <- newConnection server sessionConduits params conkey u h Out
392 atomically $ do threadsWait threads
393 readTVar retryVar
394 fin_utc <- getCurrentTime
395 when shouldRetry $ do
396 let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
397 expected = fromIntegral interval
398 when (shouldRetry && elapsed < expected) $ do
399 debugNoise $ "Waiting to retry " <> bshow addr
400 void $ startDelay timer (round $ 1000 * (expected-elapsed))
401 debugNoise $ "retry " <> bshow (shouldRetry,addr)
402 when shouldRetry $ retryLoop
403
404
405-- INTERNAL ----------------------------------------------------------
406
407{-
408hWriteUntilNothing h outs =
409 fix $ \loop -> do
410 mb <- atomically $ takeTMVar outs
411 case mb of Just bs -> do S.hPutStrLn h bs
412 warn $ "wrote " <> bs
413 loop
414 Nothing -> do warn $ "wrote Nothing"
415 hClose h
416
417-}
418connRead :: ConnectionState -> IO (Maybe ByteString)
419connRead (WriteOnlyConnection w) = do
420 -- atomically $ discardContents (threadsChannel w)
421 return Nothing
422connRead conn = do
423 c <- atomically $ getThreads
424 threadsRead c
425 where
426 getThreads =
427 case conn of SaneConnection c -> return c
428 ReadOnlyConnection c -> return c
429 ConnectionPair c w -> do
430 -- discardContents (threadsChannel w)
431 return c
432
433socketFamily :: SockAddr -> Family
434socketFamily (SockAddrInet _ _) = AF_INET
435socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
436socketFamily (SockAddrUnix _) = AF_UNIX
437
438
439conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
440 -> ConnectionState
441 -> ConnectionEvent x
442conevent sessionConduits con = Connection pingflag read write
443 where
444 pingflag = swapTVar (pingFlag (connPingTimer con)) False
445 (read,write) = sessionConduits (connRead con) (connWrite con)
446
447newConnection :: Ord a
448 => Server a u1 releaseKey x
449 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
450 -> ConnectionParameters conkey u
451 -> a
452 -> u1
453 -> Handle
454 -> InOrOut
455 -> IO ConnectionThreads
456newConnection server sessionConduits params conkey u h inout = do
457 hSetBuffering h NoBuffering
458 let (idle_ms,timeout_ms) =
459 case (inout,duplex params) of
460 (Out,False) -> ( 0, 0 )
461 _ -> ( pingInterval params
462 , timeout params )
463
464 new <- do pinglogic <- forkPingMachine "newConnection" idle_ms timeout_ms
465 connectionThreads h pinglogic
466 started <- atomically $ newEmptyTMVar
467 kontvar <- atomically newEmptyTMVar
468 -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ?
469 let _ = kontvar :: TMVar (STM (IO ()))
470 forkLabeled ("connecting...") $ do
471 getkont <- atomically $ takeTMVar kontvar
472 kont <- atomically getkont
473 kont
474
475 atomically $ do
476 current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
477 case current of
478 Nothing -> do
479 (newCon,e) <- return $
480 if duplex params
481 then let newcon = SaneConnection new
482 in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
483 else ( case inout of
484 In -> ReadOnlyConnection new
485 Out -> WriteOnlyConnection new
486 , ((conkey,u), HalfConnection inout) )
487 modifyTVar' (conmap server) $ Map.insert conkey
488 ConnectionRecord { ckont = kontvar
489 , cstate = newCon
490 , cdata = u }
491 announce e
492 putTMVar kontvar $ return $ do
493 myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice.
494 atomically $ putTMVar started ()
495 -- Wait for something interesting.
496 handleEOF conkey u kontvar newCon
497 Just what@ConnectionRecord { ckont =mvar }-> do
498 putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread.
499 putTMVar mvar $ do
500 -- The action returned by updateConMap, eventually invokes handleEOF,
501 -- so the sequencer thread will not be terminated.
502 kont <- updateConMap conkey u new what
503 putTMVar started ()
504 return kont
505 return new
506 where
507
508 announce e = writeTChan (serverEvent server) e
509
510 -- This function loops and will not quit unless an action is posted to the
511 -- mvar that does not in turn invoke this function, or if an EOF occurs.
512 handleEOF conkey u mvar newCon = do
513 action <- atomically . foldr1 orElse $
514 [ takeTMVar mvar >>= id -- passed continuation
515 , connWait newCon >> return eof
516 , connWaitPing newCon >>= return . sendPing
517 -- , pingWait pingTimer >>= return . sendPing
518 ]
519 action :: IO ()
520 where
521 eof = do
522 -- warn $ "EOF " <>bshow conkey
523 connCancelPing newCon
524 atomically $ do connFlush newCon
525 announce ((conkey,u),EOF)
526 modifyTVar' (conmap server)
527 $ Map.delete conkey
528 -- warn $ "fin-EOF "<>bshow conkey
529
530 sendPing PingTimeOut = do
531 {-
532 utc <- getCurrentTime
533 let utc' = formatTime defaultTimeLocale "%s" utc
534 warn $ "ping:TIMEOUT " <> bshow utc'
535 -}
536 atomically (connClose newCon)
537 eof
538
539 sendPing PingIdle = do
540 {-
541 utc <- getCurrentTime
542 let utc' = formatTime defaultTimeLocale "%s" utc
543 -- warn $ "ping:IDLE " <> bshow utc'
544 -}
545 atomically $ announce ((conkey,u),RequiresPing)
546 handleEOF conkey u mvar newCon
547
548
549 updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do
550 new' <-
551 if duplex params then do
552 announce ((conkey,u),EOF)
553 connClose replaced
554 let newcon = SaneConnection new
555 announce $ ((conkey,u),conevent sessionConduits newcon)
556 return $ newcon
557 else
558 case replaced of
559 WriteOnlyConnection w | inout==In ->
560 do let newcon = ConnectionPair new w
561 announce ((conkey,u),conevent sessionConduits newcon)
562 return newcon
563 ReadOnlyConnection r | inout==Out ->
564 do let newcon = ConnectionPair r new
565 announce ((conkey,u),conevent sessionConduits newcon)
566 return newcon
567 _ -> do -- connFlush todo
568 announce ((conkey,u0), EOF)
569 connClose replaced
570 announce ((conkey,u), HalfConnection inout)
571 return $ case inout of
572 In -> ReadOnlyConnection new
573 Out -> WriteOnlyConnection new
574 modifyTVar' (conmap server) $ Map.insert conkey
575 ConnectionRecord { ckont = mvar
576 , cstate = new'
577 , cdata = u }
578 return $ handleEOF conkey u mvar new'
579
580
581getPacket :: Handle -> IO ByteString
582getPacket h = do hWaitForInput h (-1)
583 hGetNonBlocking h 1024
584
585
586
587-- | 'ConnectionThreads' is an interface to a pair of threads
588-- that are reading and writing a 'Handle'.
589data ConnectionThreads = ConnectionThreads
590 { threadsWriter :: TMVar (Maybe ByteString)
591 , threadsChannel :: TChan ByteString
592 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
593 , threadsPing :: PingMachine
594 }
595
596-- | This spawns the reader and writer threads and returns a newly
597-- constructed 'ConnectionThreads' object.
598connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
599connectionThreads h pinglogic = do
600
601 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
602
603 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
604 readerThread <- forkLabeled "readerThread" $ do
605 let finished e = do
606 hClose h
607 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
608 -- let _ = fmap ioeGetErrorType e -- type hint
609 let _ = fmap what e where what (SomeException _) = undefined
610 atomically $ do tryTakeTMVar outs
611 putTMVar outs Nothing -- quit writer
612 putTMVar doner ()
613 handle (finished . Just) $ do
614 pingBump pinglogic -- start the ping timer
615 fix $ \loop -> do
616 packet <- getPacket h
617 -- warn $ "read: " <> S.take 60 packet
618 atomically $ writeTChan incomming packet
619 pingBump pinglogic
620 -- warn $ "bumped: " <> S.take 60 packet
621 isEof <- hIsEOF h
622 if isEof then finished Nothing else loop
623
624 writerThread <- forkLabeled "writerThread" . fix $ \loop -> do
625 let finished = do -- warn $ "finished write"
626 -- hClose h -- quit reader
627 throwTo readerThread (ErrorCall "EOF")
628 atomically $ putTMVar donew ()
629 mb <- atomically $ readTMVar outs
630 case mb of Just bs -> handle (\(SomeException e)->finished)
631 (do -- warn $ "writing: " <> S.take 60 bs
632 S.hPutStr h bs
633 -- warn $ "wrote: " <> S.take 60 bs
634 atomically $ takeTMVar outs
635 loop)
636 Nothing -> finished
637
638 let wait = do readTMVar donew
639 readTMVar doner
640 return ()
641 return ConnectionThreads { threadsWriter = outs
642 , threadsChannel = incomming
643 , threadsWait = wait
644 , threadsPing = pinglogic }
645
646
647-- | 'threadsWrite' writes the given 'ByteString' to the
648-- 'ConnectionThreads' object. It blocks until the ByteString
649-- is written and 'True' is returned, or the connection is
650-- interrupted and 'False' is returned.
651threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
652threadsWrite c bs = atomically $
653 orElse (const False `fmap` threadsWait c)
654 (const True `fmap` putTMVar (threadsWriter c) (Just bs))
655
656-- | 'threadsClose' signals for the 'ConnectionThreads' object
657-- to quit and close the associated 'Handle'. This operation
658-- is non-blocking, follow it with 'threadsWait' if you want
659-- to wait for the operation to complete.
660threadsClose :: ConnectionThreads -> STM ()
661threadsClose c = do
662 let mvar = threadsWriter c
663 v <- tryReadTMVar mvar
664 case v of
665 Just Nothing -> return () -- already closed
666 _ -> putTMVar mvar Nothing
667
668-- | 'threadsRead' blocks until a 'ByteString' is available which
669-- is returned to the caller, or the connection is interrupted and
670-- 'Nothing' is returned.
671threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
672threadsRead c = atomically $
673 orElse (const Nothing `fmap` threadsWait c)
674 (Just `fmap` readTChan (threadsChannel c))
675
676-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
677-- or to a pair of 'ConnectionThreads' objects that are considered as one
678-- connection.
679data ConnectionState =
680 SaneConnection ConnectionThreads
681 -- ^ ordinary read/write connection
682 | WriteOnlyConnection ConnectionThreads
683 | ReadOnlyConnection ConnectionThreads
684 | ConnectionPair ConnectionThreads ConnectionThreads
685 -- ^ Two 'ConnectionThreads' objects, read operations use the
686 -- first, write operations use the second.
687
688
689
690connWrite :: ConnectionState -> ByteString -> IO Bool
691connWrite (ReadOnlyConnection _) bs = return False
692connWrite conn bs = threadsWrite c bs
693 where
694 c = case conn of SaneConnection c -> c
695 WriteOnlyConnection c -> c
696 ConnectionPair _ c -> c
697
698
699mapConn :: Bool ->
700 (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
701mapConn both action c =
702 case c of
703 SaneConnection rw -> action rw
704 ReadOnlyConnection r -> action r
705 WriteOnlyConnection w -> action w
706 ConnectionPair r w -> do
707 rem <- orElse (const w `fmap` action r)
708 (const r `fmap` action w)
709 when both $ action rem
710
711connClose :: ConnectionState -> STM ()
712connClose c = mapConn True threadsClose c
713
714connWait :: ConnectionState -> STM ()
715connWait c = doit -- mapConn False threadsWait c
716 where
717 action = threadsWait
718 doit =
719 case c of
720 SaneConnection rw -> action rw
721 ReadOnlyConnection r -> action r
722 WriteOnlyConnection w -> action w
723 ConnectionPair r w -> do
724 rem <- orElse (const w `fmap` action r)
725 (const r `fmap` action w)
726 threadsClose rem
727
728connPingTimer :: ConnectionState -> PingMachine
729connPingTimer c =
730 case c of
731 SaneConnection rw -> threadsPing rw
732 ReadOnlyConnection r -> threadsPing r
733 WriteOnlyConnection w -> threadsPing w -- should be disabled.
734 ConnectionPair r w -> threadsPing r
735
736connCancelPing :: ConnectionState -> IO ()
737connCancelPing c = pingCancel (connPingTimer c)
738
739connWaitPing :: ConnectionState -> STM PingEvent
740connWaitPing c = pingWait (connPingTimer c)
741
742connFlush :: ConnectionState -> STM ()
743connFlush c =
744 case c of
745 SaneConnection rw -> waitChan rw
746 ReadOnlyConnection r -> waitChan r
747 WriteOnlyConnection w -> return ()
748 ConnectionPair r w -> waitChan r
749 where
750 waitChan t = do
751 b <- isEmptyTChan (threadsChannel t)
752 when (not b) retry
753
754bshow :: Show a => a -> ByteString
755bshow e = S.pack . show $ e
756
757warn :: ByteString -> IO ()
758warn str =dputB XMisc str
759
760debugNoise :: Monad m => t -> m ()
761debugNoise str = return ()
762
763data TCPStatus = Resolving | AwaitingRead | AwaitingWrite
764
765-- SockAddr -> (SockAddr, ConnectionParameters SockAddr ConnectionData, Miliseconds)
766
767
768tcpManager :: (PeerAddress -> (SockAddr, ConnectionParameters PeerAddress u, Miliseconds))
769 -- -> (String -> Maybe Text)
770 -- -> (Text -> IO (Maybe PeerAddress))
771 -> Server PeerAddress u releaseKey x
772 -> IO (Manager TCPStatus Text)
773tcpManager grokKey sv = do
774 rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey)
775 nullping <- forkPingMachine "tcpManager" 0 0
776 (rslv,rev) <- do
777 dns <- newDNSCache
778 let rslv k = map PeerAddress <$> forwardResolve dns k
779 rev (PeerAddress addr) = reverseResolve dns addr
780 return (rslv,rev)
781 return Manager {
782 setPolicy = \k -> \case
783 TryingToConnect -> join $ atomically $ do
784 r <- readTVar rmap
785 case Map.lookup k r of
786 Just {} -> return $ return () -- Connection already in progress.
787 Nothing -> do
788 modifyTVar' rmap $ Map.insert k Nothing
789 return $ void $ forkLabeled ("resolve."++show k) $ do
790 mconkey <- listToMaybe <$> rslv k
791 case mconkey of
792 Nothing -> atomically $ modifyTVar' rmap $ Map.delete k
793 Just conkey -> do
794 control sv $ case grokKey conkey of
795 (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms
796 OpenToConnect -> dput XMisc "TODO: TCP OpenToConnect"
797 RefusingToConnect -> dput XMisc "TODO: TCP RefusingToConnect"
798 , status = \k -> do
799 c <- readTVar (conmap sv)
800 ck <- Map.lookup k <$> readTVar rmap
801 return $ exportConnection c (join ck)
802 , connections = Map.keys <$> readTVar rmap
803 , stringToKey = Just . Text.pack
804 , showProgress = \case
805 Resolving -> "resolving"
806 AwaitingRead -> "awaiting inbound"
807 AwaitingWrite -> "awaiting outbound"
808 , showKey = show
809 , resolvePeer = rslv
810 , reverseAddress = rev
811 }
812
813exportConnection :: Ord conkey => Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus
814exportConnection conmap mkey = G.Connection
815 { G.connStatus = case mkey of
816 Nothing -> G.Dormant
817 Just conkey -> case Map.lookup conkey conmap of
818 Nothing -> G.InProgress Resolving
819 Just (ConnectionRecord ckont cstate cdata) -> case cstate of
820 SaneConnection {} -> G.Established
821 ConnectionPair {} -> G.Established
822 ReadOnlyConnection {} -> G.InProgress AwaitingWrite
823 WriteOnlyConnection {} -> G.InProgress AwaitingRead
824 , G.connPolicy = TryingToConnect
825 }