diff options
author | James Crayne <jim.crayne@gmail.com> | 2017-11-20 22:40:50 +0000 |
---|---|---|
committer | James Crayne <jim.crayne@gmail.com> | 2017-11-20 22:52:07 +0000 |
commit | 8c9d1cba722ae6539ec6701d1b0290ed635a55d1 (patch) | |
tree | 8ecdba79d49f9e5d53eb511a2dcd79de1a63f046 /Presence | |
parent | de3223b62bf002232ca659c2738441ad6c1708eb (diff) |
Rename: Server -> Connection.Tcp
Diffstat (limited to 'Presence')
-rw-r--r-- | Presence/Server.hs | 826 | ||||
-rw-r--r-- | Presence/XMPPServer.hs | 2 |
2 files changed, 1 insertions, 827 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs deleted file mode 100644 index c38aec2a..00000000 --- a/Presence/Server.hs +++ /dev/null | |||
@@ -1,826 +0,0 @@ | |||
1 | {-# OPTIONS_HADDOCK prune #-} | ||
2 | {-# LANGUAGE CPP #-} | ||
3 | {-# LANGUAGE DoAndIfThenElse #-} | ||
4 | {-# LANGUAGE FlexibleInstances #-} | ||
5 | {-# LANGUAGE OverloadedStrings #-} | ||
6 | {-# LANGUAGE RankNTypes #-} | ||
7 | {-# LANGUAGE StandaloneDeriving #-} | ||
8 | {-# LANGUAGE TupleSections #-} | ||
9 | {-# LANGUAGE LambdaCase #-} | ||
10 | ----------------------------------------------------------------------------- | ||
11 | -- | | ||
12 | -- Module : Server | ||
13 | -- | ||
14 | -- Maintainer : joe@jerkface.net | ||
15 | -- Stability : experimental | ||
16 | -- | ||
17 | -- A TCP client/server library. | ||
18 | -- | ||
19 | -- TODO: | ||
20 | -- | ||
21 | -- * interface tweaks | ||
22 | -- | ||
23 | module Server | ||
24 | ( module Server | ||
25 | , module PingMachine ) where | ||
26 | |||
27 | import Data.ByteString (ByteString,hGetNonBlocking) | ||
28 | import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) | ||
29 | import Data.Conduit ( Source, Sink, Flush ) | ||
30 | #if MIN_VERSION_containers(0,5,0) | ||
31 | import qualified Data.Map.Strict as Map | ||
32 | import Data.Map.Strict (Map) | ||
33 | #else | ||
34 | import qualified Data.Map as Map | ||
35 | import Data.Map (Map) | ||
36 | #endif | ||
37 | import Data.Monoid ( (<>) ) | ||
38 | #ifdef THREAD_DEBUG | ||
39 | import Control.Concurrent.Lifted.Instrument | ||
40 | #else | ||
41 | import Control.Concurrent.Lifted | ||
42 | import GHC.Conc (labelThread) | ||
43 | #endif | ||
44 | |||
45 | import Control.Concurrent.STM | ||
46 | -- import Control.Concurrent.STM.TMVar | ||
47 | -- import Control.Concurrent.STM.TChan | ||
48 | -- import Control.Concurrent.STM.Delay | ||
49 | import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException) | ||
50 | import Control.Monad | ||
51 | import Control.Monad.Fix | ||
52 | -- import Control.Monad.STM | ||
53 | -- import Control.Monad.Trans.Resource | ||
54 | import Control.Monad.IO.Class (MonadIO (liftIO)) | ||
55 | import System.IO.Error (isDoesNotExistError) | ||
56 | import System.IO | ||
57 | ( IOMode(..) | ||
58 | , hSetBuffering | ||
59 | , BufferMode(..) | ||
60 | , hWaitForInput | ||
61 | , hClose | ||
62 | , hIsEOF | ||
63 | , stderr | ||
64 | , Handle | ||
65 | , hFlush | ||
66 | , hPutStrLn | ||
67 | ) | ||
68 | import Network.Socket as Socket | ||
69 | import Network.BSD | ||
70 | ( getProtocolNumber | ||
71 | ) | ||
72 | import Debug.Trace | ||
73 | import Data.Time.Clock (getCurrentTime,diffUTCTime) | ||
74 | -- import SockAddr () | ||
75 | -- import System.Locale (defaultTimeLocale) | ||
76 | |||
77 | import InterruptibleDelay | ||
78 | import PingMachine | ||
79 | import Network.StreamServer | ||
80 | import Network.SocketLike hiding (sClose) | ||
81 | import qualified Connection as G | ||
82 | ;import Connection (Manager (..), Policy(..)) | ||
83 | |||
84 | |||
85 | type Microseconds = Int | ||
86 | |||
87 | -- | This object is passed with the 'Listen' and 'Connect' | ||
88 | -- instructions in order to control the behavior of the | ||
89 | -- connections that are established. It is parameterized | ||
90 | -- by a user-suplied type @conkey@ that is used as a lookup | ||
91 | -- key for connections. | ||
92 | data ConnectionParameters conkey u = | ||
93 | ConnectionParameters | ||
94 | { pingInterval :: PingInterval | ||
95 | -- ^ The miliseconds of idle to allow before a 'RequiresPing' | ||
96 | -- event is signaled. | ||
97 | , timeout :: TimeOut | ||
98 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled | ||
99 | -- that are necessary for the connection to be considered | ||
100 | -- lost and signalling 'EOF'. | ||
101 | , makeConnKey :: RestrictedSocket -> IO (conkey,u) | ||
102 | -- ^ This action creates a lookup key for a new connection. If 'duplex' | ||
103 | -- is 'True' and the result is already assocatied with an established | ||
104 | -- connection, then an 'EOF' will be forced before the the new | ||
105 | -- connection becomes active. | ||
106 | -- | ||
107 | , duplex :: Bool | ||
108 | -- ^ If True, then the connection will be treated as a normal | ||
109 | -- two-way socket. Otherwise, a readable socket is established | ||
110 | -- with 'Listen' and a writable socket is established with | ||
111 | -- 'Connect' and they are associated when 'makeConnKey' yields | ||
112 | -- same value for each. | ||
113 | } | ||
114 | |||
115 | -- | Use this function to select appropriate default values for | ||
116 | -- 'ConnectionParameters' other than 'makeConnKey'. | ||
117 | -- | ||
118 | -- Current defaults: | ||
119 | -- | ||
120 | -- * 'pingInterval' = 28000 | ||
121 | -- | ||
122 | -- * 'timeout' = 2000 | ||
123 | -- | ||
124 | -- * 'duplex' = True | ||
125 | -- | ||
126 | connectionDefaults | ||
127 | :: (RestrictedSocket -> IO (conkey,u)) -> ConnectionParameters conkey u | ||
128 | connectionDefaults f = ConnectionParameters | ||
129 | { pingInterval = 28000 | ||
130 | , timeout = 2000 | ||
131 | , makeConnKey = f | ||
132 | , duplex = True | ||
133 | } | ||
134 | |||
135 | -- | Instructions for a 'Server' object | ||
136 | -- | ||
137 | -- To issue a command, put it into the 'serverCommand' TMVar. | ||
138 | data ServerInstruction conkey u | ||
139 | = Quit | ||
140 | -- ^ kill the server. This command is automatically issued when | ||
141 | -- the server is released. | ||
142 | | Listen PortNumber (ConnectionParameters conkey u) | ||
143 | -- ^ listen for incoming connections | ||
144 | | Connect SockAddr (ConnectionParameters conkey u) | ||
145 | -- ^ connect to addresses | ||
146 | | ConnectWithEndlessRetry SockAddr | ||
147 | (ConnectionParameters conkey u) | ||
148 | Miliseconds | ||
149 | -- ^ keep retrying the connection | ||
150 | | Ignore PortNumber | ||
151 | -- ^ stop listening on specified port | ||
152 | | Send conkey ByteString | ||
153 | -- ^ send bytes to an established connection | ||
154 | |||
155 | #ifdef TEST | ||
156 | deriving instance Show conkey => Show (ServerInstruction conkey u) | ||
157 | instance Show (a -> b) where show _ = "<function>" | ||
158 | deriving instance Show conkey => Show (ConnectionParameters conkey u) | ||
159 | #endif | ||
160 | |||
161 | -- | This type specifies which which half of a half-duplex | ||
162 | -- connection is of interest. | ||
163 | data InOrOut = In | Out | ||
164 | deriving (Enum,Eq,Ord,Show,Read) | ||
165 | |||
166 | -- | These events may be read from 'serverEvent' TChannel. | ||
167 | -- | ||
168 | data ConnectionEvent b | ||
169 | = Connection (STM Bool) (Source IO b) (Sink (Flush b) IO ()) | ||
170 | -- ^ A new connection was established | ||
171 | | ConnectFailure SockAddr | ||
172 | -- ^ A 'Connect' command failed. | ||
173 | | HalfConnection InOrOut | ||
174 | -- ^ Half of a half-duplex connection is avaliable. | ||
175 | | EOF | ||
176 | -- ^ A connection was terminated | ||
177 | | RequiresPing | ||
178 | -- ^ 'pingInterval' miliseconds of idle was experienced | ||
179 | |||
180 | #ifdef TEST | ||
181 | instance Show (IO a) where show _ = "<IO action>" | ||
182 | instance Show (STM a) where show _ = "<STM action>" | ||
183 | instance Eq (ByteString -> IO Bool) where (==) _ _ = True | ||
184 | instance Eq (IO (Maybe ByteString)) where (==) _ _ = True | ||
185 | instance Eq (STM Bool) where (==) _ _ = True | ||
186 | deriving instance Show b => Show (ConnectionEvent b) | ||
187 | deriving instance Eq b => Eq (ConnectionEvent b) | ||
188 | #endif | ||
189 | |||
190 | -- | This is the per-connection state. | ||
191 | data ConnectionRecord u | ||
192 | = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler | ||
193 | , cstate :: ConnectionState -- ^ used to send/receive data to the connection | ||
194 | , cdata :: u -- ^ user data, stored in the connection map for convenience | ||
195 | } | ||
196 | |||
197 | -- | This object accepts commands and signals events and maintains | ||
198 | -- the list of currently listening ports and established connections. | ||
199 | data Server a u releaseKey b | ||
200 | = Server { serverCommand :: TMVar (ServerInstruction a u) | ||
201 | , serverEvent :: TChan ((a,u), ConnectionEvent b) | ||
202 | , serverReleaseKey :: releaseKey | ||
203 | , conmap :: TVar (Map a (ConnectionRecord u)) | ||
204 | , listenmap :: TVar (Map PortNumber ServerHandle) | ||
205 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) | ||
206 | } | ||
207 | |||
208 | control :: Server a u releaseKey b -> ServerInstruction a u -> IO () | ||
209 | control sv = atomically . putTMVar (serverCommand sv) | ||
210 | |||
211 | type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) | ||
212 | |||
213 | noCleanUp :: MonadIO m => Allocate () m | ||
214 | noCleanUp io _ = ( (,) () ) `liftM` liftIO io | ||
215 | |||
216 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | ||
217 | -- to ensure proper cleanup. For example, | ||
218 | -- | ||
219 | -- > import Server | ||
220 | -- > import Control.Monad.Trans.Resource (runResourceT) | ||
221 | -- > import Control.Monad.IO.Class (liftIO) | ||
222 | -- > import Control.Monad.STM (atomically) | ||
223 | -- > import Control.Concurrent.STM.TMVar (putTMVar) | ||
224 | -- > import Control.Concurrent.STM.TChan (readTChan) | ||
225 | -- > | ||
226 | -- > main = runResourceT $ do | ||
227 | -- > sv <- server allocate | ||
228 | -- > let params = connectionDefaults (return . snd) | ||
229 | -- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) | ||
230 | -- > let loop = do | ||
231 | -- > (_,event) <- atomically $ readTChan (serverEvent sv) | ||
232 | -- > case event of | ||
233 | -- > Connection getPingFlag readData writeData -> do | ||
234 | -- > forkIO $ do | ||
235 | -- > fix $ \readLoop -> do | ||
236 | -- > readData >>= mapM $ \bytes -> | ||
237 | -- > putStrLn $ "got: " ++ show bytes | ||
238 | -- > readLoop | ||
239 | -- > case event of EOF -> return () | ||
240 | -- > _ -> loop | ||
241 | -- > liftIO loop | ||
242 | -- | ||
243 | -- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp' | ||
244 | -- to do without automatic cleanup and be sure to remember to write 'Quit' to | ||
245 | -- the 'serverCommand' variable. | ||
246 | server :: | ||
247 | -- forall (m :: * -> *) a u conkey releaseKey. | ||
248 | (Show conkey, MonadIO m, Ord conkey) => | ||
249 | Allocate releaseKey m | ||
250 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) ) | ||
251 | -> m (Server conkey u releaseKey x) | ||
252 | server allocate sessionConduits = do | ||
253 | (key,cmds) <- allocate (atomically newEmptyTMVar) | ||
254 | (atomically . flip putTMVar Quit) | ||
255 | server <- liftIO . atomically $ do | ||
256 | tchan <- newTChan | ||
257 | conmap <- newTVar Map.empty | ||
258 | listenmap<- newTVar Map.empty | ||
259 | retrymap <- newTVar Map.empty | ||
260 | return Server { serverCommand = cmds | ||
261 | , serverEvent = tchan | ||
262 | , serverReleaseKey = key | ||
263 | , conmap = conmap | ||
264 | , listenmap = listenmap | ||
265 | , retrymap = retrymap | ||
266 | } | ||
267 | liftIO $ do | ||
268 | tid <- forkIO $ fix $ \loop -> do | ||
269 | instr <- atomically $ takeTMVar cmds | ||
270 | -- warn $ "instr = " <> bshow instr | ||
271 | let again = do doit server instr | ||
272 | -- warn $ "finished " <> bshow instr | ||
273 | loop | ||
274 | case instr of Quit -> closeAll server | ||
275 | _ -> again | ||
276 | labelThread tid "server" | ||
277 | return server | ||
278 | where | ||
279 | closeAll server = do | ||
280 | listening <- atomically . readTVar $ listenmap server | ||
281 | mapM_ quitListening (Map.elems listening) | ||
282 | let stopRetry (v,d) = do atomically $ writeTVar v False | ||
283 | interruptDelay d | ||
284 | retriers <- atomically $ do | ||
285 | rmap <- readTVar $ retrymap server | ||
286 | writeTVar (retrymap server) Map.empty | ||
287 | return rmap | ||
288 | mapM_ stopRetry (Map.elems retriers) | ||
289 | cons <- atomically . readTVar $ conmap server | ||
290 | atomically $ mapM_ (connClose . cstate) (Map.elems cons) | ||
291 | atomically $ mapM_ (connWait . cstate) (Map.elems cons) | ||
292 | atomically $ writeTVar (conmap server) Map.empty | ||
293 | |||
294 | |||
295 | doit server (Listen port params) = do | ||
296 | |||
297 | listening <- Map.member port | ||
298 | `fmap` atomically (readTVar $ listenmap server) | ||
299 | when (not listening) $ do | ||
300 | |||
301 | hPutStrLn stderr $ "Started listening on "++show port | ||
302 | |||
303 | let family = AF_INET6 | ||
304 | address = case family of | ||
305 | AF_INET -> SockAddrInet port iNADDR_ANY | ||
306 | AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0 | ||
307 | |||
308 | sserv <- flip streamServer address ServerConfig | ||
309 | { serverWarn = hPutStrLn stderr | ||
310 | , serverSession = \sock _ h -> do | ||
311 | (conkey,u) <- makeConnKey params sock | ||
312 | _ <- newConnection server sessionConduits params conkey u h In | ||
313 | return () | ||
314 | } | ||
315 | |||
316 | atomically $ listenmap server `modifyTVar'` Map.insert port sserv | ||
317 | |||
318 | doit server (Ignore port) = do | ||
319 | hPutStrLn stderr $ "Stopping listen on "++show port | ||
320 | mb <- atomically $ do | ||
321 | map <- readTVar $ listenmap server | ||
322 | modifyTVar' (listenmap server) $ Map.delete port | ||
323 | return $ Map.lookup port map | ||
324 | maybe (return ()) quitListening $ mb | ||
325 | |||
326 | doit server (Send con bs) = do -- . void . forkIO $ do | ||
327 | map <- atomically $ readTVar (conmap server) | ||
328 | let post False = (trace ("cant send: "++show bs) $ return ()) | ||
329 | post True = return () | ||
330 | maybe (post False) | ||
331 | (post <=< flip connWrite bs . cstate) | ||
332 | $ Map.lookup con map | ||
333 | |||
334 | doit server (Connect addr params) = join $ atomically $ do | ||
335 | Map.lookup addr <$> readTVar (retrymap server) | ||
336 | >>= return . \case | ||
337 | Nothing -> forkit | ||
338 | Just (v,d) -> do b <- atomically $ readTVar v | ||
339 | interruptDelay d | ||
340 | when (not b) forkit | ||
341 | where | ||
342 | forkit = void . forkIO $ do | ||
343 | myThreadId >>= flip labelThread ( "Connect." ++ show addr ) | ||
344 | proto <- getProtocolNumber "tcp" | ||
345 | sock <- socket (socketFamily addr) Stream proto | ||
346 | handle (\e -> do -- let t = ioeGetErrorType e | ||
347 | when (isDoesNotExistError e) $ return () -- warn "GOTCHA" | ||
348 | -- warn $ "connect-error: " <> bshow e | ||
349 | (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? | ||
350 | Socket.close sock | ||
351 | atomically | ||
352 | $ writeTChan (serverEvent server) | ||
353 | $ ((conkey,u),ConnectFailure addr)) | ||
354 | $ do | ||
355 | connect sock addr | ||
356 | (conkey,u) <- makeConnKey params (restrictSocket sock) | ||
357 | h <- socketToHandle sock ReadWriteMode | ||
358 | newConnection server sessionConduits params conkey u h Out | ||
359 | return () | ||
360 | |||
361 | doit server (ConnectWithEndlessRetry addr params interval) = do | ||
362 | proto <- getProtocolNumber "tcp" | ||
363 | void . forkIO $ do | ||
364 | myThreadId >>= flip labelThread ("ConnectWithEndlessRetry." ++ show addr) | ||
365 | timer <- interruptibleDelay | ||
366 | (retryVar,action) <- atomically $ do | ||
367 | map <- readTVar (retrymap server) | ||
368 | action <- case Map.lookup addr map of | ||
369 | Nothing -> return $ return () | ||
370 | Just (v,d) -> do writeTVar v False | ||
371 | return $ interruptDelay d | ||
372 | v <- newTVar True | ||
373 | writeTVar (retrymap server) $! Map.insert addr (v,timer) map | ||
374 | return (v,action :: IO ()) | ||
375 | action | ||
376 | fix $ \retryLoop -> do | ||
377 | utc <- getCurrentTime | ||
378 | shouldRetry <- do | ||
379 | handle (\(SomeException e) -> do | ||
380 | -- Exceptions thrown by 'socket' need to be handled specially | ||
381 | -- since we don't have enough information to broadcast a ConnectFailure | ||
382 | -- on serverEvent. | ||
383 | warn $ "Failed to create socket: " <> bshow e | ||
384 | atomically $ readTVar retryVar) $ do | ||
385 | sock <- socket (socketFamily addr) Stream proto | ||
386 | handle (\(SomeException e) -> do | ||
387 | -- Any thing else goes wrong and we broadcast ConnectFailure. | ||
388 | do (conkey,u) <- makeConnKey params (restrictSocket sock) | ||
389 | Socket.close sock | ||
390 | atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr) | ||
391 | `onException` return () | ||
392 | atomically $ readTVar retryVar) $ do | ||
393 | connect sock addr | ||
394 | (conkey,u) <- makeConnKey params (restrictSocket sock) | ||
395 | h <- socketToHandle sock ReadWriteMode | ||
396 | threads <- newConnection server sessionConduits params conkey u h Out | ||
397 | atomically $ do threadsWait threads | ||
398 | readTVar retryVar | ||
399 | fin_utc <- getCurrentTime | ||
400 | when shouldRetry $ do | ||
401 | let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc) | ||
402 | expected = fromIntegral interval | ||
403 | when (shouldRetry && elapsed < expected) $ do | ||
404 | debugNoise $ "Waiting to retry " <> bshow addr | ||
405 | void $ startDelay timer (round $ 1000 * (expected-elapsed)) | ||
406 | debugNoise $ "retry " <> bshow (shouldRetry,addr) | ||
407 | when shouldRetry $ retryLoop | ||
408 | |||
409 | |||
410 | -- INTERNAL ---------------------------------------------------------- | ||
411 | |||
412 | {- | ||
413 | hWriteUntilNothing h outs = | ||
414 | fix $ \loop -> do | ||
415 | mb <- atomically $ takeTMVar outs | ||
416 | case mb of Just bs -> do S.hPutStrLn h bs | ||
417 | warn $ "wrote " <> bs | ||
418 | loop | ||
419 | Nothing -> do warn $ "wrote Nothing" | ||
420 | hClose h | ||
421 | |||
422 | -} | ||
423 | connRead :: ConnectionState -> IO (Maybe ByteString) | ||
424 | connRead (WriteOnlyConnection w) = do | ||
425 | -- atomically $ discardContents (threadsChannel w) | ||
426 | return Nothing | ||
427 | connRead conn = do | ||
428 | c <- atomically $ getThreads | ||
429 | threadsRead c | ||
430 | where | ||
431 | getThreads = | ||
432 | case conn of SaneConnection c -> return c | ||
433 | ReadOnlyConnection c -> return c | ||
434 | ConnectionPair c w -> do | ||
435 | -- discardContents (threadsChannel w) | ||
436 | return c | ||
437 | |||
438 | socketFamily :: SockAddr -> Family | ||
439 | socketFamily (SockAddrInet _ _) = AF_INET | ||
440 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
441 | socketFamily (SockAddrUnix _) = AF_UNIX | ||
442 | |||
443 | |||
444 | conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) ) | ||
445 | -> ConnectionState | ||
446 | -> ConnectionEvent x | ||
447 | conevent sessionConduits con = Connection pingflag read write | ||
448 | where | ||
449 | pingflag = swapTVar (pingFlag (connPingTimer con)) False | ||
450 | (read,write) = sessionConduits (connRead con) (connWrite con) | ||
451 | |||
452 | newConnection :: Ord a | ||
453 | => Server a u1 releaseKey x | ||
454 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) ) | ||
455 | -> ConnectionParameters conkey u | ||
456 | -> a | ||
457 | -> u1 | ||
458 | -> Handle | ||
459 | -> InOrOut | ||
460 | -> IO ConnectionThreads | ||
461 | newConnection server sessionConduits params conkey u h inout = do | ||
462 | hSetBuffering h NoBuffering | ||
463 | let (idle_ms,timeout_ms) = | ||
464 | case (inout,duplex params) of | ||
465 | (Out,False) -> ( 0, 0 ) | ||
466 | _ -> ( pingInterval params | ||
467 | , timeout params ) | ||
468 | |||
469 | new <- do pinglogic <- forkPingMachine idle_ms timeout_ms | ||
470 | connectionThreads h pinglogic | ||
471 | started <- atomically $ newEmptyTMVar | ||
472 | kontvar <- atomically newEmptyTMVar | ||
473 | -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ? | ||
474 | let _ = kontvar :: TMVar (STM (IO ())) | ||
475 | forkIO $ do | ||
476 | myThreadId >>= flip labelThread ("connecting...") | ||
477 | getkont <- atomically $ takeTMVar kontvar | ||
478 | kont <- atomically getkont | ||
479 | kont | ||
480 | |||
481 | atomically $ do | ||
482 | current <- fmap (Map.lookup conkey) $ readTVar (conmap server) | ||
483 | case current of | ||
484 | Nothing -> do | ||
485 | (newCon,e) <- return $ | ||
486 | if duplex params | ||
487 | then let newcon = SaneConnection new | ||
488 | in ( newcon, ((conkey,u), conevent sessionConduits newcon) ) | ||
489 | else ( case inout of | ||
490 | In -> ReadOnlyConnection new | ||
491 | Out -> WriteOnlyConnection new | ||
492 | , ((conkey,u), HalfConnection inout) ) | ||
493 | modifyTVar' (conmap server) $ Map.insert conkey | ||
494 | ConnectionRecord { ckont = kontvar | ||
495 | , cstate = newCon | ||
496 | , cdata = u } | ||
497 | announce e | ||
498 | putTMVar kontvar $ return $ do | ||
499 | myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice. | ||
500 | atomically $ putTMVar started () | ||
501 | -- Wait for something interesting. | ||
502 | handleEOF conkey u kontvar newCon | ||
503 | Just what@ConnectionRecord { ckont =mvar }-> do | ||
504 | putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread. | ||
505 | putTMVar mvar $ do | ||
506 | -- The action returned by updateConMap, eventually invokes handleEOF, | ||
507 | -- so the sequencer thread will not be terminated. | ||
508 | kont <- updateConMap conkey u new what | ||
509 | putTMVar started () | ||
510 | return kont | ||
511 | return new | ||
512 | where | ||
513 | |||
514 | announce e = writeTChan (serverEvent server) e | ||
515 | |||
516 | -- This function loops and will not quit unless an action is posted to the | ||
517 | -- mvar that does not in turn invoke this function, or if an EOF occurs. | ||
518 | handleEOF conkey u mvar newCon = do | ||
519 | action <- atomically . foldr1 orElse $ | ||
520 | [ takeTMVar mvar >>= id -- passed continuation | ||
521 | , connWait newCon >> return eof | ||
522 | , connWaitPing newCon >>= return . sendPing | ||
523 | -- , pingWait pingTimer >>= return . sendPing | ||
524 | ] | ||
525 | action :: IO () | ||
526 | where | ||
527 | eof = do | ||
528 | -- warn $ "EOF " <>bshow conkey | ||
529 | connCancelPing newCon | ||
530 | atomically $ do connFlush newCon | ||
531 | announce ((conkey,u),EOF) | ||
532 | modifyTVar' (conmap server) | ||
533 | $ Map.delete conkey | ||
534 | -- warn $ "fin-EOF "<>bshow conkey | ||
535 | |||
536 | sendPing PingTimeOut = do | ||
537 | {- | ||
538 | utc <- getCurrentTime | ||
539 | let utc' = formatTime defaultTimeLocale "%s" utc | ||
540 | warn $ "ping:TIMEOUT " <> bshow utc' | ||
541 | -} | ||
542 | atomically (connClose newCon) | ||
543 | eof | ||
544 | |||
545 | sendPing PingIdle = do | ||
546 | {- | ||
547 | utc <- getCurrentTime | ||
548 | let utc' = formatTime defaultTimeLocale "%s" utc | ||
549 | -- warn $ "ping:IDLE " <> bshow utc' | ||
550 | -} | ||
551 | atomically $ announce ((conkey,u),RequiresPing) | ||
552 | handleEOF conkey u mvar newCon | ||
553 | |||
554 | |||
555 | updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do | ||
556 | new' <- | ||
557 | if duplex params then do | ||
558 | announce ((conkey,u),EOF) | ||
559 | connClose replaced | ||
560 | let newcon = SaneConnection new | ||
561 | announce $ ((conkey,u),conevent sessionConduits newcon) | ||
562 | return $ newcon | ||
563 | else | ||
564 | case replaced of | ||
565 | WriteOnlyConnection w | inout==In -> | ||
566 | do let newcon = ConnectionPair new w | ||
567 | announce ((conkey,u),conevent sessionConduits newcon) | ||
568 | return newcon | ||
569 | ReadOnlyConnection r | inout==Out -> | ||
570 | do let newcon = ConnectionPair r new | ||
571 | announce ((conkey,u),conevent sessionConduits newcon) | ||
572 | return newcon | ||
573 | _ -> do -- connFlush todo | ||
574 | announce ((conkey,u0), EOF) | ||
575 | connClose replaced | ||
576 | announce ((conkey,u), HalfConnection inout) | ||
577 | return $ case inout of | ||
578 | In -> ReadOnlyConnection new | ||
579 | Out -> WriteOnlyConnection new | ||
580 | modifyTVar' (conmap server) $ Map.insert conkey | ||
581 | ConnectionRecord { ckont = mvar | ||
582 | , cstate = new' | ||
583 | , cdata = u } | ||
584 | return $ handleEOF conkey u mvar new' | ||
585 | |||
586 | |||
587 | getPacket :: Handle -> IO ByteString | ||
588 | getPacket h = do hWaitForInput h (-1) | ||
589 | hGetNonBlocking h 1024 | ||
590 | |||
591 | |||
592 | |||
593 | -- | 'ConnectionThreads' is an interface to a pair of threads | ||
594 | -- that are reading and writing a 'Handle'. | ||
595 | data ConnectionThreads = ConnectionThreads | ||
596 | { threadsWriter :: TMVar (Maybe ByteString) | ||
597 | , threadsChannel :: TChan ByteString | ||
598 | , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close | ||
599 | , threadsPing :: PingMachine | ||
600 | } | ||
601 | |||
602 | -- | This spawns the reader and writer threads and returns a newly | ||
603 | -- constructed 'ConnectionThreads' object. | ||
604 | connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads | ||
605 | connectionThreads h pinglogic = do | ||
606 | |||
607 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar | ||
608 | |||
609 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan | ||
610 | readerThread <- forkIO $ do | ||
611 | myThreadId >>= flip labelThread ("readerThread") | ||
612 | let finished e = do | ||
613 | hClose h | ||
614 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) | ||
615 | -- let _ = fmap ioeGetErrorType e -- type hint | ||
616 | let _ = fmap what e where what (SomeException _) = undefined | ||
617 | atomically $ do tryTakeTMVar outs | ||
618 | putTMVar outs Nothing -- quit writer | ||
619 | putTMVar doner () | ||
620 | handle (finished . Just) $ do | ||
621 | pingBump pinglogic -- start the ping timer | ||
622 | fix $ \loop -> do | ||
623 | packet <- getPacket h | ||
624 | -- warn $ "read: " <> S.take 60 packet | ||
625 | atomically $ writeTChan incomming packet | ||
626 | pingBump pinglogic | ||
627 | -- warn $ "bumped: " <> S.take 60 packet | ||
628 | isEof <- hIsEOF h | ||
629 | if isEof then finished Nothing else loop | ||
630 | |||
631 | writerThread <- forkIO . fix $ \loop -> do | ||
632 | myThreadId >>= flip labelThread ("writerThread") | ||
633 | let finished = do -- warn $ "finished write" | ||
634 | -- hClose h -- quit reader | ||
635 | throwTo readerThread (ErrorCall "EOF") | ||
636 | atomically $ putTMVar donew () | ||
637 | mb <- atomically $ readTMVar outs | ||
638 | case mb of Just bs -> handle (\(SomeException e)->finished) | ||
639 | (do -- warn $ "writing: " <> S.take 60 bs | ||
640 | S.hPutStr h bs | ||
641 | -- warn $ "wrote: " <> S.take 60 bs | ||
642 | atomically $ takeTMVar outs | ||
643 | loop) | ||
644 | Nothing -> finished | ||
645 | |||
646 | let wait = do readTMVar donew | ||
647 | readTMVar doner | ||
648 | return () | ||
649 | return ConnectionThreads { threadsWriter = outs | ||
650 | , threadsChannel = incomming | ||
651 | , threadsWait = wait | ||
652 | , threadsPing = pinglogic } | ||
653 | |||
654 | |||
655 | -- | 'threadsWrite' writes the given 'ByteString' to the | ||
656 | -- 'ConnectionThreads' object. It blocks until the ByteString | ||
657 | -- is written and 'True' is returned, or the connection is | ||
658 | -- interrupted and 'False' is returned. | ||
659 | threadsWrite :: ConnectionThreads -> ByteString -> IO Bool | ||
660 | threadsWrite c bs = atomically $ | ||
661 | orElse (const False `fmap` threadsWait c) | ||
662 | (const True `fmap` putTMVar (threadsWriter c) (Just bs)) | ||
663 | |||
664 | -- | 'threadsClose' signals for the 'ConnectionThreads' object | ||
665 | -- to quit and close the associated 'Handle'. This operation | ||
666 | -- is non-blocking, follow it with 'threadsWait' if you want | ||
667 | -- to wait for the operation to complete. | ||
668 | threadsClose :: ConnectionThreads -> STM () | ||
669 | threadsClose c = do | ||
670 | let mvar = threadsWriter c | ||
671 | v <- tryReadTMVar mvar | ||
672 | case v of | ||
673 | Just Nothing -> return () -- already closed | ||
674 | _ -> putTMVar mvar Nothing | ||
675 | |||
676 | -- | 'threadsRead' blocks until a 'ByteString' is available which | ||
677 | -- is returned to the caller, or the connection is interrupted and | ||
678 | -- 'Nothing' is returned. | ||
679 | threadsRead :: ConnectionThreads -> IO (Maybe ByteString) | ||
680 | threadsRead c = atomically $ | ||
681 | orElse (const Nothing `fmap` threadsWait c) | ||
682 | (Just `fmap` readTChan (threadsChannel c)) | ||
683 | |||
684 | -- | A 'ConnectionState' is an interface to a single 'ConnectionThreads' | ||
685 | -- or to a pair of 'ConnectionThreads' objects that are considered as one | ||
686 | -- connection. | ||
687 | data ConnectionState = | ||
688 | SaneConnection ConnectionThreads | ||
689 | -- ^ ordinary read/write connection | ||
690 | | WriteOnlyConnection ConnectionThreads | ||
691 | | ReadOnlyConnection ConnectionThreads | ||
692 | | ConnectionPair ConnectionThreads ConnectionThreads | ||
693 | -- ^ Two 'ConnectionThreads' objects, read operations use the | ||
694 | -- first, write operations use the second. | ||
695 | |||
696 | |||
697 | |||
698 | connWrite :: ConnectionState -> ByteString -> IO Bool | ||
699 | connWrite (ReadOnlyConnection _) bs = return False | ||
700 | connWrite conn bs = threadsWrite c bs | ||
701 | where | ||
702 | c = case conn of SaneConnection c -> c | ||
703 | WriteOnlyConnection c -> c | ||
704 | ConnectionPair _ c -> c | ||
705 | |||
706 | |||
707 | mapConn :: Bool -> | ||
708 | (ConnectionThreads -> STM ()) -> ConnectionState -> STM () | ||
709 | mapConn both action c = | ||
710 | case c of | ||
711 | SaneConnection rw -> action rw | ||
712 | ReadOnlyConnection r -> action r | ||
713 | WriteOnlyConnection w -> action w | ||
714 | ConnectionPair r w -> do | ||
715 | rem <- orElse (const w `fmap` action r) | ||
716 | (const r `fmap` action w) | ||
717 | when both $ action rem | ||
718 | |||
719 | connClose :: ConnectionState -> STM () | ||
720 | connClose c = mapConn True threadsClose c | ||
721 | |||
722 | connWait :: ConnectionState -> STM () | ||
723 | connWait c = doit -- mapConn False threadsWait c | ||
724 | where | ||
725 | action = threadsWait | ||
726 | doit = | ||
727 | case c of | ||
728 | SaneConnection rw -> action rw | ||
729 | ReadOnlyConnection r -> action r | ||
730 | WriteOnlyConnection w -> action w | ||
731 | ConnectionPair r w -> do | ||
732 | rem <- orElse (const w `fmap` action r) | ||
733 | (const r `fmap` action w) | ||
734 | threadsClose rem | ||
735 | |||
736 | connPingTimer :: ConnectionState -> PingMachine | ||
737 | connPingTimer c = | ||
738 | case c of | ||
739 | SaneConnection rw -> threadsPing rw | ||
740 | ReadOnlyConnection r -> threadsPing r | ||
741 | WriteOnlyConnection w -> threadsPing w -- should be disabled. | ||
742 | ConnectionPair r w -> threadsPing r | ||
743 | |||
744 | connCancelPing :: ConnectionState -> IO () | ||
745 | connCancelPing c = pingCancel (connPingTimer c) | ||
746 | |||
747 | connWaitPing :: ConnectionState -> STM PingEvent | ||
748 | connWaitPing c = pingWait (connPingTimer c) | ||
749 | |||
750 | connFlush :: ConnectionState -> STM () | ||
751 | connFlush c = | ||
752 | case c of | ||
753 | SaneConnection rw -> waitChan rw | ||
754 | ReadOnlyConnection r -> waitChan r | ||
755 | WriteOnlyConnection w -> return () | ||
756 | ConnectionPair r w -> waitChan r | ||
757 | where | ||
758 | waitChan t = do | ||
759 | b <- isEmptyTChan (threadsChannel t) | ||
760 | when (not b) retry | ||
761 | |||
762 | bshow :: Show a => a -> ByteString | ||
763 | bshow e = S.pack . show $ e | ||
764 | |||
765 | warn :: ByteString -> IO () | ||
766 | warn str = S.hPutStrLn stderr str >> hFlush stderr | ||
767 | |||
768 | debugNoise :: Monad m => t -> m () | ||
769 | debugNoise str = return () | ||
770 | |||
771 | data TCPStatus = Resolving | AwaitingRead | AwaitingWrite | ||
772 | |||
773 | tcpManager :: ( Show k, Ord k, Ord conkey ) => | ||
774 | (conkey -> (SockAddr, ConnectionParameters conkey u, Miliseconds)) | ||
775 | -> (String -> Maybe k) | ||
776 | -> (k -> IO (Maybe conkey)) | ||
777 | -> Server conkey u releaseKey x | ||
778 | -> IO (Manager TCPStatus k) | ||
779 | tcpManager grokKey s2k resolvKey sv = do | ||
780 | rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey) | ||
781 | nullping <- forkPingMachine 0 0 | ||
782 | return Manager { | ||
783 | setPolicy = \k -> \case | ||
784 | TryingToConnect -> join $ atomically $ do | ||
785 | r <- readTVar rmap | ||
786 | case Map.lookup k r of | ||
787 | Just {} -> return $ return () -- Connection already in progress. | ||
788 | Nothing -> do | ||
789 | modifyTVar' rmap $ Map.insert k Nothing | ||
790 | return $ void $ forkIO $ do | ||
791 | myThreadId >>= flip labelThread ("resolve."++show k) | ||
792 | mconkey <- resolvKey k | ||
793 | case mconkey of | ||
794 | Nothing -> atomically $ modifyTVar' rmap $ Map.delete k | ||
795 | Just conkey -> do | ||
796 | control sv $ case grokKey conkey of | ||
797 | (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms | ||
798 | OpenToConnect -> hPutStrLn stderr "TODO: TCP OpenToConnect" | ||
799 | RefusingToConnect -> hPutStrLn stderr "TODO: TCP RefusingToConnect" | ||
800 | , connections = do | ||
801 | c <- readTVar $ conmap sv | ||
802 | fmap (exportConnection nullping c) <$> readTVar rmap | ||
803 | , stringToKey = s2k | ||
804 | , showProgress = \case | ||
805 | Resolving -> "resolving" | ||
806 | AwaitingRead -> "awaiting inbound" | ||
807 | AwaitingWrite -> "awaiting outbound" | ||
808 | , showKey = show | ||
809 | } | ||
810 | |||
811 | exportConnection :: Ord conkey => PingMachine -> Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus | ||
812 | exportConnection nullping conmap mkey = G.Connection | ||
813 | { G.connStatus = case mkey of | ||
814 | Nothing -> return $ G.Dormant | ||
815 | Just conkey -> case Map.lookup conkey conmap of | ||
816 | Nothing -> return $ G.InProgress Resolving | ||
817 | Just (ConnectionRecord ckont cstate cdata) -> return $ case cstate of | ||
818 | SaneConnection {} -> G.Established | ||
819 | ConnectionPair {} -> G.Established | ||
820 | ReadOnlyConnection {} -> G.InProgress AwaitingWrite | ||
821 | WriteOnlyConnection {} -> G.InProgress AwaitingRead | ||
822 | , G.connPolicy = return TryingToConnect | ||
823 | , G.connPingLogic = case mkey >>= flip Map.lookup conmap of | ||
824 | Nothing -> nullping | ||
825 | Just (ConnectionRecord _ cstate _) -> connPingTimer cstate | ||
826 | } | ||
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index 6d6d3bd7..5a0ed20e 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -34,7 +34,7 @@ module XMPPServer | |||
34 | import ConnectionKey | 34 | import ConnectionKey |
35 | import qualified Control.Concurrent.STM.UpdateStream as Slotted | 35 | import qualified Control.Concurrent.STM.UpdateStream as Slotted |
36 | import Nesting | 36 | import Nesting |
37 | import Server | 37 | import Connection.Tcp |
38 | import EventUtil | 38 | import EventUtil |
39 | import ControlMaybe | 39 | import ControlMaybe |
40 | import LockedChan | 40 | import LockedChan |