diff options
Diffstat (limited to 'dht/Connection/Tcp.hs')
-rw-r--r-- | dht/Connection/Tcp.hs | 824 |
1 files changed, 0 insertions, 824 deletions
diff --git a/dht/Connection/Tcp.hs b/dht/Connection/Tcp.hs deleted file mode 100644 index 4d50d47f..00000000 --- a/dht/Connection/Tcp.hs +++ /dev/null | |||
@@ -1,824 +0,0 @@ | |||
1 | {-# OPTIONS_HADDOCK prune #-} | ||
2 | {-# LANGUAGE CPP #-} | ||
3 | {-# LANGUAGE DoAndIfThenElse #-} | ||
4 | {-# LANGUAGE FlexibleInstances #-} | ||
5 | {-# LANGUAGE OverloadedStrings #-} | ||
6 | {-# LANGUAGE RankNTypes #-} | ||
7 | {-# LANGUAGE StandaloneDeriving #-} | ||
8 | {-# LANGUAGE TupleSections #-} | ||
9 | {-# LANGUAGE LambdaCase #-} | ||
10 | ----------------------------------------------------------------------------- | ||
11 | -- | | ||
12 | -- Module : Connection.Tcp | ||
13 | -- | ||
14 | -- Maintainer : joe@jerkface.net | ||
15 | -- Stability : experimental | ||
16 | -- | ||
17 | -- A TCP client/server library. | ||
18 | -- | ||
19 | -- TODO: | ||
20 | -- | ||
21 | -- * interface tweaks | ||
22 | -- | ||
23 | module Connection.Tcp | ||
24 | ( module Connection.Tcp | ||
25 | , module Control.Concurrent.PingMachine ) where | ||
26 | |||
27 | import Data.ByteString (ByteString,hGetNonBlocking) | ||
28 | import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) | ||
29 | import Data.Conduit ( ConduitT, Void, Flush ) | ||
30 | #if MIN_VERSION_containers(0,5,0) | ||
31 | import qualified Data.Map.Strict as Map | ||
32 | import Data.Map.Strict (Map) | ||
33 | #else | ||
34 | import qualified Data.Map as Map | ||
35 | import Data.Map (Map) | ||
36 | #endif | ||
37 | import Data.Monoid ( (<>) ) | ||
38 | import Control.Concurrent.ThreadUtil | ||
39 | |||
40 | import Control.Arrow | ||
41 | import Control.Concurrent.STM | ||
42 | -- import Control.Concurrent.STM.TMVar | ||
43 | -- import Control.Concurrent.STM.TChan | ||
44 | -- import Control.Concurrent.STM.Delay | ||
45 | import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException) | ||
46 | import Control.Monad | ||
47 | import Control.Monad.Fix | ||
48 | -- import Control.Monad.STM | ||
49 | -- import Control.Monad.Trans.Resource | ||
50 | import Control.Monad.IO.Class (MonadIO (liftIO)) | ||
51 | import Data.Maybe | ||
52 | import System.IO.Error (isDoesNotExistError) | ||
53 | import System.IO | ||
54 | ( IOMode(..) | ||
55 | , hSetBuffering | ||
56 | , BufferMode(..) | ||
57 | , hWaitForInput | ||
58 | , hClose | ||
59 | , hIsEOF | ||
60 | , Handle | ||
61 | ) | ||
62 | import Network.Socket as Socket | ||
63 | import Network.BSD | ||
64 | ( getProtocolNumber | ||
65 | ) | ||
66 | import Debug.Trace | ||
67 | import Data.Time.Clock (getCurrentTime,diffUTCTime) | ||
68 | -- import SockAddr () | ||
69 | -- import System.Locale (defaultTimeLocale) | ||
70 | |||
71 | import qualified Data.Text as Text | ||
72 | ;import Data.Text (Text) | ||
73 | import DNSCache | ||
74 | import Control.Concurrent.Delay | ||
75 | import Control.Concurrent.PingMachine | ||
76 | import Network.StreamServer | ||
77 | import Network.SocketLike hiding (sClose) | ||
78 | import qualified Connection as G | ||
79 | ;import Connection (Manager (..), PeerAddress (..), Policy (..)) | ||
80 | import Network.Address (localhost4) | ||
81 | import DPut | ||
82 | import DebugTag | ||
83 | |||
84 | |||
85 | type Microseconds = Int | ||
86 | |||
87 | -- | This object is passed with the 'Listen' and 'Connect' | ||
88 | -- instructions in order to control the behavior of the | ||
89 | -- connections that are established. It is parameterized | ||
90 | -- by a user-suplied type @conkey@ that is used as a lookup | ||
91 | -- key for connections. | ||
92 | data ConnectionParameters conkey u = | ||
93 | ConnectionParameters | ||
94 | { pingInterval :: PingInterval | ||
95 | -- ^ The miliseconds of idle to allow before a 'RequiresPing' | ||
96 | -- event is signaled. | ||
97 | , timeout :: TimeOut | ||
98 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled | ||
99 | -- that are necessary for the connection to be considered | ||
100 | -- lost and signalling 'EOF'. | ||
101 | , makeConnKey :: (RestrictedSocket,(Local SockAddr, Remote SockAddr)) -> IO (conkey,u) | ||
102 | -- ^ This action creates a lookup key for a new connection. If 'duplex' | ||
103 | -- is 'True' and the result is already assocatied with an established | ||
104 | -- connection, then an 'EOF' will be forced before the the new | ||
105 | -- connection becomes active. | ||
106 | -- | ||
107 | , duplex :: Bool | ||
108 | -- ^ If True, then the connection will be treated as a normal | ||
109 | -- two-way socket. Otherwise, a readable socket is established | ||
110 | -- with 'Listen' and a writable socket is established with | ||
111 | -- 'Connect' and they are associated when 'makeConnKey' yields | ||
112 | -- same value for each. | ||
113 | } | ||
114 | |||
115 | -- | Use this function to select appropriate default values for | ||
116 | -- 'ConnectionParameters' other than 'makeConnKey'. | ||
117 | -- | ||
118 | -- Current defaults: | ||
119 | -- | ||
120 | -- * 'pingInterval' = 28000 | ||
121 | -- | ||
122 | -- * 'timeout' = 2000 | ||
123 | -- | ||
124 | -- * 'duplex' = True | ||
125 | -- | ||
126 | connectionDefaults | ||
127 | :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> IO (conkey,u)) -> ConnectionParameters conkey u | ||
128 | connectionDefaults f = ConnectionParameters | ||
129 | { pingInterval = 28000 | ||
130 | , timeout = 2000 | ||
131 | , makeConnKey = f | ||
132 | , duplex = True | ||
133 | } | ||
134 | |||
135 | -- | Instructions for a 'Server' object | ||
136 | -- | ||
137 | -- To issue a command, put it into the 'serverCommand' TMVar. | ||
138 | data ServerInstruction conkey u | ||
139 | = Quit | ||
140 | -- ^ kill the server. This command is automatically issued when | ||
141 | -- the server is released. | ||
142 | | Listen SockAddr (ConnectionParameters conkey u) | ||
143 | -- ^ listen for incoming connections on the given bind address. | ||
144 | | Connect SockAddr (ConnectionParameters conkey u) | ||
145 | -- ^ connect to addresses | ||
146 | | ConnectWithEndlessRetry SockAddr | ||
147 | (ConnectionParameters conkey u) | ||
148 | Miliseconds | ||
149 | -- ^ keep retrying the connection | ||
150 | | Ignore SockAddr | ||
151 | -- ^ stop listening on specified bind address | ||
152 | | Send conkey ByteString | ||
153 | -- ^ send bytes to an established connection | ||
154 | |||
155 | #ifdef TEST | ||
156 | deriving instance Show conkey => Show (ServerInstruction conkey u) | ||
157 | instance Show (a -> b) where show _ = "<function>" | ||
158 | deriving instance Show conkey => Show (ConnectionParameters conkey u) | ||
159 | #endif | ||
160 | |||
161 | -- | This type specifies which which half of a half-duplex | ||
162 | -- connection is of interest. | ||
163 | data InOrOut = In | Out | ||
164 | deriving (Enum,Eq,Ord,Show,Read) | ||
165 | |||
166 | -- | These events may be read from 'serverEvent' TChannel. | ||
167 | -- | ||
168 | data ConnectionEvent b | ||
169 | = Connection (STM Bool) (ConduitT () b IO ()) (ConduitT (Flush b) Void IO ()) | ||
170 | -- ^ A new connection was established | ||
171 | | ConnectFailure SockAddr | ||
172 | -- ^ A 'Connect' command failed. | ||
173 | | HalfConnection InOrOut | ||
174 | -- ^ Half of a half-duplex connection is avaliable. | ||
175 | | EOF | ||
176 | -- ^ A connection was terminated | ||
177 | | RequiresPing | ||
178 | -- ^ 'pingInterval' miliseconds of idle was experienced | ||
179 | |||
180 | #ifdef TEST | ||
181 | instance Show (IO a) where show _ = "<IO action>" | ||
182 | instance Show (STM a) where show _ = "<STM action>" | ||
183 | instance Eq (ByteString -> IO Bool) where (==) _ _ = True | ||
184 | instance Eq (IO (Maybe ByteString)) where (==) _ _ = True | ||
185 | instance Eq (STM Bool) where (==) _ _ = True | ||
186 | deriving instance Show b => Show (ConnectionEvent b) | ||
187 | deriving instance Eq b => Eq (ConnectionEvent b) | ||
188 | #endif | ||
189 | |||
190 | -- | This is the per-connection state. | ||
191 | data ConnectionRecord u | ||
192 | = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler | ||
193 | , cstate :: ConnectionState -- ^ used to send/receive data to the connection | ||
194 | , cdata :: u -- ^ user data, stored in the connection map for convenience | ||
195 | } | ||
196 | |||
197 | -- | This object accepts commands and signals events and maintains | ||
198 | -- the list of currently listening ports and established connections. | ||
199 | data Server a u releaseKey b | ||
200 | = Server { serverCommand :: TMVar (ServerInstruction a u) | ||
201 | , serverEvent :: TChan ((a,u), ConnectionEvent b) | ||
202 | , serverReleaseKey :: releaseKey | ||
203 | , conmap :: TVar (Map a (ConnectionRecord u)) | ||
204 | , listenmap :: TVar (Map SockAddr ServerHandle) | ||
205 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) | ||
206 | } | ||
207 | |||
208 | control :: Server a u releaseKey b -> ServerInstruction a u -> IO () | ||
209 | control sv = atomically . putTMVar (serverCommand sv) | ||
210 | |||
211 | type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) | ||
212 | |||
213 | noCleanUp :: MonadIO m => Allocate () m | ||
214 | noCleanUp io _ = ( (,) () ) `liftM` liftIO io | ||
215 | |||
216 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | ||
217 | -- to ensure proper cleanup. For example, | ||
218 | -- | ||
219 | -- > import Connection.Tcp | ||
220 | -- > import Control.Monad.Trans.Resource (runResourceT) | ||
221 | -- > import Control.Monad.IO.Class (liftIO) | ||
222 | -- > import Control.Monad.STM (atomically) | ||
223 | -- > import Control.Concurrent.STM.TMVar (putTMVar) | ||
224 | -- > import Control.Concurrent.STM.TChan (readTChan) | ||
225 | -- > | ||
226 | -- > main = runResourceT $ do | ||
227 | -- > sv <- server allocate | ||
228 | -- > let params = connectionDefaults (return . snd) | ||
229 | -- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) | ||
230 | -- > let loop = do | ||
231 | -- > (_,event) <- atomically $ readTChan (serverEvent sv) | ||
232 | -- > case event of | ||
233 | -- > Connection getPingFlag readData writeData -> do | ||
234 | -- > forkIO $ do | ||
235 | -- > fix $ \readLoop -> do | ||
236 | -- > readData >>= mapM $ \bytes -> | ||
237 | -- > putStrLn $ "got: " ++ show bytes | ||
238 | -- > readLoop | ||
239 | -- > case event of EOF -> return () | ||
240 | -- > _ -> loop | ||
241 | -- > liftIO loop | ||
242 | -- | ||
243 | -- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp' | ||
244 | -- to do without automatic cleanup and be sure to remember to write 'Quit' to | ||
245 | -- the 'serverCommand' variable. | ||
246 | server :: | ||
247 | -- forall (m :: * -> *) a u conkey releaseKey. | ||
248 | (Show conkey, MonadIO m, Ord conkey) => | ||
249 | Allocate releaseKey m | ||
250 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) | ||
251 | -> m (Server conkey u releaseKey x) | ||
252 | server allocate sessionConduits = do | ||
253 | (key,cmds) <- allocate (atomically newEmptyTMVar) | ||
254 | (atomically . flip putTMVar Quit) | ||
255 | server <- liftIO . atomically $ do | ||
256 | tchan <- newTChan | ||
257 | conmap <- newTVar Map.empty | ||
258 | listenmap<- newTVar Map.empty | ||
259 | retrymap <- newTVar Map.empty | ||
260 | return Server { serverCommand = cmds | ||
261 | , serverEvent = tchan | ||
262 | , serverReleaseKey = key | ||
263 | , conmap = conmap | ||
264 | , listenmap = listenmap | ||
265 | , retrymap = retrymap | ||
266 | } | ||
267 | liftIO $ do | ||
268 | forkLabeled "server" $ fix $ \loop -> do | ||
269 | instr <- atomically $ takeTMVar cmds | ||
270 | -- warn $ "instr = " <> bshow instr | ||
271 | let again = do doit server instr | ||
272 | -- warn $ "finished " <> bshow instr | ||
273 | loop | ||
274 | case instr of Quit -> closeAll server | ||
275 | _ -> again | ||
276 | return server | ||
277 | where | ||
278 | closeAll server = do | ||
279 | listening <- atomically . readTVar $ listenmap server | ||
280 | mapM_ quitListening (Map.elems listening) | ||
281 | let stopRetry (v,d) = do atomically $ writeTVar v False | ||
282 | interruptDelay d | ||
283 | retriers <- atomically $ do | ||
284 | rmap <- readTVar $ retrymap server | ||
285 | writeTVar (retrymap server) Map.empty | ||
286 | return rmap | ||
287 | mapM_ stopRetry (Map.elems retriers) | ||
288 | cons <- atomically . readTVar $ conmap server | ||
289 | atomically $ mapM_ (connClose . cstate) (Map.elems cons) | ||
290 | atomically $ mapM_ (connWait . cstate) (Map.elems cons) | ||
291 | atomically $ writeTVar (conmap server) Map.empty | ||
292 | |||
293 | |||
294 | doit server (Listen port params) = do | ||
295 | |||
296 | listening <- Map.member port | ||
297 | `fmap` atomically (readTVar $ listenmap server) | ||
298 | when (not listening) $ do | ||
299 | |||
300 | dput XMisc $ "Started listening on "++show port | ||
301 | |||
302 | sserv <- flip streamServer [port] ServerConfig | ||
303 | { serverWarn = dput XMisc | ||
304 | , serverSession = \sock _ h -> do | ||
305 | (conkey,u) <- makeConnKey params sock | ||
306 | _ <- newConnection server sessionConduits params conkey u h In | ||
307 | return () | ||
308 | } | ||
309 | |||
310 | atomically $ listenmap server `modifyTVar'` Map.insert port sserv | ||
311 | |||
312 | doit server (Ignore port) = do | ||
313 | dput XMisc $ "Stopping listen on "++show port | ||
314 | mb <- atomically $ do | ||
315 | map <- readTVar $ listenmap server | ||
316 | modifyTVar' (listenmap server) $ Map.delete port | ||
317 | return $ Map.lookup port map | ||
318 | maybe (return ()) quitListening $ mb | ||
319 | |||
320 | doit server (Send con bs) = do -- . void . forkIO $ do | ||
321 | map <- atomically $ readTVar (conmap server) | ||
322 | let post False = (trace ("cant send: "++show bs) $ return ()) | ||
323 | post True = return () | ||
324 | maybe (post False) | ||
325 | (post <=< flip connWrite bs . cstate) | ||
326 | $ Map.lookup con map | ||
327 | |||
328 | doit server (Connect addr params) = join $ atomically $ do | ||
329 | Map.lookup addr <$> readTVar (retrymap server) | ||
330 | >>= return . \case | ||
331 | Nothing -> forkit | ||
332 | Just (v,d) -> do b <- atomically $ readTVar v | ||
333 | interruptDelay d | ||
334 | when (not b) forkit | ||
335 | where | ||
336 | forkit = void . forkLabeled ( "Connect." ++ show addr ) $ do | ||
337 | proto <- getProtocolNumber "tcp" | ||
338 | sock <- socket (socketFamily addr) Stream proto | ||
339 | handle (\e -> do -- let t = ioeGetErrorType e | ||
340 | when (isDoesNotExistError e) $ return () -- warn "GOTCHA" | ||
341 | -- warn $ "connect-error: " <> bshow e | ||
342 | (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) -- XXX: ? | ||
343 | Socket.close sock | ||
344 | atomically | ||
345 | $ writeTChan (serverEvent server) | ||
346 | $ ((conkey,u),ConnectFailure addr)) | ||
347 | $ do | ||
348 | connect sock addr | ||
349 | laddr <- Socket.getSocketName sock | ||
350 | (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) | ||
351 | h <- socketToHandle sock ReadWriteMode | ||
352 | newConnection server sessionConduits params conkey u h Out | ||
353 | return () | ||
354 | |||
355 | doit server (ConnectWithEndlessRetry addr params interval) = do | ||
356 | proto <- getProtocolNumber "tcp" | ||
357 | void . forkLabeled ("ConnectWithEndlessRetry." ++ show addr) $ do | ||
358 | timer <- interruptibleDelay | ||
359 | (retryVar,action) <- atomically $ do | ||
360 | map <- readTVar (retrymap server) | ||
361 | action <- case Map.lookup addr map of | ||
362 | Nothing -> return $ return () | ||
363 | Just (v,d) -> do writeTVar v False | ||
364 | return $ interruptDelay d | ||
365 | v <- newTVar True | ||
366 | writeTVar (retrymap server) $! Map.insert addr (v,timer) map | ||
367 | return (v,action :: IO ()) | ||
368 | action | ||
369 | fix $ \retryLoop -> do | ||
370 | utc <- getCurrentTime | ||
371 | shouldRetry <- do | ||
372 | handle (\(SomeException e) -> do | ||
373 | -- Exceptions thrown by 'socket' need to be handled specially | ||
374 | -- since we don't have enough information to broadcast a ConnectFailure | ||
375 | -- on serverEvent. | ||
376 | warn $ "Failed to create socket: " <> bshow e | ||
377 | atomically $ readTVar retryVar) $ do | ||
378 | sock <- socket (socketFamily addr) Stream proto | ||
379 | handle (\(SomeException e) -> do | ||
380 | -- Any thing else goes wrong and we broadcast ConnectFailure. | ||
381 | do (conkey,u) <- makeConnKey params (restrictSocket sock,(Local localhost4, Remote addr)) | ||
382 | Socket.close sock | ||
383 | atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr) | ||
384 | `onException` return () | ||
385 | atomically $ readTVar retryVar) $ do | ||
386 | connect sock addr | ||
387 | laddr <- Socket.getSocketName sock | ||
388 | (conkey,u) <- makeConnKey params (restrictSocket sock, (Local laddr, Remote addr)) | ||
389 | h <- socketToHandle sock ReadWriteMode | ||
390 | threads <- newConnection server sessionConduits params conkey u h Out | ||
391 | atomically $ do threadsWait threads | ||
392 | readTVar retryVar | ||
393 | fin_utc <- getCurrentTime | ||
394 | when shouldRetry $ do | ||
395 | let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc) | ||
396 | expected = fromIntegral interval | ||
397 | when (shouldRetry && elapsed < expected) $ do | ||
398 | debugNoise $ "Waiting to retry " <> bshow addr | ||
399 | void $ startDelay timer (round $ 1000 * (expected-elapsed)) | ||
400 | debugNoise $ "retry " <> bshow (shouldRetry,addr) | ||
401 | when shouldRetry $ retryLoop | ||
402 | |||
403 | |||
404 | -- INTERNAL ---------------------------------------------------------- | ||
405 | |||
406 | {- | ||
407 | hWriteUntilNothing h outs = | ||
408 | fix $ \loop -> do | ||
409 | mb <- atomically $ takeTMVar outs | ||
410 | case mb of Just bs -> do S.hPutStrLn h bs | ||
411 | warn $ "wrote " <> bs | ||
412 | loop | ||
413 | Nothing -> do warn $ "wrote Nothing" | ||
414 | hClose h | ||
415 | |||
416 | -} | ||
417 | connRead :: ConnectionState -> IO (Maybe ByteString) | ||
418 | connRead (WriteOnlyConnection w) = do | ||
419 | -- atomically $ discardContents (threadsChannel w) | ||
420 | return Nothing | ||
421 | connRead conn = do | ||
422 | c <- atomically $ getThreads | ||
423 | threadsRead c | ||
424 | where | ||
425 | getThreads = | ||
426 | case conn of SaneConnection c -> return c | ||
427 | ReadOnlyConnection c -> return c | ||
428 | ConnectionPair c w -> do | ||
429 | -- discardContents (threadsChannel w) | ||
430 | return c | ||
431 | |||
432 | socketFamily :: SockAddr -> Family | ||
433 | socketFamily (SockAddrInet _ _) = AF_INET | ||
434 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
435 | socketFamily (SockAddrUnix _) = AF_UNIX | ||
436 | |||
437 | |||
438 | conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) | ||
439 | -> ConnectionState | ||
440 | -> ConnectionEvent x | ||
441 | conevent sessionConduits con = Connection pingflag read write | ||
442 | where | ||
443 | pingflag = swapTVar (pingFlag (connPingTimer con)) False | ||
444 | (read,write) = sessionConduits (connRead con) (connWrite con) | ||
445 | |||
446 | newConnection :: Ord a | ||
447 | => Server a u1 releaseKey x | ||
448 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( ConduitT () x IO (), ConduitT (Flush x) Void IO () ) ) | ||
449 | -> ConnectionParameters conkey u | ||
450 | -> a | ||
451 | -> u1 | ||
452 | -> Handle | ||
453 | -> InOrOut | ||
454 | -> IO ConnectionThreads | ||
455 | newConnection server sessionConduits params conkey u h inout = do | ||
456 | hSetBuffering h NoBuffering | ||
457 | let (idle_ms,timeout_ms) = | ||
458 | case (inout,duplex params) of | ||
459 | (Out,False) -> ( 0, 0 ) | ||
460 | _ -> ( pingInterval params | ||
461 | , timeout params ) | ||
462 | |||
463 | new <- do pinglogic <- forkPingMachine "newConnection" idle_ms timeout_ms | ||
464 | connectionThreads h pinglogic | ||
465 | started <- atomically $ newEmptyTMVar | ||
466 | kontvar <- atomically newEmptyTMVar | ||
467 | -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ? | ||
468 | let _ = kontvar :: TMVar (STM (IO ())) | ||
469 | forkLabeled ("connecting...") $ do | ||
470 | getkont <- atomically $ takeTMVar kontvar | ||
471 | kont <- atomically getkont | ||
472 | kont | ||
473 | |||
474 | atomically $ do | ||
475 | current <- fmap (Map.lookup conkey) $ readTVar (conmap server) | ||
476 | case current of | ||
477 | Nothing -> do | ||
478 | (newCon,e) <- return $ | ||
479 | if duplex params | ||
480 | then let newcon = SaneConnection new | ||
481 | in ( newcon, ((conkey,u), conevent sessionConduits newcon) ) | ||
482 | else ( case inout of | ||
483 | In -> ReadOnlyConnection new | ||
484 | Out -> WriteOnlyConnection new | ||
485 | , ((conkey,u), HalfConnection inout) ) | ||
486 | modifyTVar' (conmap server) $ Map.insert conkey | ||
487 | ConnectionRecord { ckont = kontvar | ||
488 | , cstate = newCon | ||
489 | , cdata = u } | ||
490 | announce e | ||
491 | putTMVar kontvar $ return $ do | ||
492 | myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice. | ||
493 | atomically $ putTMVar started () | ||
494 | -- Wait for something interesting. | ||
495 | handleEOF conkey u kontvar newCon | ||
496 | Just what@ConnectionRecord { ckont =mvar }-> do | ||
497 | putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread. | ||
498 | putTMVar mvar $ do | ||
499 | -- The action returned by updateConMap, eventually invokes handleEOF, | ||
500 | -- so the sequencer thread will not be terminated. | ||
501 | kont <- updateConMap conkey u new what | ||
502 | putTMVar started () | ||
503 | return kont | ||
504 | return new | ||
505 | where | ||
506 | |||
507 | announce e = writeTChan (serverEvent server) e | ||
508 | |||
509 | -- This function loops and will not quit unless an action is posted to the | ||
510 | -- mvar that does not in turn invoke this function, or if an EOF occurs. | ||
511 | handleEOF conkey u mvar newCon = do | ||
512 | action <- atomically . foldr1 orElse $ | ||
513 | [ takeTMVar mvar >>= id -- passed continuation | ||
514 | , connWait newCon >> return eof | ||
515 | , connWaitPing newCon >>= return . sendPing | ||
516 | -- , pingWait pingTimer >>= return . sendPing | ||
517 | ] | ||
518 | action :: IO () | ||
519 | where | ||
520 | eof = do | ||
521 | -- warn $ "EOF " <>bshow conkey | ||
522 | connCancelPing newCon | ||
523 | atomically $ do connFlush newCon | ||
524 | announce ((conkey,u),EOF) | ||
525 | modifyTVar' (conmap server) | ||
526 | $ Map.delete conkey | ||
527 | -- warn $ "fin-EOF "<>bshow conkey | ||
528 | |||
529 | sendPing PingTimeOut = do | ||
530 | {- | ||
531 | utc <- getCurrentTime | ||
532 | let utc' = formatTime defaultTimeLocale "%s" utc | ||
533 | warn $ "ping:TIMEOUT " <> bshow utc' | ||
534 | -} | ||
535 | atomically (connClose newCon) | ||
536 | eof | ||
537 | |||
538 | sendPing PingIdle = do | ||
539 | {- | ||
540 | utc <- getCurrentTime | ||
541 | let utc' = formatTime defaultTimeLocale "%s" utc | ||
542 | -- warn $ "ping:IDLE " <> bshow utc' | ||
543 | -} | ||
544 | atomically $ announce ((conkey,u),RequiresPing) | ||
545 | handleEOF conkey u mvar newCon | ||
546 | |||
547 | |||
548 | updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do | ||
549 | new' <- | ||
550 | if duplex params then do | ||
551 | announce ((conkey,u),EOF) | ||
552 | connClose replaced | ||
553 | let newcon = SaneConnection new | ||
554 | announce $ ((conkey,u),conevent sessionConduits newcon) | ||
555 | return $ newcon | ||
556 | else | ||
557 | case replaced of | ||
558 | WriteOnlyConnection w | inout==In -> | ||
559 | do let newcon = ConnectionPair new w | ||
560 | announce ((conkey,u),conevent sessionConduits newcon) | ||
561 | return newcon | ||
562 | ReadOnlyConnection r | inout==Out -> | ||
563 | do let newcon = ConnectionPair r new | ||
564 | announce ((conkey,u),conevent sessionConduits newcon) | ||
565 | return newcon | ||
566 | _ -> do -- connFlush todo | ||
567 | announce ((conkey,u0), EOF) | ||
568 | connClose replaced | ||
569 | announce ((conkey,u), HalfConnection inout) | ||
570 | return $ case inout of | ||
571 | In -> ReadOnlyConnection new | ||
572 | Out -> WriteOnlyConnection new | ||
573 | modifyTVar' (conmap server) $ Map.insert conkey | ||
574 | ConnectionRecord { ckont = mvar | ||
575 | , cstate = new' | ||
576 | , cdata = u } | ||
577 | return $ handleEOF conkey u mvar new' | ||
578 | |||
579 | |||
580 | getPacket :: Handle -> IO ByteString | ||
581 | getPacket h = do hWaitForInput h (-1) | ||
582 | hGetNonBlocking h 1024 | ||
583 | |||
584 | |||
585 | |||
586 | -- | 'ConnectionThreads' is an interface to a pair of threads | ||
587 | -- that are reading and writing a 'Handle'. | ||
588 | data ConnectionThreads = ConnectionThreads | ||
589 | { threadsWriter :: TMVar (Maybe ByteString) | ||
590 | , threadsChannel :: TChan ByteString | ||
591 | , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close | ||
592 | , threadsPing :: PingMachine | ||
593 | } | ||
594 | |||
595 | -- | This spawns the reader and writer threads and returns a newly | ||
596 | -- constructed 'ConnectionThreads' object. | ||
597 | connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads | ||
598 | connectionThreads h pinglogic = do | ||
599 | |||
600 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar | ||
601 | |||
602 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan | ||
603 | readerThread <- forkLabeled "readerThread" $ do | ||
604 | let finished e = do | ||
605 | hClose h | ||
606 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) | ||
607 | -- let _ = fmap ioeGetErrorType e -- type hint | ||
608 | let _ = fmap what e where what (SomeException _) = undefined | ||
609 | atomically $ do tryTakeTMVar outs | ||
610 | putTMVar outs Nothing -- quit writer | ||
611 | putTMVar doner () | ||
612 | handle (finished . Just) $ do | ||
613 | pingBump pinglogic -- start the ping timer | ||
614 | fix $ \loop -> do | ||
615 | packet <- getPacket h | ||
616 | -- warn $ "read: " <> S.take 60 packet | ||
617 | atomically $ writeTChan incomming packet | ||
618 | pingBump pinglogic | ||
619 | -- warn $ "bumped: " <> S.take 60 packet | ||
620 | isEof <- hIsEOF h | ||
621 | if isEof then finished Nothing else loop | ||
622 | |||
623 | writerThread <- forkLabeled "writerThread" . fix $ \loop -> do | ||
624 | let finished = do -- warn $ "finished write" | ||
625 | -- hClose h -- quit reader | ||
626 | throwTo readerThread (ErrorCall "EOF") | ||
627 | atomically $ putTMVar donew () | ||
628 | mb <- atomically $ readTMVar outs | ||
629 | case mb of Just bs -> handle (\(SomeException e)->finished) | ||
630 | (do -- warn $ "writing: " <> S.take 60 bs | ||
631 | S.hPutStr h bs | ||
632 | -- warn $ "wrote: " <> S.take 60 bs | ||
633 | atomically $ takeTMVar outs | ||
634 | loop) | ||
635 | Nothing -> finished | ||
636 | |||
637 | let wait = do readTMVar donew | ||
638 | readTMVar doner | ||
639 | return () | ||
640 | return ConnectionThreads { threadsWriter = outs | ||
641 | , threadsChannel = incomming | ||
642 | , threadsWait = wait | ||
643 | , threadsPing = pinglogic } | ||
644 | |||
645 | |||
646 | -- | 'threadsWrite' writes the given 'ByteString' to the | ||
647 | -- 'ConnectionThreads' object. It blocks until the ByteString | ||
648 | -- is written and 'True' is returned, or the connection is | ||
649 | -- interrupted and 'False' is returned. | ||
650 | threadsWrite :: ConnectionThreads -> ByteString -> IO Bool | ||
651 | threadsWrite c bs = atomically $ | ||
652 | orElse (const False `fmap` threadsWait c) | ||
653 | (const True `fmap` putTMVar (threadsWriter c) (Just bs)) | ||
654 | |||
655 | -- | 'threadsClose' signals for the 'ConnectionThreads' object | ||
656 | -- to quit and close the associated 'Handle'. This operation | ||
657 | -- is non-blocking, follow it with 'threadsWait' if you want | ||
658 | -- to wait for the operation to complete. | ||
659 | threadsClose :: ConnectionThreads -> STM () | ||
660 | threadsClose c = do | ||
661 | let mvar = threadsWriter c | ||
662 | v <- tryReadTMVar mvar | ||
663 | case v of | ||
664 | Just Nothing -> return () -- already closed | ||
665 | _ -> putTMVar mvar Nothing | ||
666 | |||
667 | -- | 'threadsRead' blocks until a 'ByteString' is available which | ||
668 | -- is returned to the caller, or the connection is interrupted and | ||
669 | -- 'Nothing' is returned. | ||
670 | threadsRead :: ConnectionThreads -> IO (Maybe ByteString) | ||
671 | threadsRead c = atomically $ | ||
672 | orElse (const Nothing `fmap` threadsWait c) | ||
673 | (Just `fmap` readTChan (threadsChannel c)) | ||
674 | |||
675 | -- | A 'ConnectionState' is an interface to a single 'ConnectionThreads' | ||
676 | -- or to a pair of 'ConnectionThreads' objects that are considered as one | ||
677 | -- connection. | ||
678 | data ConnectionState = | ||
679 | SaneConnection ConnectionThreads | ||
680 | -- ^ ordinary read/write connection | ||
681 | | WriteOnlyConnection ConnectionThreads | ||
682 | | ReadOnlyConnection ConnectionThreads | ||
683 | | ConnectionPair ConnectionThreads ConnectionThreads | ||
684 | -- ^ Two 'ConnectionThreads' objects, read operations use the | ||
685 | -- first, write operations use the second. | ||
686 | |||
687 | |||
688 | |||
689 | connWrite :: ConnectionState -> ByteString -> IO Bool | ||
690 | connWrite (ReadOnlyConnection _) bs = return False | ||
691 | connWrite conn bs = threadsWrite c bs | ||
692 | where | ||
693 | c = case conn of SaneConnection c -> c | ||
694 | WriteOnlyConnection c -> c | ||
695 | ConnectionPair _ c -> c | ||
696 | |||
697 | |||
698 | mapConn :: Bool -> | ||
699 | (ConnectionThreads -> STM ()) -> ConnectionState -> STM () | ||
700 | mapConn both action c = | ||
701 | case c of | ||
702 | SaneConnection rw -> action rw | ||
703 | ReadOnlyConnection r -> action r | ||
704 | WriteOnlyConnection w -> action w | ||
705 | ConnectionPair r w -> do | ||
706 | rem <- orElse (const w `fmap` action r) | ||
707 | (const r `fmap` action w) | ||
708 | when both $ action rem | ||
709 | |||
710 | connClose :: ConnectionState -> STM () | ||
711 | connClose c = mapConn True threadsClose c | ||
712 | |||
713 | connWait :: ConnectionState -> STM () | ||
714 | connWait c = doit -- mapConn False threadsWait c | ||
715 | where | ||
716 | action = threadsWait | ||
717 | doit = | ||
718 | case c of | ||
719 | SaneConnection rw -> action rw | ||
720 | ReadOnlyConnection r -> action r | ||
721 | WriteOnlyConnection w -> action w | ||
722 | ConnectionPair r w -> do | ||
723 | rem <- orElse (const w `fmap` action r) | ||
724 | (const r `fmap` action w) | ||
725 | threadsClose rem | ||
726 | |||
727 | connPingTimer :: ConnectionState -> PingMachine | ||
728 | connPingTimer c = | ||
729 | case c of | ||
730 | SaneConnection rw -> threadsPing rw | ||
731 | ReadOnlyConnection r -> threadsPing r | ||
732 | WriteOnlyConnection w -> threadsPing w -- should be disabled. | ||
733 | ConnectionPair r w -> threadsPing r | ||
734 | |||
735 | connCancelPing :: ConnectionState -> IO () | ||
736 | connCancelPing c = pingCancel (connPingTimer c) | ||
737 | |||
738 | connWaitPing :: ConnectionState -> STM PingEvent | ||
739 | connWaitPing c = pingWait (connPingTimer c) | ||
740 | |||
741 | connFlush :: ConnectionState -> STM () | ||
742 | connFlush c = | ||
743 | case c of | ||
744 | SaneConnection rw -> waitChan rw | ||
745 | ReadOnlyConnection r -> waitChan r | ||
746 | WriteOnlyConnection w -> return () | ||
747 | ConnectionPair r w -> waitChan r | ||
748 | where | ||
749 | waitChan t = do | ||
750 | b <- isEmptyTChan (threadsChannel t) | ||
751 | when (not b) retry | ||
752 | |||
753 | bshow :: Show a => a -> ByteString | ||
754 | bshow e = S.pack . show $ e | ||
755 | |||
756 | warn :: ByteString -> IO () | ||
757 | warn str =dputB XMisc str | ||
758 | |||
759 | debugNoise :: Monad m => t -> m () | ||
760 | debugNoise str = return () | ||
761 | |||
762 | data TCPStatus = Resolving | AwaitingRead | AwaitingWrite | ||
763 | |||
764 | -- SockAddr -> (SockAddr, ConnectionParameters SockAddr ConnectionData, Miliseconds) | ||
765 | |||
766 | |||
767 | tcpManager :: (PeerAddress -> (SockAddr, ConnectionParameters PeerAddress u, Miliseconds)) | ||
768 | -- -> (String -> Maybe Text) | ||
769 | -- -> (Text -> IO (Maybe PeerAddress)) | ||
770 | -> Server PeerAddress u releaseKey x | ||
771 | -> IO (Manager TCPStatus Text) | ||
772 | tcpManager grokKey sv = do | ||
773 | rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey) | ||
774 | nullping <- forkPingMachine "tcpManager" 0 0 | ||
775 | (rslv,rev) <- do | ||
776 | dns <- newDNSCache | ||
777 | let rslv k = map PeerAddress <$> forwardResolve dns k | ||
778 | rev (PeerAddress addr) = reverseResolve dns addr | ||
779 | return (rslv,rev) | ||
780 | return Manager { | ||
781 | setPolicy = \k -> \case | ||
782 | TryingToConnect -> join $ atomically $ do | ||
783 | r <- readTVar rmap | ||
784 | case Map.lookup k r of | ||
785 | Just {} -> return $ return () -- Connection already in progress. | ||
786 | Nothing -> do | ||
787 | modifyTVar' rmap $ Map.insert k Nothing | ||
788 | return $ void $ forkLabeled ("resolve."++show k) $ do | ||
789 | mconkey <- listToMaybe <$> rslv k | ||
790 | case mconkey of | ||
791 | Nothing -> atomically $ modifyTVar' rmap $ Map.delete k | ||
792 | Just conkey -> do | ||
793 | control sv $ case grokKey conkey of | ||
794 | (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms | ||
795 | OpenToConnect -> dput XMisc "TODO: TCP OpenToConnect" | ||
796 | RefusingToConnect -> dput XMisc "TODO: TCP RefusingToConnect" | ||
797 | , status = \k -> do | ||
798 | c <- readTVar (conmap sv) | ||
799 | ck <- Map.lookup k <$> readTVar rmap | ||
800 | return $ exportConnection c (join ck) | ||
801 | , connections = Map.keys <$> readTVar rmap | ||
802 | , stringToKey = Just . Text.pack | ||
803 | , showProgress = \case | ||
804 | Resolving -> "resolving" | ||
805 | AwaitingRead -> "awaiting inbound" | ||
806 | AwaitingWrite -> "awaiting outbound" | ||
807 | , showKey = show | ||
808 | , resolvePeer = rslv | ||
809 | , reverseAddress = rev | ||
810 | } | ||
811 | |||
812 | exportConnection :: Ord conkey => Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus | ||
813 | exportConnection conmap mkey = G.Connection | ||
814 | { G.connStatus = case mkey of | ||
815 | Nothing -> G.Dormant | ||
816 | Just conkey -> case Map.lookup conkey conmap of | ||
817 | Nothing -> G.InProgress Resolving | ||
818 | Just (ConnectionRecord ckont cstate cdata) -> case cstate of | ||
819 | SaneConnection {} -> G.Established | ||
820 | ConnectionPair {} -> G.Established | ||
821 | ReadOnlyConnection {} -> G.InProgress AwaitingWrite | ||
822 | WriteOnlyConnection {} -> G.InProgress AwaitingRead | ||
823 | , G.connPolicy = TryingToConnect | ||
824 | } | ||