summaryrefslogtreecommitdiff
path: root/dht/Connection
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 15:35:23 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-03 17:26:06 -0500
commit31b799222cb76cd0002d9a3cc5b340a7b6fed139 (patch)
tree8b834e455529fb270375e4967d1acad56553544f /dht/Connection
parent1e03ed3670a8386ede93a09fa0c67785e7da6478 (diff)
server library.
Diffstat (limited to 'dht/Connection')
-rw-r--r--dht/Connection/Tcp.hs824
1 files changed, 0 insertions, 824 deletions
diff --git a/dht/Connection/Tcp.hs b/dht/Connection/Tcp.hs
deleted file mode 100644
index 4d50d47f..00000000
--- a/dht/Connection/Tcp.hs
+++ /dev/null
@@ -1,824 +0,0 @@
1{-# OPTIONS_HADDOCK prune #-}
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE DoAndIfThenElse #-}
4{-# LANGUAGE FlexibleInstances #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7{-# LANGUAGE StandaloneDeriving #-}
8{-# LANGUAGE TupleSections #-}
9{-# LANGUAGE LambdaCase #-}
10-----------------------------------------------------------------------------
11-- |
12-- Module : Connection.Tcp
13--
14-- Maintainer : joe@jerkface.net
15-- Stability : experimental
16--
17-- A TCP client/server library.
18--
19-- TODO:
20--
21-- * interface tweaks
22--
23module Connection.Tcp
24 ( module Connection.Tcp
25 , module Control.Concurrent.PingMachine ) where
26
27import Data.ByteString (ByteString,hGetNonBlocking)
28import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
29import Data.Conduit ( ConduitT, Void, Flush )
30#if MIN_VERSION_containers(0,5,0)
31import qualified Data.Map.Strict as Map
32import Data.Map.Strict (Map)
33#else
34import qualified Data.Map as Map
35import Data.Map (Map)
36#endif
37import Data.Monoid ( (<>) )
38import Control.Concurrent.ThreadUtil
39
40import Control.Arrow
41import Control.Concurrent.STM
42-- import Control.Concurrent.STM.TMVar
43-- import Control.Concurrent.STM.TChan
44-- import Control.Concurrent.STM.Delay
45import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException)
46import Control.Monad
47import Control.Monad.Fix
48-- import Control.Monad.STM
49-- import Control.Monad.Trans.Resource
50import Control.Monad.IO.Class (MonadIO (liftIO))
51import Data.Maybe
52import System.IO.Error (isDoesNotExistError)
53import System.IO
54 ( IOMode(..)
55 , hSetBuffering
56 , BufferMode(..)
57 , hWaitForInput
58 , hClose
59 , hIsEOF
60 , Handle
61 )
62import Network.Socket as Socket
63import Network.BSD
64 ( getProtocolNumber
65 )
66import Debug.Trace
67import Data.Time.Clock (getCurrentTime,diffUTCTime)
68-- import SockAddr ()
69-- import System.Locale (defaultTimeLocale)
70
71import qualified Data.Text as Text
72 ;import Data.Text (Text)
73import DNSCache
74import Control.Concurrent.Delay
75import Control.Concurrent.PingMachine
76import Network.StreamServer
77import Network.SocketLike hiding (sClose)
78import qualified Connection as G
79 ;import Connection (Manager (..), PeerAddress (..), Policy (..))
80import Network.Address (localhost4)
81import DPut
82import DebugTag
83
84
85type Microseconds = Int
86
87-- | This object is passed with the 'Listen' and 'Connect'
88-- instructions in order to control the behavior of the
89-- connections that are established. It is parameterized
90-- by a user-suplied type @conkey@ that is used as a lookup
91-- key for connections.
92data ConnectionParameters conkey u =
93 ConnectionParameters
94 { pingInterval :: PingInterval
95 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
96 -- event is signaled.
97 , timeout :: TimeOut
98 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
99 -- that are necessary for the connection to be considered
100 -- lost and signalling 'EOF'.
101 , makeConnKey :: (RestrictedSocket,(Local SockAddr, Remote SockAddr)) -> IO (conkey,u)
102 -- ^ This action creates a lookup key for a new connection. If 'duplex'
103 -- is 'True' and the result is already assocatied with an established
104 -- connection, then an 'EOF' will be forced before the the new
105 -- connection becomes active.
106 --
107 , duplex :: Bool
108 -- ^ If True, then the connection will be treated as a normal
109 -- two-way socket. Otherwise, a readable socket is established
110 -- with 'Listen' and a writable socket is established with
111 -- 'Connect' and they are associated when 'makeConnKey' yields
112 -- same value for each.
113 }
114
115-- | Use this function to select appropriate default values for
116-- 'ConnectionParameters' other than 'makeConnKey'.
117--
118-- Current defaults:
119--
120-- * 'pingInterval' = 28000
121--
122-- * 'timeout' = 2000
123--
124-- * 'duplex' = True
125--
126connectionDefaults
127 :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> IO (conkey,u)) -> ConnectionParameters conkey u
128connectionDefaults f = ConnectionParameters
129 { pingInterval = 28000
130 , timeout = 2000
131 , makeConnKey = f
132 , duplex = True
133 }
134
135-- | Instructions for a 'Server' object
136--
137-- To issue a command, put it into the 'serverCommand' TMVar.
138data ServerInstruction conkey u
139 = Quit
140 -- ^ kill the server. This command is automatically issued when
141 -- the server is released.
142 | Listen SockAddr (ConnectionParameters conkey u)
143 -- ^ listen for incoming connections on the given bind address.
144 | Connect SockAddr (ConnectionParameters conkey u)
145 -- ^ connect to addresses
146 | ConnectWithEndlessRetry SockAddr
147 (ConnectionParameters conkey u)
148 Miliseconds
149 -- ^ keep retrying the connection
150 | Ignore SockAddr
151 -- ^ stop listening on specified bind address
152 | Send conkey ByteString
153 -- ^ send bytes to an established connection
154
155#ifdef TEST
156deriving instance Show conkey => Show (ServerInstruction conkey u)
157instance Show (a -> b) where show _ = "<function>"
158deriving instance Show conkey => Show (ConnectionParameters conkey u)
159#endif
160
161-- | This type specifies which which half of a half-duplex
162-- connection is of interest.
163data InOrOut = In | Out
164 deriving (Enum,Eq,Ord,Show,Read)
165
166-- | These events may be read from 'serverEvent' TChannel.
167--
168data ConnectionEvent b
169 = Connection (STM Bool) (ConduitT () b IO ()) (ConduitT (Flush b) Void IO ())
170 -- ^ A new connection was established
171 | ConnectFailure SockAddr
172 -- ^ A 'Connect' command failed.
173 | HalfConnection InOrOut
174 -- ^ Half of a half-duplex connection is avaliable.
175 | EOF
176 -- ^ A connection was terminated
177 | RequiresPing
178 -- ^ 'pingInterval' miliseconds of idle was experienced
179
180#ifdef TEST
181instance Show (IO a) where show _ = "<IO action>"
182instance Show (STM a) where show _ = "<STM action>"
183instance Eq (ByteString -> IO Bool) where (==) _ _ = True
184instance Eq (IO (Maybe ByteString)) where (==) _ _ = True
185instance Eq (STM Bool) where (==) _ _ = True
186deriving instance Show b => Show (ConnectionEvent b)
187deriving instance Eq b => Eq (ConnectionEvent b)
188#endif
189
190-- | This is the per-connection state.
191data ConnectionRecord u
192 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler
193 , cstate :: ConnectionState -- ^ used to send/receive data to the connection
194 , cdata :: u -- ^ user data, stored in the connection map for convenience
195 }
196
197-- | This object accepts commands and signals events and maintains
198-- the list of currently listening ports and established connections.
199data Server a u releaseKey b
200 = Server { serverCommand :: TMVar (ServerInstruction a u)
201 , serverEvent :: TChan ((a,u), ConnectionEvent b)
202 , serverReleaseKey :: releaseKey
203 , conmap :: TVar (Map a (ConnectionRecord u))
204 , listenmap :: TVar (Map SockAddr ServerHandle)
205 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
206 }
207
208control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
209control sv = atomically . putTMVar (serverCommand sv)
210
211type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
212
213noCleanUp :: MonadIO m => Allocate () m
214noCleanUp io _ = ( (,) () ) `liftM` liftIO io
215
216-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
217-- to ensure proper cleanup. For example,
218--
219-- > import Connection.Tcp
220-- > import Control.Monad.Trans.Resource (runResourceT)
221-- > import Control.Monad.IO.Class (liftIO)
222-- > import Control.Monad.STM (atomically)
223-- > import Control.Concurrent.STM.TMVar (putTMVar)
224-- > import Control.Concurrent.STM.TChan (readTChan)
225-- >
226-- > main = runResourceT $ do
227-- > sv <- server allocate
228-- > let params = connectionDefaults (return . snd)
229-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
230-- > let loop = do
231-- > (_,event) <- atomically $ readTChan (serverEvent sv)
232-- > case event of
233-- > Connection getPingFlag readData writeData -> do
234-- > forkIO $ do
235-- > fix $ \readLoop -> do
236-- > readData >>= mapM $ \bytes ->
237-- > putStrLn $ "got: " ++ show bytes
238-- > readLoop
239-- > case event of EOF -> return ()
240-- > _ -> loop
241-- > liftIO loop
242--
243-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp'
244-- to do without automatic cleanup and be sure to remember to write 'Quit' to
245-- the 'serverCommand' variable.
246server ::
247 -- forall (m :: * -> *) a u conkey releaseKey.
248 (Show conkey, MonadIO m, Ord conkey) =>
249 Allocate releaseKey m
250 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
251 -> m (Server conkey u releaseKey x)
252server allocate sessionConduits = do
253 (key,cmds) <- allocate (atomically newEmptyTMVar)
254 (atomically . flip putTMVar Quit)
255 server <- liftIO . atomically $ do
256 tchan <- newTChan
257 conmap <- newTVar Map.empty
258 listenmap<- newTVar Map.empty
259 retrymap <- newTVar Map.empty
260 return Server { serverCommand = cmds
261 , serverEvent = tchan
262 , serverReleaseKey = key
263 , conmap = conmap
264 , listenmap = listenmap
265 , retrymap = retrymap
266 }
267 liftIO $ do
268 forkLabeled "server" $ fix $ \loop -> do
269 instr <- atomically $ takeTMVar cmds
270 -- warn $ "instr = " <> bshow instr
271 let again = do doit server instr
272 -- warn $ "finished " <> bshow instr
273 loop
274 case instr of Quit -> closeAll server
275 _ -> again
276 return server
277 where
278 closeAll server = do
279 listening <- atomically . readTVar $ listenmap server
280 mapM_ quitListening (Map.elems listening)
281 let stopRetry (v,d) = do atomically $ writeTVar v False
282 interruptDelay d
283 retriers <- atomically $ do
284 rmap <- readTVar $ retrymap server
285 writeTVar (retrymap server) Map.empty
286 return rmap
287 mapM_ stopRetry (Map.elems retriers)
288 cons <- atomically . readTVar $ conmap server
289 atomically $ mapM_ (connClose . cstate) (Map.elems cons)
290 atomically $ mapM_ (connWait . cstate) (Map.elems cons)
291 atomically $ writeTVar (conmap server) Map.empty
292
293
294 doit server (Listen port params) = do
295
296 listening <- Map.member port
297 `fmap` atomically (readTVar $ listenmap server)
298 when (not listening) $ do
299
300 dput XMisc $ "Started listening on "++show port
301
302 sserv <- flip streamServer [port] ServerConfig
303 { serverWarn = dput XMisc
304 , serverSession = \sock _ h -> do
305 (conkey,u) <- makeConnKey params sock
306 _ <- newConnection server sessionConduits params conkey u h In
307 return ()
308 }
309
310 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
311
312 doit server (Ignore port) = do
313 dput XMisc $ "Stopping listen on "++show port
314 mb <- atomically $ do
315 map <- readTVar $ listenmap server
316 modifyTVar' (listenmap server) $ Map.delete port
317 return $ Map.lookup port map
318 maybe (return ()) quitListening $ mb
319
320 doit server (Send con bs) = do -- . void . forkIO $ do
321 map <- atomically $ readTVar (conmap server)
322 let post False = (trace ("cant send: "++show bs) $ return ())
323 post True = return ()
324 maybe (post False)
325 (post <=< flip connWrite bs . cstate)
326 $ Map.lookup con map
327
328 doit server (Connect addr params) = join $ atomically $ do
329 Map.lookup addr <$> readTVar (retrymap server)
330 >>= return . \case
331 Nothing -> forkit
332 Just (v,d) -> do b <- atomically $ readTVar v
333 interruptDelay d
334 when (not b) forkit
335 where
336 forkit = void . forkLabeled ( "Connect." ++ show addr ) $ do
337 proto <- getProtocolNumber "tcp"
338 sock <- socket (socketFamily addr) Stream proto
339 handle (\e -> do -- let t = ioeGetErrorType e
340 when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
341 -- warn $ "connect-error: " <> bshow e
342 (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) -- XXX: ?
343 Socket.close sock
344 atomically
345 $ writeTChan (serverEvent server)
346 $ ((conkey,u),ConnectFailure addr))
347 $ do
348 connect sock addr
349 laddr <- Socket.getSocketName sock
350 (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr))
351 h <- socketToHandle sock ReadWriteMode
352 newConnection server sessionConduits params conkey u h Out
353 return ()
354
355 doit server (ConnectWithEndlessRetry addr params interval) = do
356 proto <- getProtocolNumber "tcp"
357 void . forkLabeled ("ConnectWithEndlessRetry." ++ show addr) $ do
358 timer <- interruptibleDelay
359 (retryVar,action) <- atomically $ do
360 map <- readTVar (retrymap server)
361 action <- case Map.lookup addr map of
362 Nothing -> return $ return ()
363 Just (v,d) -> do writeTVar v False
364 return $ interruptDelay d
365 v <- newTVar True
366 writeTVar (retrymap server) $! Map.insert addr (v,timer) map
367 return (v,action :: IO ())
368 action
369 fix $ \retryLoop -> do
370 utc <- getCurrentTime
371 shouldRetry <- do
372 handle (\(SomeException e) -> do
373 -- Exceptions thrown by 'socket' need to be handled specially
374 -- since we don't have enough information to broadcast a ConnectFailure
375 -- on serverEvent.
376 warn $ "Failed to create socket: " <> bshow e
377 atomically $ readTVar retryVar) $ do
378 sock <- socket (socketFamily addr) Stream proto
379 handle (\(SomeException e) -> do
380 -- Any thing else goes wrong and we broadcast ConnectFailure.
381 do (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr))
382 Socket.close sock
383 atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr)
384 `onException` return ()
385 atomically $ readTVar retryVar) $ do
386 connect sock addr
387 laddr <- Socket.getSocketName sock
388 (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr))
389 h <- socketToHandle sock ReadWriteMode
390 threads <- newConnection server sessionConduits params conkey u h Out
391 atomically $ do threadsWait threads
392 readTVar retryVar
393 fin_utc <- getCurrentTime
394 when shouldRetry $ do
395 let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
396 expected = fromIntegral interval
397 when (shouldRetry && elapsed < expected) $ do
398 debugNoise $ "Waiting to retry " <> bshow addr
399 void $ startDelay timer (round $ 1000 * (expected-elapsed))
400 debugNoise $ "retry " <> bshow (shouldRetry,addr)
401 when shouldRetry $ retryLoop
402
403
404-- INTERNAL ----------------------------------------------------------
405
406{-
407hWriteUntilNothing h outs =
408 fix $ \loop -> do
409 mb <- atomically $ takeTMVar outs
410 case mb of Just bs -> do S.hPutStrLn h bs
411 warn $ "wrote " <> bs
412 loop
413 Nothing -> do warn $ "wrote Nothing"
414 hClose h
415
416-}
417connRead :: ConnectionState -> IO (Maybe ByteString)
418connRead (WriteOnlyConnection w) = do
419 -- atomically $ discardContents (threadsChannel w)
420 return Nothing
421connRead conn = do
422 c <- atomically $ getThreads
423 threadsRead c
424 where
425 getThreads =
426 case conn of SaneConnection c -> return c
427 ReadOnlyConnection c -> return c
428 ConnectionPair c w -> do
429 -- discardContents (threadsChannel w)
430 return c
431
432socketFamily :: SockAddr -> Family
433socketFamily (SockAddrInet _ _) = AF_INET
434socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
435socketFamily (SockAddrUnix _) = AF_UNIX
436
437
438conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
439 -> ConnectionState
440 -> ConnectionEvent x
441conevent sessionConduits con = Connection pingflag read write
442 where
443 pingflag = swapTVar (pingFlag (connPingTimer con)) False
444 (read,write) = sessionConduits (connRead con) (connWrite con)
445
446newConnection :: Ord a
447 => Server a u1 releaseKey x
448 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) )
449 -> ConnectionParameters conkey u
450 -> a
451 -> u1
452 -> Handle
453 -> InOrOut
454 -> IO ConnectionThreads
455newConnection server sessionConduits params conkey u h inout = do
456 hSetBuffering h NoBuffering
457 let (idle_ms,timeout_ms) =
458 case (inout,duplex params) of
459 (Out,False) -> ( 0, 0 )
460 _ -> ( pingInterval params
461 , timeout params )
462
463 new <- do pinglogic <- forkPingMachine "newConnection" idle_ms timeout_ms
464 connectionThreads h pinglogic
465 started <- atomically $ newEmptyTMVar
466 kontvar <- atomically newEmptyTMVar
467 -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ?
468 let _ = kontvar :: TMVar (STM (IO ()))
469 forkLabeled ("connecting...") $ do
470 getkont <- atomically $ takeTMVar kontvar
471 kont <- atomically getkont
472 kont
473
474 atomically $ do
475 current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
476 case current of
477 Nothing -> do
478 (newCon,e) <- return $
479 if duplex params
480 then let newcon = SaneConnection new
481 in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
482 else ( case inout of
483 In -> ReadOnlyConnection new
484 Out -> WriteOnlyConnection new
485 , ((conkey,u), HalfConnection inout) )
486 modifyTVar' (conmap server) $ Map.insert conkey
487 ConnectionRecord { ckont = kontvar
488 , cstate = newCon
489 , cdata = u }
490 announce e
491 putTMVar kontvar $ return $ do
492 myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice.
493 atomically $ putTMVar started ()
494 -- Wait for something interesting.
495 handleEOF conkey u kontvar newCon
496 Just what@ConnectionRecord { ckont =mvar }-> do
497 putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread.
498 putTMVar mvar $ do
499 -- The action returned by updateConMap, eventually invokes handleEOF,
500 -- so the sequencer thread will not be terminated.
501 kont <- updateConMap conkey u new what
502 putTMVar started ()
503 return kont
504 return new
505 where
506
507 announce e = writeTChan (serverEvent server) e
508
509 -- This function loops and will not quit unless an action is posted to the
510 -- mvar that does not in turn invoke this function, or if an EOF occurs.
511 handleEOF conkey u mvar newCon = do
512 action <- atomically . foldr1 orElse $
513 [ takeTMVar mvar >>= id -- passed continuation
514 , connWait newCon >> return eof
515 , connWaitPing newCon >>= return . sendPing
516 -- , pingWait pingTimer >>= return . sendPing
517 ]
518 action :: IO ()
519 where
520 eof = do
521 -- warn $ "EOF " <>bshow conkey
522 connCancelPing newCon
523 atomically $ do connFlush newCon
524 announce ((conkey,u),EOF)
525 modifyTVar' (conmap server)
526 $ Map.delete conkey
527 -- warn $ "fin-EOF "<>bshow conkey
528
529 sendPing PingTimeOut = do
530 {-
531 utc <- getCurrentTime
532 let utc' = formatTime defaultTimeLocale "%s" utc
533 warn $ "ping:TIMEOUT " <> bshow utc'
534 -}
535 atomically (connClose newCon)
536 eof
537
538 sendPing PingIdle = do
539 {-
540 utc <- getCurrentTime
541 let utc' = formatTime defaultTimeLocale "%s" utc
542 -- warn $ "ping:IDLE " <> bshow utc'
543 -}
544 atomically $ announce ((conkey,u),RequiresPing)
545 handleEOF conkey u mvar newCon
546
547
548 updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do
549 new' <-
550 if duplex params then do
551 announce ((conkey,u),EOF)
552 connClose replaced
553 let newcon = SaneConnection new
554 announce $ ((conkey,u),conevent sessionConduits newcon)
555 return $ newcon
556 else
557 case replaced of
558 WriteOnlyConnection w | inout==In ->
559 do let newcon = ConnectionPair new w
560 announce ((conkey,u),conevent sessionConduits newcon)
561 return newcon
562 ReadOnlyConnection r | inout==Out ->
563 do let newcon = ConnectionPair r new
564 announce ((conkey,u),conevent sessionConduits newcon)
565 return newcon
566 _ -> do -- connFlush todo
567 announce ((conkey,u0), EOF)
568 connClose replaced
569 announce ((conkey,u), HalfConnection inout)
570 return $ case inout of
571 In -> ReadOnlyConnection new
572 Out -> WriteOnlyConnection new
573 modifyTVar' (conmap server) $ Map.insert conkey
574 ConnectionRecord { ckont = mvar
575 , cstate = new'
576 , cdata = u }
577 return $ handleEOF conkey u mvar new'
578
579
580getPacket :: Handle -> IO ByteString
581getPacket h = do hWaitForInput h (-1)
582 hGetNonBlocking h 1024
583
584
585
586-- | 'ConnectionThreads' is an interface to a pair of threads
587-- that are reading and writing a 'Handle'.
588data ConnectionThreads = ConnectionThreads
589 { threadsWriter :: TMVar (Maybe ByteString)
590 , threadsChannel :: TChan ByteString
591 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
592 , threadsPing :: PingMachine
593 }
594
595-- | This spawns the reader and writer threads and returns a newly
596-- constructed 'ConnectionThreads' object.
597connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
598connectionThreads h pinglogic = do
599
600 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
601
602 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
603 readerThread <- forkLabeled "readerThread" $ do
604 let finished e = do
605 hClose h
606 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
607 -- let _ = fmap ioeGetErrorType e -- type hint
608 let _ = fmap what e where what (SomeException _) = undefined
609 atomically $ do tryTakeTMVar outs
610 putTMVar outs Nothing -- quit writer
611 putTMVar doner ()
612 handle (finished . Just) $ do
613 pingBump pinglogic -- start the ping timer
614 fix $ \loop -> do
615 packet <- getPacket h
616 -- warn $ "read: " <> S.take 60 packet
617 atomically $ writeTChan incomming packet
618 pingBump pinglogic
619 -- warn $ "bumped: " <> S.take 60 packet
620 isEof <- hIsEOF h
621 if isEof then finished Nothing else loop
622
623 writerThread <- forkLabeled "writerThread" . fix $ \loop -> do
624 let finished = do -- warn $ "finished write"
625 -- hClose h -- quit reader
626 throwTo readerThread (ErrorCall "EOF")
627 atomically $ putTMVar donew ()
628 mb <- atomically $ readTMVar outs
629 case mb of Just bs -> handle (\(SomeException e)->finished)
630 (do -- warn $ "writing: " <> S.take 60 bs
631 S.hPutStr h bs
632 -- warn $ "wrote: " <> S.take 60 bs
633 atomically $ takeTMVar outs
634 loop)
635 Nothing -> finished
636
637 let wait = do readTMVar donew
638 readTMVar doner
639 return ()
640 return ConnectionThreads { threadsWriter = outs
641 , threadsChannel = incomming
642 , threadsWait = wait
643 , threadsPing = pinglogic }
644
645
646-- | 'threadsWrite' writes the given 'ByteString' to the
647-- 'ConnectionThreads' object. It blocks until the ByteString
648-- is written and 'True' is returned, or the connection is
649-- interrupted and 'False' is returned.
650threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
651threadsWrite c bs = atomically $
652 orElse (const False `fmap` threadsWait c)
653 (const True `fmap` putTMVar (threadsWriter c) (Just bs))
654
655-- | 'threadsClose' signals for the 'ConnectionThreads' object
656-- to quit and close the associated 'Handle'. This operation
657-- is non-blocking, follow it with 'threadsWait' if you want
658-- to wait for the operation to complete.
659threadsClose :: ConnectionThreads -> STM ()
660threadsClose c = do
661 let mvar = threadsWriter c
662 v <- tryReadTMVar mvar
663 case v of
664 Just Nothing -> return () -- already closed
665 _ -> putTMVar mvar Nothing
666
667-- | 'threadsRead' blocks until a 'ByteString' is available which
668-- is returned to the caller, or the connection is interrupted and
669-- 'Nothing' is returned.
670threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
671threadsRead c = atomically $
672 orElse (const Nothing `fmap` threadsWait c)
673 (Just `fmap` readTChan (threadsChannel c))
674
675-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
676-- or to a pair of 'ConnectionThreads' objects that are considered as one
677-- connection.
678data ConnectionState =
679 SaneConnection ConnectionThreads
680 -- ^ ordinary read/write connection
681 | WriteOnlyConnection ConnectionThreads
682 | ReadOnlyConnection ConnectionThreads
683 | ConnectionPair ConnectionThreads ConnectionThreads
684 -- ^ Two 'ConnectionThreads' objects, read operations use the
685 -- first, write operations use the second.
686
687
688
689connWrite :: ConnectionState -> ByteString -> IO Bool
690connWrite (ReadOnlyConnection _) bs = return False
691connWrite conn bs = threadsWrite c bs
692 where
693 c = case conn of SaneConnection c -> c
694 WriteOnlyConnection c -> c
695 ConnectionPair _ c -> c
696
697
698mapConn :: Bool ->
699 (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
700mapConn both action c =
701 case c of
702 SaneConnection rw -> action rw
703 ReadOnlyConnection r -> action r
704 WriteOnlyConnection w -> action w
705 ConnectionPair r w -> do
706 rem <- orElse (const w `fmap` action r)
707 (const r `fmap` action w)
708 when both $ action rem
709
710connClose :: ConnectionState -> STM ()
711connClose c = mapConn True threadsClose c
712
713connWait :: ConnectionState -> STM ()
714connWait c = doit -- mapConn False threadsWait c
715 where
716 action = threadsWait
717 doit =
718 case c of
719 SaneConnection rw -> action rw
720 ReadOnlyConnection r -> action r
721 WriteOnlyConnection w -> action w
722 ConnectionPair r w -> do
723 rem <- orElse (const w `fmap` action r)
724 (const r `fmap` action w)
725 threadsClose rem
726
727connPingTimer :: ConnectionState -> PingMachine
728connPingTimer c =
729 case c of
730 SaneConnection rw -> threadsPing rw
731 ReadOnlyConnection r -> threadsPing r
732 WriteOnlyConnection w -> threadsPing w -- should be disabled.
733 ConnectionPair r w -> threadsPing r
734
735connCancelPing :: ConnectionState -> IO ()
736connCancelPing c = pingCancel (connPingTimer c)
737
738connWaitPing :: ConnectionState -> STM PingEvent
739connWaitPing c = pingWait (connPingTimer c)
740
741connFlush :: ConnectionState -> STM ()
742connFlush c =
743 case c of
744 SaneConnection rw -> waitChan rw
745 ReadOnlyConnection r -> waitChan r
746 WriteOnlyConnection w -> return ()
747 ConnectionPair r w -> waitChan r
748 where
749 waitChan t = do
750 b <- isEmptyTChan (threadsChannel t)
751 when (not b) retry
752
753bshow :: Show a => a -> ByteString
754bshow e = S.pack . show $ e
755
756warn :: ByteString -> IO ()
757warn str =dputB XMisc str
758
759debugNoise :: Monad m => t -> m ()
760debugNoise str = return ()
761
762data TCPStatus = Resolving | AwaitingRead | AwaitingWrite
763
764-- SockAddr -> (SockAddr, ConnectionParameters SockAddr ConnectionData, Miliseconds)
765
766
767tcpManager :: (PeerAddress -> (SockAddr, ConnectionParameters PeerAddress u, Miliseconds))
768 -- -> (String -> Maybe Text)
769 -- -> (Text -> IO (Maybe PeerAddress))
770 -> Server PeerAddress u releaseKey x
771 -> IO (Manager TCPStatus Text)
772tcpManager grokKey sv = do
773 rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey)
774 nullping <- forkPingMachine "tcpManager" 0 0
775 (rslv,rev) <- do
776 dns <- newDNSCache
777 let rslv k = map PeerAddress <$> forwardResolve dns k
778 rev (PeerAddress addr) = reverseResolve dns addr
779 return (rslv,rev)
780 return Manager {
781 setPolicy = \k -> \case
782 TryingToConnect -> join $ atomically $ do
783 r <- readTVar rmap
784 case Map.lookup k r of
785 Just {} -> return $ return () -- Connection already in progress.
786 Nothing -> do
787 modifyTVar' rmap $ Map.insert k Nothing
788 return $ void $ forkLabeled ("resolve."++show k) $ do
789 mconkey <- listToMaybe <$> rslv k
790 case mconkey of
791 Nothing -> atomically $ modifyTVar' rmap $ Map.delete k
792 Just conkey -> do
793 control sv $ case grokKey conkey of
794 (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms
795 OpenToConnect -> dput XMisc "TODO: TCP OpenToConnect"
796 RefusingToConnect -> dput XMisc "TODO: TCP RefusingToConnect"
797 , status = \k -> do
798 c <- readTVar (conmap sv)
799 ck <- Map.lookup k <$> readTVar rmap
800 return $ exportConnection c (join ck)
801 , connections = Map.keys <$> readTVar rmap
802 , stringToKey = Just . Text.pack
803 , showProgress = \case
804 Resolving -> "resolving"
805 AwaitingRead -> "awaiting inbound"
806 AwaitingWrite -> "awaiting outbound"
807 , showKey = show
808 , resolvePeer = rslv
809 , reverseAddress = rev
810 }
811
812exportConnection :: Ord conkey => Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus
813exportConnection conmap mkey = G.Connection
814 { G.connStatus = case mkey of
815 Nothing -> G.Dormant
816 Just conkey -> case Map.lookup conkey conmap of
817 Nothing -> G.InProgress Resolving
818 Just (ConnectionRecord ckont cstate cdata) -> case cstate of
819 SaneConnection {} -> G.Established
820 ConnectionPair {} -> G.Established
821 ReadOnlyConnection {} -> G.InProgress AwaitingWrite
822 WriteOnlyConnection {} -> G.InProgress AwaitingRead
823 , G.connPolicy = TryingToConnect
824 }