diff options
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r-- | Presence/Server.hs | 640 |
1 files changed, 640 insertions, 0 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs new file mode 100644 index 00000000..da0cda17 --- /dev/null +++ b/Presence/Server.hs | |||
@@ -0,0 +1,640 @@ | |||
1 | {-# OPTIONS_HADDOCK prune #-} | ||
2 | {-# LANGUAGE CPP #-} | ||
3 | {-# LANGUAGE StandaloneDeriving #-} | ||
4 | {-# LANGUAGE OverloadedStrings #-} | ||
5 | {-# LANGUAGE TupleSections #-} | ||
6 | ----------------------------------------------------------------------------- | ||
7 | -- | | ||
8 | -- Module : Server | ||
9 | -- | ||
10 | -- Maintainer : joe@jerkface.net | ||
11 | -- Stability : experimental | ||
12 | -- | ||
13 | -- A TCP client/server library. | ||
14 | -- | ||
15 | -- TODO: | ||
16 | -- | ||
17 | -- * interface tweaks | ||
18 | -- | ||
19 | module Server where | ||
20 | |||
21 | import Data.ByteString (ByteString,hGetNonBlocking) | ||
22 | import qualified Data.ByteString.Char8 as S ( hPutStrLn, hPutStr, pack) | ||
23 | #if MIN_VERSION_containers(0,5,0) | ||
24 | import qualified Data.Map.Strict as Map | ||
25 | import Data.Map.Strict (Map) | ||
26 | #else | ||
27 | import qualified Data.Map as Map | ||
28 | import Data.Map (Map) | ||
29 | #endif | ||
30 | import Data.Monoid ( (<>) ) | ||
31 | import Control.Concurrent | ||
32 | import Control.Concurrent.STM | ||
33 | -- import Control.Concurrent.STM.TMVar | ||
34 | -- import Control.Concurrent.STM.TChan | ||
35 | -- import Control.Concurrent.STM.Delay | ||
36 | import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,ErrorCall(..)) | ||
37 | import Control.Monad | ||
38 | import Control.Monad.Fix | ||
39 | -- import Control.Monad.STM | ||
40 | import Control.Monad.Trans.Resource | ||
41 | import Control.Monad.IO.Class (MonadIO (liftIO)) | ||
42 | import System.IO.Error (ioeGetErrorType) | ||
43 | import System.IO | ||
44 | ( IOMode(..) | ||
45 | , hSetBuffering | ||
46 | , BufferMode(..) | ||
47 | , hWaitForInput | ||
48 | , hClose | ||
49 | , hIsEOF | ||
50 | , stderr | ||
51 | , Handle | ||
52 | , hFlush | ||
53 | ) | ||
54 | import Network.Socket | ||
55 | import Network.BSD | ||
56 | ( getProtocolNumber | ||
57 | ) | ||
58 | import Debug.Trace | ||
59 | |||
60 | todo = error "unimplemented" | ||
61 | |||
62 | type TimeOut = Int -- ^ miliseconds | ||
63 | type PingInterval = Int -- ^ miliseconds | ||
64 | |||
65 | -- | This object is passed with the 'Listen' and 'Connect' | ||
66 | -- instructions in order to control the behavior of the | ||
67 | -- connections that are established. It is parameterized | ||
68 | -- by a user-suplied type @conkey@ that is used as a lookup | ||
69 | -- key for connections. | ||
70 | data ConnectionParameters conkey = | ||
71 | ConnectionParameters | ||
72 | { pingInterval :: PingInterval | ||
73 | -- ^ The miliseconds of idle to allow before a 'RequiresPing' | ||
74 | -- event is signaled. | ||
75 | , timeout :: TimeOut | ||
76 | -- ^ The miliseconds of idle after 'RequiresPing' is signaled | ||
77 | -- that are necessary for the connection to be considered | ||
78 | -- lost and signalling 'EOF'. | ||
79 | , makeConnKey :: (Socket,SockAddr) -> IO conkey | ||
80 | -- ^ This action creates a lookup key for a new connection. If 'duplex' | ||
81 | -- is 'True' and the result is already assocatied with an established | ||
82 | -- connection, then an 'EOF' will be forced before the the new | ||
83 | -- connection becomes active. | ||
84 | -- | ||
85 | , duplex :: Bool | ||
86 | -- ^ If True, then the connection will be treated as a normal | ||
87 | -- two-way socket. Otherwise, a readable socket is established | ||
88 | -- with 'Listen' and a writable socket is established with | ||
89 | -- 'Connect' and they are associated when 'makeConnKey' yields | ||
90 | -- same value for each. | ||
91 | } | ||
92 | |||
93 | -- | Use this function to select appropriate default values for | ||
94 | -- 'ConnectionParameters' other than 'makeConnKey'. | ||
95 | -- | ||
96 | -- Current defaults: | ||
97 | -- | ||
98 | -- * 'pingInterval' = 28000 | ||
99 | -- | ||
100 | -- * 'timeout' = 2000 | ||
101 | -- | ||
102 | -- * 'duplex' = True | ||
103 | -- | ||
104 | connectionDefaults | ||
105 | :: ((Socket, SockAddr) -> IO conkey) -> ConnectionParameters conkey | ||
106 | connectionDefaults f = ConnectionParameters | ||
107 | { pingInterval = 28000 | ||
108 | , timeout = 2000 | ||
109 | , makeConnKey = f | ||
110 | , duplex = True | ||
111 | } | ||
112 | |||
113 | -- | Instructions for a 'Server' object | ||
114 | -- | ||
115 | -- To issue a command, put it into the 'serverCommand' TMVar. | ||
116 | data ServerInstruction conkey | ||
117 | = Quit | ||
118 | -- ^ kill the server. This command is automatically issued when | ||
119 | -- the server is released. | ||
120 | | Listen PortNumber (ConnectionParameters conkey) | ||
121 | -- ^ listen for incomming connections | ||
122 | | Connect SockAddr (ConnectionParameters conkey) | ||
123 | -- ^ connect to addresses | ||
124 | | Ignore PortNumber | ||
125 | -- ^ stop listening on specified port | ||
126 | | Send conkey ByteString | ||
127 | -- ^ send bytes to an established connection | ||
128 | |||
129 | #ifdef TEST | ||
130 | deriving instance Show conkey => Show (ServerInstruction conkey) | ||
131 | instance Show (a -> b) where show _ = "<function>" | ||
132 | deriving instance Show conkey => Show (ConnectionParameters conkey) | ||
133 | #endif | ||
134 | |||
135 | -- | This type specifies which which half of a half-duplex | ||
136 | -- connection is of interest. | ||
137 | data InOrOut = In | Out | ||
138 | deriving (Enum,Eq,Ord,Show,Read) | ||
139 | |||
140 | -- | These events may be read from 'serverEvent' TChannel. | ||
141 | -- | ||
142 | data ConnectionEvent b | ||
143 | = Got b | ||
144 | -- ^ Arrival of data from a socket | ||
145 | | Connection | ||
146 | -- ^ A new connection was established | ||
147 | | HalfConnection InOrOut | ||
148 | -- ^ Half of a half-duplex connection is avaliable. | ||
149 | | EOF | ||
150 | -- ^ A connection was terminated | ||
151 | | RequiresPing | ||
152 | -- ^ 'pingInterval' miliseconds of idle was experienced | ||
153 | |||
154 | deriving instance Show b => Show (ConnectionEvent b) | ||
155 | deriving instance Eq b => Eq (ConnectionEvent b) | ||
156 | |||
157 | -- | This object accepts commands and signals events and maintains | ||
158 | -- the list of currently listening ports and established connections. | ||
159 | data Server a | ||
160 | = Server { serverCommand :: TMVar (ServerInstruction a) | ||
161 | , serverEvent :: TChan (a, ConnectionEvent ByteString) | ||
162 | , serverReleaseKey :: ReleaseKey | ||
163 | , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) | ||
164 | , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) | ||
165 | } | ||
166 | |||
167 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | ||
168 | -- to ensure proper cleanup. For example, | ||
169 | -- | ||
170 | -- > import Server | ||
171 | -- > import Control.Monad.Trans.Resource (runResourceT) | ||
172 | -- > import Control.Monad.IO.Class (liftIO) | ||
173 | -- > import Control.Monad.STM (atomically) | ||
174 | -- > import Control.Concurrent.STM.TMVar (putTMVar) | ||
175 | -- > import Control.Concurrent.STM.TChan (readTChan) | ||
176 | -- > | ||
177 | -- > main = runResourceT $ do | ||
178 | -- > sv <- server | ||
179 | -- > let params = connectionDefaults (return . snd) | ||
180 | -- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) | ||
181 | -- > let loop = do | ||
182 | -- > (_,event) <- atomically $ readTChan (serverEvent sv) | ||
183 | -- > case event of | ||
184 | -- > Got bytes -> putStrLn $ "got: " ++ show bytes | ||
185 | -- > _ -> return () | ||
186 | -- > case event of EOF -> return () | ||
187 | -- > _ -> loop | ||
188 | -- > liftIO loop | ||
189 | server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a) | ||
190 | server = do | ||
191 | (key,cmds) <- allocate (atomically newEmptyTMVar) | ||
192 | (atomically . flip putTMVar Quit) | ||
193 | server <- liftIO . atomically $ do | ||
194 | tchan <- newTChan | ||
195 | conmap <- newTVar Map.empty | ||
196 | listenmap<- newTVar Map.empty | ||
197 | return Server { serverCommand = cmds | ||
198 | , serverEvent = tchan | ||
199 | , serverReleaseKey = key | ||
200 | , conmap = conmap | ||
201 | , listenmap = listenmap | ||
202 | } | ||
203 | liftIO $ do | ||
204 | forkIO $ fix $ \loop -> do | ||
205 | instr <- atomically $ takeTMVar cmds | ||
206 | -- warn $ "instr = " <> bshow instr | ||
207 | let again = do doit server instr | ||
208 | -- warn $ "finished " <> bshow instr | ||
209 | loop | ||
210 | case instr of Quit -> closeAll server | ||
211 | _ -> again | ||
212 | return server | ||
213 | where | ||
214 | closeAll server = liftIO $ do | ||
215 | listening <- atomically . readTVar $ listenmap server | ||
216 | mapM_ killListener (Map.elems listening) | ||
217 | cons <- atomically . readTVar $ conmap server | ||
218 | atomically $ mapM_ (connClose . snd) (Map.elems cons) | ||
219 | atomically $ mapM_ (connWait . snd) (Map.elems cons) | ||
220 | atomically $ writeTVar (conmap server) Map.empty | ||
221 | |||
222 | |||
223 | doit server (Listen port params) = liftIO $ do | ||
224 | |||
225 | listening <- Map.member port | ||
226 | `fmap` atomically (readTVar $ listenmap server) | ||
227 | when (not listening) $ do | ||
228 | |||
229 | let family = AF_INET6 | ||
230 | |||
231 | sock <- socket family Stream 0 | ||
232 | setSocketOption sock ReuseAddr 1 | ||
233 | let address = | ||
234 | case family of | ||
235 | AF_INET -> SockAddrInet port iNADDR_ANY | ||
236 | AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0 | ||
237 | fix $ \loop -> do | ||
238 | handle (\(SomeException e)-> do | ||
239 | warn $ "BIND-ERROR:"<>bshow address <> " " <> bshow e | ||
240 | threadDelay 5000000 | ||
241 | loop) | ||
242 | $ bindSocket sock address | ||
243 | listen sock 2 | ||
244 | thread <- forkIO $ acceptLoop server params sock | ||
245 | atomically $ listenmap server `modifyTVar'` Map.insert port (thread,sock) | ||
246 | |||
247 | doit server (Ignore port) = liftIO $ do | ||
248 | mb <- atomically $ do | ||
249 | map <- readTVar $ listenmap server | ||
250 | modifyTVar' (listenmap server) $ Map.delete port | ||
251 | return $ Map.lookup port map | ||
252 | maybe (return ()) killListener $ mb | ||
253 | |||
254 | doit server (Send con bs) = liftIO $ do -- . void . forkIO $ do | ||
255 | map <- atomically $ readTVar (conmap server) | ||
256 | let post False = (trace ("cant send: "++show bs) $ return ()) | ||
257 | post True = return () | ||
258 | maybe (post False) | ||
259 | (post <=< flip connWrite bs . snd) | ||
260 | $ Map.lookup con map | ||
261 | |||
262 | doit server (Connect addr params) = liftIO $ do | ||
263 | void . forkIO $ do | ||
264 | proto <- getProtocolNumber "tcp" | ||
265 | sock <- bracketOnError | ||
266 | (socket (socketFamily addr) Stream proto) | ||
267 | (sClose . trace "connect-error" ) -- only done if there's an error | ||
268 | $ \sock -> do connect sock addr | ||
269 | return sock | ||
270 | me <- getSocketName sock | ||
271 | conkey <- makeConnKey params (sock,me) | ||
272 | h <- socketToHandle sock ReadWriteMode | ||
273 | newConnection server params conkey h Out | ||
274 | |||
275 | |||
276 | -- INTERNAL ---------------------------------------------------------- | ||
277 | |||
278 | {- | ||
279 | hWriteUntilNothing h outs = | ||
280 | fix $ \loop -> do | ||
281 | mb <- atomically $ takeTMVar outs | ||
282 | case mb of Just bs -> do S.hPutStrLn h bs | ||
283 | warn $ "wrote " <> bs | ||
284 | loop | ||
285 | Nothing -> do warn $ "wrote Nothing" | ||
286 | hClose h | ||
287 | |||
288 | connRead :: ConnectionState -> IO (Maybe ByteString) | ||
289 | connRead (WriteOnlyConnection w) = do | ||
290 | atomically $ discardContents (threadsChannel w) | ||
291 | return Nothing | ||
292 | connRead conn = do | ||
293 | c <- atomically $ getThreads | ||
294 | threadsRead c | ||
295 | where | ||
296 | getThreads = | ||
297 | case conn of SaneConnection c -> return c | ||
298 | ReadOnlyConnection c -> return c | ||
299 | ConnectionPair c w -> do | ||
300 | discardContents (threadsChannel w) | ||
301 | return c | ||
302 | -} | ||
303 | |||
304 | socketFamily (SockAddrInet _ _) = AF_INET | ||
305 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
306 | socketFamily (SockAddrUnix _) = AF_UNIX | ||
307 | |||
308 | killListener (thread,sock) = do sClose sock | ||
309 | -- killThread thread | ||
310 | |||
311 | |||
312 | newConnection server params conkey h inout = do | ||
313 | hSetBuffering h NoBuffering | ||
314 | new <- connectionThreads h | ||
315 | started <- atomically $ newEmptyTMVar | ||
316 | kontvar <- atomically newEmptyTMVar | ||
317 | forkIO $ do | ||
318 | getkont <- atomically $ takeTMVar kontvar | ||
319 | kont <- atomically getkont | ||
320 | kont | ||
321 | |||
322 | pinglogic <- pingMachine (pingInterval params) (timeout params) | ||
323 | atomically $ do | ||
324 | current <- fmap (Map.lookup conkey) $ readTVar (conmap server) | ||
325 | case current of | ||
326 | Nothing -> do | ||
327 | (newCon,e) <- return $ | ||
328 | if duplex params | ||
329 | then ( SaneConnection new, (conkey, Connection) ) | ||
330 | else ( case inout of | ||
331 | In -> ReadOnlyConnection new | ||
332 | Out -> WriteOnlyConnection new | ||
333 | , (conkey, HalfConnection inout) ) | ||
334 | modifyTVar' (conmap server) $ Map.insert conkey (kontvar,newCon) | ||
335 | announce e | ||
336 | putTMVar kontvar $ return $ do | ||
337 | atomically $ putTMVar started () | ||
338 | handleEOF conkey kontvar newCon pinglogic | ||
339 | Just what@(mvar,_) -> do | ||
340 | putTMVar kontvar $ return $ return () | ||
341 | putTMVar mvar $ do | ||
342 | kont <- updateConMap conkey new what pinglogic | ||
343 | putTMVar started () | ||
344 | return kont | ||
345 | forkIO $ do -- inout==In || duplex params then forkIO $ do | ||
346 | -- warn $ "waiting read thread: " <> bshow (conkey,inout) | ||
347 | let forward = | ||
348 | case (inout,duplex params) of | ||
349 | (Out,True) -> const $ return () | ||
350 | _ -> announce . (conkey,) . Got | ||
351 | atomically $ takeTMVar started | ||
352 | pingBump pinglogic -- start the ping timer | ||
353 | fix $ \loop -> do | ||
354 | -- warn $ "read thread: " <> bshow (conkey,inout) | ||
355 | mb <- threadsRead new | ||
356 | pingBump pinglogic | ||
357 | -- warn $ "got: " <> bshow (mb,(conkey,inout)) | ||
358 | maybe (return ()) | ||
359 | (atomically . forward >=> const loop) | ||
360 | mb | ||
361 | return () | ||
362 | where | ||
363 | |||
364 | announce e = writeTChan (serverEvent server) e | ||
365 | |||
366 | handleEOF conkey mvar newCon pingTimer = do | ||
367 | action <- atomically . foldr1 orElse $ | ||
368 | [ takeTMVar mvar >>= id -- passed continuation | ||
369 | , connWait newCon >> return eof | ||
370 | , pingWait pingTimer >>= return . sendPing | ||
371 | ] | ||
372 | action :: IO () | ||
373 | where | ||
374 | eof = do | ||
375 | -- warn $ "EOF " <>bshow conkey | ||
376 | pingCancel pingTimer -- force cleanup of ping thread | ||
377 | atomically $ do connFlush newCon | ||
378 | announce (conkey,EOF) | ||
379 | modifyTVar' (conmap server) | ||
380 | $ Map.delete conkey | ||
381 | -- warn $ "fin-EOF "<>bshow conkey | ||
382 | |||
383 | sendPing PingTimeOut = do atomically (connClose newCon) | ||
384 | eof | ||
385 | sendPing PingIdle = do | ||
386 | atomically . announce $ (conkey,RequiresPing) | ||
387 | handleEOF conkey mvar newCon pingTimer | ||
388 | |||
389 | |||
390 | updateConMap conkey new (mvar,replaced) pingTimer = do | ||
391 | new' <- | ||
392 | if duplex params then do | ||
393 | announce (conkey,EOF) | ||
394 | connClose replaced | ||
395 | announce $ (conkey,Connection) | ||
396 | return $ SaneConnection new | ||
397 | else | ||
398 | case replaced of | ||
399 | WriteOnlyConnection w | inout==In -> | ||
400 | do announce (conkey,Connection) | ||
401 | return $ ConnectionPair new w | ||
402 | ReadOnlyConnection r | inout==Out -> | ||
403 | do announce (conkey,Connection) | ||
404 | return $ ConnectionPair r new | ||
405 | _ -> do -- connFlush todo | ||
406 | announce (conkey, EOF) | ||
407 | connClose replaced | ||
408 | announce (conkey, HalfConnection inout) | ||
409 | return $ case inout of | ||
410 | In -> ReadOnlyConnection new | ||
411 | Out -> WriteOnlyConnection new | ||
412 | modifyTVar' (conmap server) $ Map.insert conkey (mvar,new') | ||
413 | return $ handleEOF conkey mvar new' pingTimer | ||
414 | |||
415 | acceptLoop server params sock = handle (acceptException server params sock) $ do | ||
416 | con <- accept sock | ||
417 | conkey <- makeConnKey params con | ||
418 | h <- socketToHandle (fst con) ReadWriteMode | ||
419 | newConnection server params conkey h In | ||
420 | acceptLoop server params sock | ||
421 | |||
422 | acceptException server params sock ioerror = do | ||
423 | sClose sock | ||
424 | case show (ioeGetErrorType ioerror) of | ||
425 | "resource exhausted" -> do -- try again | ||
426 | warn ("acceptLoop: resource exhasted") | ||
427 | threadDelay 500000 | ||
428 | acceptLoop server params sock | ||
429 | "invalid argument" -> do -- quit on closed socket | ||
430 | return () | ||
431 | message -> do -- unexpected exception | ||
432 | warn ("acceptLoop: "<>bshow message) | ||
433 | return () | ||
434 | |||
435 | |||
436 | |||
437 | getPacket h = do hWaitForInput h (-1) | ||
438 | hGetNonBlocking h 1024 | ||
439 | |||
440 | |||
441 | |||
442 | -- | 'ConnectionThreads' is an interface to a pair of threads | ||
443 | -- that are reading and writing a 'Handle'. | ||
444 | data ConnectionThreads = ConnectionThreads | ||
445 | { threadsWriter :: TMVar (Maybe ByteString) | ||
446 | , threadsChannel :: TChan ByteString | ||
447 | , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close | ||
448 | } | ||
449 | |||
450 | -- | This spawns the reader and writer threads and returns a newly | ||
451 | -- constructed 'ConnectionThreads' object. | ||
452 | connectionThreads :: Handle -> IO ConnectionThreads | ||
453 | connectionThreads h = do | ||
454 | |||
455 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar | ||
456 | writerThread <- forkIO . fix $ \loop -> do | ||
457 | let finished = do -- warn $ "finished write" | ||
458 | hClose h -- quit reader | ||
459 | atomically $ putTMVar donew () | ||
460 | mb <- atomically $ readTMVar outs | ||
461 | case mb of Just bs -> handle (\(SomeException e)->finished) | ||
462 | (do S.hPutStr h bs | ||
463 | atomically $ takeTMVar outs | ||
464 | loop) | ||
465 | Nothing -> finished | ||
466 | |||
467 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan | ||
468 | readerThread <- forkIO $ do | ||
469 | let finished e = do | ||
470 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) | ||
471 | let _ = fmap ioeGetErrorType e -- type hint | ||
472 | atomically $ do putTMVar outs Nothing -- quit writer | ||
473 | putTMVar doner () | ||
474 | handle (finished . Just) . fix $ \loop -> do | ||
475 | packet <- getPacket h | ||
476 | atomically $ writeTChan incomming packet | ||
477 | isEof <- liftIO $ hIsEOF h | ||
478 | if isEof then finished Nothing else loop | ||
479 | |||
480 | let wait = do readTMVar donew | ||
481 | readTMVar doner | ||
482 | return () | ||
483 | return ConnectionThreads { threadsWriter = outs | ||
484 | , threadsChannel = incomming | ||
485 | , threadsWait = wait } | ||
486 | |||
487 | |||
488 | -- | 'threadsWrite' writes the given 'ByteString' to the | ||
489 | -- 'ConnectionThreads' object. It blocks until the ByteString | ||
490 | -- is written and 'True' is returned, or the connection is | ||
491 | -- interrupted and 'False' is returned. | ||
492 | threadsWrite :: ConnectionThreads -> ByteString -> IO Bool | ||
493 | threadsWrite c bs = atomically $ | ||
494 | orElse (const False `fmap` threadsWait c) | ||
495 | (const True `fmap` putTMVar (threadsWriter c) (Just bs)) | ||
496 | |||
497 | -- | 'threadsClose' signals for the 'ConnectionThreads' object | ||
498 | -- to quit and close the associated 'Handle'. This operation | ||
499 | -- is non-blocking, follow it with 'threadsWait' if you want | ||
500 | -- to wait for the operation to complete. | ||
501 | threadsClose :: ConnectionThreads -> STM () | ||
502 | threadsClose c = do | ||
503 | let mvar = threadsWriter c | ||
504 | v <- tryReadTMVar mvar | ||
505 | case v of | ||
506 | Just Nothing -> return () -- already closed | ||
507 | _ -> putTMVar mvar Nothing | ||
508 | |||
509 | -- | 'threadsRead' blocks until a 'ByteString' is available which | ||
510 | -- is returned to the caller, or the connection is interrupted and | ||
511 | -- 'Nothing' is returned. | ||
512 | threadsRead :: ConnectionThreads -> IO (Maybe ByteString) | ||
513 | threadsRead c = atomically $ | ||
514 | orElse (const Nothing `fmap` threadsWait c) | ||
515 | (Just `fmap` readTChan (threadsChannel c)) | ||
516 | |||
517 | -- | A 'ConnectionState' is an interface to a single 'ConnectionThreads' | ||
518 | -- or to a pair of 'ConnectionThreads' objects that are considered as one | ||
519 | -- connection. | ||
520 | data ConnectionState = | ||
521 | SaneConnection ConnectionThreads | ||
522 | -- ^ ordinary read/write connection | ||
523 | | WriteOnlyConnection ConnectionThreads | ||
524 | | ReadOnlyConnection ConnectionThreads | ||
525 | | ConnectionPair ConnectionThreads ConnectionThreads | ||
526 | -- ^ Two 'ConnectionThreads' objects, read operations use the | ||
527 | -- first, write operations use the second. | ||
528 | |||
529 | |||
530 | |||
531 | connWrite :: ConnectionState -> ByteString -> IO Bool | ||
532 | connWrite (ReadOnlyConnection _) bs = return False | ||
533 | connWrite conn bs = threadsWrite c bs | ||
534 | where | ||
535 | c = case conn of SaneConnection c -> c | ||
536 | WriteOnlyConnection c -> c | ||
537 | ConnectionPair _ c -> c | ||
538 | |||
539 | |||
540 | mapConn :: Bool -> | ||
541 | (ConnectionThreads -> STM ()) -> ConnectionState -> STM () | ||
542 | mapConn both action c = | ||
543 | case c of | ||
544 | SaneConnection rw -> action rw | ||
545 | ReadOnlyConnection r -> action r | ||
546 | WriteOnlyConnection w -> action w | ||
547 | ConnectionPair r w -> do | ||
548 | rem <- orElse (const w `fmap` action r) | ||
549 | (const r `fmap` action w) | ||
550 | when both $ action rem | ||
551 | |||
552 | connClose :: ConnectionState -> STM () | ||
553 | connClose c = mapConn True threadsClose c | ||
554 | |||
555 | connWait :: ConnectionState -> STM () | ||
556 | connWait c = mapConn False threadsWait c | ||
557 | |||
558 | connFlush c = | ||
559 | case c of | ||
560 | SaneConnection rw -> waitChan rw | ||
561 | ReadOnlyConnection r -> waitChan r | ||
562 | WriteOnlyConnection w -> return () | ||
563 | ConnectionPair r w -> waitChan r | ||
564 | where | ||
565 | waitChan t = do | ||
566 | b <- isEmptyTChan (threadsChannel t) | ||
567 | when (not b) retry | ||
568 | |||
569 | bshow e = S.pack . show $ e | ||
570 | warn str = S.hPutStrLn stderr str >> hFlush stderr | ||
571 | |||
572 | |||
573 | data PingEvent = PingIdle | PingTimeOut | ||
574 | |||
575 | data PingMachine = PingMachine | ||
576 | { pingIdle :: PingInterval | ||
577 | , pingTimeOut :: TimeOut | ||
578 | , pingDelay :: TMVar (Int,PingEvent) | ||
579 | , pingEvent :: TMVar PingEvent | ||
580 | , pingStarted :: TVar Bool -- True when a threadDelay is running | ||
581 | , pingThread :: ThreadId | ||
582 | } | ||
583 | |||
584 | pingMachine :: PingInterval -> TimeOut -> IO PingMachine | ||
585 | pingMachine idle timeout = do | ||
586 | me <- do | ||
587 | (delayVar,eventVar,startedVar) <- atomically $ do | ||
588 | d <- newEmptyTMVar | ||
589 | e <- newEmptyTMVar | ||
590 | s <- newTVar False | ||
591 | return (d,e,s) | ||
592 | return PingMachine { pingIdle = idle | ||
593 | , pingTimeOut = timeout | ||
594 | , pingDelay = delayVar | ||
595 | , pingEvent = eventVar | ||
596 | , pingStarted = startedVar | ||
597 | , pingThread = undefined } | ||
598 | thread <- forkIO . when (pingIdle me /=0) . fix $ | ||
599 | \loop -> do | ||
600 | (delay,event) <- atomically $ takeTMVar (pingDelay me) | ||
601 | when (delay /= 0) $ do | ||
602 | handle (\(ErrorCall _)-> do | ||
603 | atomically $ writeTVar (pingStarted me) False | ||
604 | loop) | ||
605 | (do atomically $ writeTVar (pingStarted me) True | ||
606 | threadDelay delay | ||
607 | atomically $ writeTVar (pingStarted me) False | ||
608 | atomically $ putTMVar (pingEvent me) event | ||
609 | case event of PingTimeOut -> return () | ||
610 | PingIdle -> loop) | ||
611 | |||
612 | return me { pingThread = thread } | ||
613 | |||
614 | pingCancel :: PingMachine -> IO () | ||
615 | pingCancel me = do | ||
616 | b <- atomically $ do | ||
617 | tryTakeTMVar (pingDelay me) -- no hang | ||
618 | putTMVar (pingDelay me) (0,PingTimeOut) | ||
619 | readTVar (pingStarted me) | ||
620 | when b $ throwTo (pingThread me) $ ErrorCall "" | ||
621 | |||
622 | pingBump :: PingMachine -> IO () | ||
623 | pingBump me = do | ||
624 | b <- atomically $ do | ||
625 | when (pingIdle me /= 0) $ | ||
626 | putTMVar (pingDelay me) (1000*pingIdle me,PingIdle) | ||
627 | readTVar (pingStarted me) | ||
628 | when b $ throwTo (pingThread me) $ ErrorCall "" | ||
629 | |||
630 | pingWait :: PingMachine -> STM PingEvent | ||
631 | pingWait me = do | ||
632 | e <- takeTMVar (pingEvent me) | ||
633 | case e of | ||
634 | PingIdle -> putTMVar (pingDelay me) | ||
635 | (1000*pingTimeOut me,PingTimeOut) | ||
636 | PingTimeOut -> putTMVar (pingDelay me) | ||
637 | (0,PingTimeOut) | ||
638 | return e | ||
639 | |||
640 | |||