summaryrefslogtreecommitdiff
path: root/Presence
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2017-11-20 22:40:50 +0000
committerJames Crayne <jim.crayne@gmail.com>2017-11-20 22:52:07 +0000
commit8c9d1cba722ae6539ec6701d1b0290ed635a55d1 (patch)
tree8ecdba79d49f9e5d53eb511a2dcd79de1a63f046 /Presence
parentde3223b62bf002232ca659c2738441ad6c1708eb (diff)
Rename: Server -> Connection.Tcp
Diffstat (limited to 'Presence')
-rw-r--r--Presence/Server.hs826
-rw-r--r--Presence/XMPPServer.hs2
2 files changed, 1 insertions, 827 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
deleted file mode 100644
index c38aec2a..00000000
--- a/Presence/Server.hs
+++ /dev/null
@@ -1,826 +0,0 @@
1{-# OPTIONS_HADDOCK prune #-}
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE DoAndIfThenElse #-}
4{-# LANGUAGE FlexibleInstances #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7{-# LANGUAGE StandaloneDeriving #-}
8{-# LANGUAGE TupleSections #-}
9{-# LANGUAGE LambdaCase #-}
10-----------------------------------------------------------------------------
11-- |
12-- Module : Server
13--
14-- Maintainer : joe@jerkface.net
15-- Stability : experimental
16--
17-- A TCP client/server library.
18--
19-- TODO:
20--
21-- * interface tweaks
22--
23module Server
24 ( module Server
25 , module PingMachine ) where
26
27import Data.ByteString (ByteString,hGetNonBlocking)
28import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
29import Data.Conduit ( Source, Sink, Flush )
30#if MIN_VERSION_containers(0,5,0)
31import qualified Data.Map.Strict as Map
32import Data.Map.Strict (Map)
33#else
34import qualified Data.Map as Map
35import Data.Map (Map)
36#endif
37import Data.Monoid ( (<>) )
38#ifdef THREAD_DEBUG
39import Control.Concurrent.Lifted.Instrument
40#else
41import Control.Concurrent.Lifted
42import GHC.Conc (labelThread)
43#endif
44
45import Control.Concurrent.STM
46-- import Control.Concurrent.STM.TMVar
47-- import Control.Concurrent.STM.TChan
48-- import Control.Concurrent.STM.Delay
49import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..),onException)
50import Control.Monad
51import Control.Monad.Fix
52-- import Control.Monad.STM
53-- import Control.Monad.Trans.Resource
54import Control.Monad.IO.Class (MonadIO (liftIO))
55import System.IO.Error (isDoesNotExistError)
56import System.IO
57 ( IOMode(..)
58 , hSetBuffering
59 , BufferMode(..)
60 , hWaitForInput
61 , hClose
62 , hIsEOF
63 , stderr
64 , Handle
65 , hFlush
66 , hPutStrLn
67 )
68import Network.Socket as Socket
69import Network.BSD
70 ( getProtocolNumber
71 )
72import Debug.Trace
73import Data.Time.Clock (getCurrentTime,diffUTCTime)
74-- import SockAddr ()
75-- import System.Locale (defaultTimeLocale)
76
77import InterruptibleDelay
78import PingMachine
79import Network.StreamServer
80import Network.SocketLike hiding (sClose)
81import qualified Connection as G
82 ;import Connection (Manager (..), Policy(..))
83
84
85type Microseconds = Int
86
87-- | This object is passed with the 'Listen' and 'Connect'
88-- instructions in order to control the behavior of the
89-- connections that are established. It is parameterized
90-- by a user-suplied type @conkey@ that is used as a lookup
91-- key for connections.
92data ConnectionParameters conkey u =
93 ConnectionParameters
94 { pingInterval :: PingInterval
95 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
96 -- event is signaled.
97 , timeout :: TimeOut
98 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
99 -- that are necessary for the connection to be considered
100 -- lost and signalling 'EOF'.
101 , makeConnKey :: RestrictedSocket -> IO (conkey,u)
102 -- ^ This action creates a lookup key for a new connection. If 'duplex'
103 -- is 'True' and the result is already assocatied with an established
104 -- connection, then an 'EOF' will be forced before the the new
105 -- connection becomes active.
106 --
107 , duplex :: Bool
108 -- ^ If True, then the connection will be treated as a normal
109 -- two-way socket. Otherwise, a readable socket is established
110 -- with 'Listen' and a writable socket is established with
111 -- 'Connect' and they are associated when 'makeConnKey' yields
112 -- same value for each.
113 }
114
115-- | Use this function to select appropriate default values for
116-- 'ConnectionParameters' other than 'makeConnKey'.
117--
118-- Current defaults:
119--
120-- * 'pingInterval' = 28000
121--
122-- * 'timeout' = 2000
123--
124-- * 'duplex' = True
125--
126connectionDefaults
127 :: (RestrictedSocket -> IO (conkey,u)) -> ConnectionParameters conkey u
128connectionDefaults f = ConnectionParameters
129 { pingInterval = 28000
130 , timeout = 2000
131 , makeConnKey = f
132 , duplex = True
133 }
134
135-- | Instructions for a 'Server' object
136--
137-- To issue a command, put it into the 'serverCommand' TMVar.
138data ServerInstruction conkey u
139 = Quit
140 -- ^ kill the server. This command is automatically issued when
141 -- the server is released.
142 | Listen PortNumber (ConnectionParameters conkey u)
143 -- ^ listen for incoming connections
144 | Connect SockAddr (ConnectionParameters conkey u)
145 -- ^ connect to addresses
146 | ConnectWithEndlessRetry SockAddr
147 (ConnectionParameters conkey u)
148 Miliseconds
149 -- ^ keep retrying the connection
150 | Ignore PortNumber
151 -- ^ stop listening on specified port
152 | Send conkey ByteString
153 -- ^ send bytes to an established connection
154
155#ifdef TEST
156deriving instance Show conkey => Show (ServerInstruction conkey u)
157instance Show (a -> b) where show _ = "<function>"
158deriving instance Show conkey => Show (ConnectionParameters conkey u)
159#endif
160
161-- | This type specifies which which half of a half-duplex
162-- connection is of interest.
163data InOrOut = In | Out
164 deriving (Enum,Eq,Ord,Show,Read)
165
166-- | These events may be read from 'serverEvent' TChannel.
167--
168data ConnectionEvent b
169 = Connection (STM Bool) (Source IO b) (Sink (Flush b) IO ())
170 -- ^ A new connection was established
171 | ConnectFailure SockAddr
172 -- ^ A 'Connect' command failed.
173 | HalfConnection InOrOut
174 -- ^ Half of a half-duplex connection is avaliable.
175 | EOF
176 -- ^ A connection was terminated
177 | RequiresPing
178 -- ^ 'pingInterval' miliseconds of idle was experienced
179
180#ifdef TEST
181instance Show (IO a) where show _ = "<IO action>"
182instance Show (STM a) where show _ = "<STM action>"
183instance Eq (ByteString -> IO Bool) where (==) _ _ = True
184instance Eq (IO (Maybe ByteString)) where (==) _ _ = True
185instance Eq (STM Bool) where (==) _ _ = True
186deriving instance Show b => Show (ConnectionEvent b)
187deriving instance Eq b => Eq (ConnectionEvent b)
188#endif
189
190-- | This is the per-connection state.
191data ConnectionRecord u
192 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler
193 , cstate :: ConnectionState -- ^ used to send/receive data to the connection
194 , cdata :: u -- ^ user data, stored in the connection map for convenience
195 }
196
197-- | This object accepts commands and signals events and maintains
198-- the list of currently listening ports and established connections.
199data Server a u releaseKey b
200 = Server { serverCommand :: TMVar (ServerInstruction a u)
201 , serverEvent :: TChan ((a,u), ConnectionEvent b)
202 , serverReleaseKey :: releaseKey
203 , conmap :: TVar (Map a (ConnectionRecord u))
204 , listenmap :: TVar (Map PortNumber ServerHandle)
205 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
206 }
207
208control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
209control sv = atomically . putTMVar (serverCommand sv)
210
211type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
212
213noCleanUp :: MonadIO m => Allocate () m
214noCleanUp io _ = ( (,) () ) `liftM` liftIO io
215
216-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
217-- to ensure proper cleanup. For example,
218--
219-- > import Server
220-- > import Control.Monad.Trans.Resource (runResourceT)
221-- > import Control.Monad.IO.Class (liftIO)
222-- > import Control.Monad.STM (atomically)
223-- > import Control.Concurrent.STM.TMVar (putTMVar)
224-- > import Control.Concurrent.STM.TChan (readTChan)
225-- >
226-- > main = runResourceT $ do
227-- > sv <- server allocate
228-- > let params = connectionDefaults (return . snd)
229-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
230-- > let loop = do
231-- > (_,event) <- atomically $ readTChan (serverEvent sv)
232-- > case event of
233-- > Connection getPingFlag readData writeData -> do
234-- > forkIO $ do
235-- > fix $ \readLoop -> do
236-- > readData >>= mapM $ \bytes ->
237-- > putStrLn $ "got: " ++ show bytes
238-- > readLoop
239-- > case event of EOF -> return ()
240-- > _ -> loop
241-- > liftIO loop
242--
243-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp'
244-- to do without automatic cleanup and be sure to remember to write 'Quit' to
245-- the 'serverCommand' variable.
246server ::
247 -- forall (m :: * -> *) a u conkey releaseKey.
248 (Show conkey, MonadIO m, Ord conkey) =>
249 Allocate releaseKey m
250 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
251 -> m (Server conkey u releaseKey x)
252server allocate sessionConduits = do
253 (key,cmds) <- allocate (atomically newEmptyTMVar)
254 (atomically . flip putTMVar Quit)
255 server <- liftIO . atomically $ do
256 tchan <- newTChan
257 conmap <- newTVar Map.empty
258 listenmap<- newTVar Map.empty
259 retrymap <- newTVar Map.empty
260 return Server { serverCommand = cmds
261 , serverEvent = tchan
262 , serverReleaseKey = key
263 , conmap = conmap
264 , listenmap = listenmap
265 , retrymap = retrymap
266 }
267 liftIO $ do
268 tid <- forkIO $ fix $ \loop -> do
269 instr <- atomically $ takeTMVar cmds
270 -- warn $ "instr = " <> bshow instr
271 let again = do doit server instr
272 -- warn $ "finished " <> bshow instr
273 loop
274 case instr of Quit -> closeAll server
275 _ -> again
276 labelThread tid "server"
277 return server
278 where
279 closeAll server = do
280 listening <- atomically . readTVar $ listenmap server
281 mapM_ quitListening (Map.elems listening)
282 let stopRetry (v,d) = do atomically $ writeTVar v False
283 interruptDelay d
284 retriers <- atomically $ do
285 rmap <- readTVar $ retrymap server
286 writeTVar (retrymap server) Map.empty
287 return rmap
288 mapM_ stopRetry (Map.elems retriers)
289 cons <- atomically . readTVar $ conmap server
290 atomically $ mapM_ (connClose . cstate) (Map.elems cons)
291 atomically $ mapM_ (connWait . cstate) (Map.elems cons)
292 atomically $ writeTVar (conmap server) Map.empty
293
294
295 doit server (Listen port params) = do
296
297 listening <- Map.member port
298 `fmap` atomically (readTVar $ listenmap server)
299 when (not listening) $ do
300
301 hPutStrLn stderr $ "Started listening on "++show port
302
303 let family = AF_INET6
304 address = case family of
305 AF_INET -> SockAddrInet port iNADDR_ANY
306 AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0
307
308 sserv <- flip streamServer address ServerConfig
309 { serverWarn = hPutStrLn stderr
310 , serverSession = \sock _ h -> do
311 (conkey,u) <- makeConnKey params sock
312 _ <- newConnection server sessionConduits params conkey u h In
313 return ()
314 }
315
316 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
317
318 doit server (Ignore port) = do
319 hPutStrLn stderr $ "Stopping listen on "++show port
320 mb <- atomically $ do
321 map <- readTVar $ listenmap server
322 modifyTVar' (listenmap server) $ Map.delete port
323 return $ Map.lookup port map
324 maybe (return ()) quitListening $ mb
325
326 doit server (Send con bs) = do -- . void . forkIO $ do
327 map <- atomically $ readTVar (conmap server)
328 let post False = (trace ("cant send: "++show bs) $ return ())
329 post True = return ()
330 maybe (post False)
331 (post <=< flip connWrite bs . cstate)
332 $ Map.lookup con map
333
334 doit server (Connect addr params) = join $ atomically $ do
335 Map.lookup addr <$> readTVar (retrymap server)
336 >>= return . \case
337 Nothing -> forkit
338 Just (v,d) -> do b <- atomically $ readTVar v
339 interruptDelay d
340 when (not b) forkit
341 where
342 forkit = void . forkIO $ do
343 myThreadId >>= flip labelThread ( "Connect." ++ show addr )
344 proto <- getProtocolNumber "tcp"
345 sock <- socket (socketFamily addr) Stream proto
346 handle (\e -> do -- let t = ioeGetErrorType e
347 when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
348 -- warn $ "connect-error: " <> bshow e
349 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ?
350 Socket.close sock
351 atomically
352 $ writeTChan (serverEvent server)
353 $ ((conkey,u),ConnectFailure addr))
354 $ do
355 connect sock addr
356 (conkey,u) <- makeConnKey params (restrictSocket sock)
357 h <- socketToHandle sock ReadWriteMode
358 newConnection server sessionConduits params conkey u h Out
359 return ()
360
361 doit server (ConnectWithEndlessRetry addr params interval) = do
362 proto <- getProtocolNumber "tcp"
363 void . forkIO $ do
364 myThreadId >>= flip labelThread ("ConnectWithEndlessRetry." ++ show addr)
365 timer <- interruptibleDelay
366 (retryVar,action) <- atomically $ do
367 map <- readTVar (retrymap server)
368 action <- case Map.lookup addr map of
369 Nothing -> return $ return ()
370 Just (v,d) -> do writeTVar v False
371 return $ interruptDelay d
372 v <- newTVar True
373 writeTVar (retrymap server) $! Map.insert addr (v,timer) map
374 return (v,action :: IO ())
375 action
376 fix $ \retryLoop -> do
377 utc <- getCurrentTime
378 shouldRetry <- do
379 handle (\(SomeException e) -> do
380 -- Exceptions thrown by 'socket' need to be handled specially
381 -- since we don't have enough information to broadcast a ConnectFailure
382 -- on serverEvent.
383 warn $ "Failed to create socket: " <> bshow e
384 atomically $ readTVar retryVar) $ do
385 sock <- socket (socketFamily addr) Stream proto
386 handle (\(SomeException e) -> do
387 -- Any thing else goes wrong and we broadcast ConnectFailure.
388 do (conkey,u) <- makeConnKey params (restrictSocket sock)
389 Socket.close sock
390 atomically $ writeTChan (serverEvent server) ((conkey,u),ConnectFailure addr)
391 `onException` return ()
392 atomically $ readTVar retryVar) $ do
393 connect sock addr
394 (conkey,u) <- makeConnKey params (restrictSocket sock)
395 h <- socketToHandle sock ReadWriteMode
396 threads <- newConnection server sessionConduits params conkey u h Out
397 atomically $ do threadsWait threads
398 readTVar retryVar
399 fin_utc <- getCurrentTime
400 when shouldRetry $ do
401 let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
402 expected = fromIntegral interval
403 when (shouldRetry && elapsed < expected) $ do
404 debugNoise $ "Waiting to retry " <> bshow addr
405 void $ startDelay timer (round $ 1000 * (expected-elapsed))
406 debugNoise $ "retry " <> bshow (shouldRetry,addr)
407 when shouldRetry $ retryLoop
408
409
410-- INTERNAL ----------------------------------------------------------
411
412{-
413hWriteUntilNothing h outs =
414 fix $ \loop -> do
415 mb <- atomically $ takeTMVar outs
416 case mb of Just bs -> do S.hPutStrLn h bs
417 warn $ "wrote " <> bs
418 loop
419 Nothing -> do warn $ "wrote Nothing"
420 hClose h
421
422-}
423connRead :: ConnectionState -> IO (Maybe ByteString)
424connRead (WriteOnlyConnection w) = do
425 -- atomically $ discardContents (threadsChannel w)
426 return Nothing
427connRead conn = do
428 c <- atomically $ getThreads
429 threadsRead c
430 where
431 getThreads =
432 case conn of SaneConnection c -> return c
433 ReadOnlyConnection c -> return c
434 ConnectionPair c w -> do
435 -- discardContents (threadsChannel w)
436 return c
437
438socketFamily :: SockAddr -> Family
439socketFamily (SockAddrInet _ _) = AF_INET
440socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
441socketFamily (SockAddrUnix _) = AF_UNIX
442
443
444conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
445 -> ConnectionState
446 -> ConnectionEvent x
447conevent sessionConduits con = Connection pingflag read write
448 where
449 pingflag = swapTVar (pingFlag (connPingTimer con)) False
450 (read,write) = sessionConduits (connRead con) (connWrite con)
451
452newConnection :: Ord a
453 => Server a u1 releaseKey x
454 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
455 -> ConnectionParameters conkey u
456 -> a
457 -> u1
458 -> Handle
459 -> InOrOut
460 -> IO ConnectionThreads
461newConnection server sessionConduits params conkey u h inout = do
462 hSetBuffering h NoBuffering
463 let (idle_ms,timeout_ms) =
464 case (inout,duplex params) of
465 (Out,False) -> ( 0, 0 )
466 _ -> ( pingInterval params
467 , timeout params )
468
469 new <- do pinglogic <- forkPingMachine idle_ms timeout_ms
470 connectionThreads h pinglogic
471 started <- atomically $ newEmptyTMVar
472 kontvar <- atomically newEmptyTMVar
473 -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ?
474 let _ = kontvar :: TMVar (STM (IO ()))
475 forkIO $ do
476 myThreadId >>= flip labelThread ("connecting...")
477 getkont <- atomically $ takeTMVar kontvar
478 kont <- atomically getkont
479 kont
480
481 atomically $ do
482 current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
483 case current of
484 Nothing -> do
485 (newCon,e) <- return $
486 if duplex params
487 then let newcon = SaneConnection new
488 in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
489 else ( case inout of
490 In -> ReadOnlyConnection new
491 Out -> WriteOnlyConnection new
492 , ((conkey,u), HalfConnection inout) )
493 modifyTVar' (conmap server) $ Map.insert conkey
494 ConnectionRecord { ckont = kontvar
495 , cstate = newCon
496 , cdata = u }
497 announce e
498 putTMVar kontvar $ return $ do
499 myThreadId >>= flip labelThread ("connection."++show inout) -- XXX: more info would be nice.
500 atomically $ putTMVar started ()
501 -- Wait for something interesting.
502 handleEOF conkey u kontvar newCon
503 Just what@ConnectionRecord { ckont =mvar }-> do
504 putTMVar kontvar $ return $ return () -- Kill redundant "connecting..." thread.
505 putTMVar mvar $ do
506 -- The action returned by updateConMap, eventually invokes handleEOF,
507 -- so the sequencer thread will not be terminated.
508 kont <- updateConMap conkey u new what
509 putTMVar started ()
510 return kont
511 return new
512 where
513
514 announce e = writeTChan (serverEvent server) e
515
516 -- This function loops and will not quit unless an action is posted to the
517 -- mvar that does not in turn invoke this function, or if an EOF occurs.
518 handleEOF conkey u mvar newCon = do
519 action <- atomically . foldr1 orElse $
520 [ takeTMVar mvar >>= id -- passed continuation
521 , connWait newCon >> return eof
522 , connWaitPing newCon >>= return . sendPing
523 -- , pingWait pingTimer >>= return . sendPing
524 ]
525 action :: IO ()
526 where
527 eof = do
528 -- warn $ "EOF " <>bshow conkey
529 connCancelPing newCon
530 atomically $ do connFlush newCon
531 announce ((conkey,u),EOF)
532 modifyTVar' (conmap server)
533 $ Map.delete conkey
534 -- warn $ "fin-EOF "<>bshow conkey
535
536 sendPing PingTimeOut = do
537 {-
538 utc <- getCurrentTime
539 let utc' = formatTime defaultTimeLocale "%s" utc
540 warn $ "ping:TIMEOUT " <> bshow utc'
541 -}
542 atomically (connClose newCon)
543 eof
544
545 sendPing PingIdle = do
546 {-
547 utc <- getCurrentTime
548 let utc' = formatTime defaultTimeLocale "%s" utc
549 -- warn $ "ping:IDLE " <> bshow utc'
550 -}
551 atomically $ announce ((conkey,u),RequiresPing)
552 handleEOF conkey u mvar newCon
553
554
555 updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do
556 new' <-
557 if duplex params then do
558 announce ((conkey,u),EOF)
559 connClose replaced
560 let newcon = SaneConnection new
561 announce $ ((conkey,u),conevent sessionConduits newcon)
562 return $ newcon
563 else
564 case replaced of
565 WriteOnlyConnection w | inout==In ->
566 do let newcon = ConnectionPair new w
567 announce ((conkey,u),conevent sessionConduits newcon)
568 return newcon
569 ReadOnlyConnection r | inout==Out ->
570 do let newcon = ConnectionPair r new
571 announce ((conkey,u),conevent sessionConduits newcon)
572 return newcon
573 _ -> do -- connFlush todo
574 announce ((conkey,u0), EOF)
575 connClose replaced
576 announce ((conkey,u), HalfConnection inout)
577 return $ case inout of
578 In -> ReadOnlyConnection new
579 Out -> WriteOnlyConnection new
580 modifyTVar' (conmap server) $ Map.insert conkey
581 ConnectionRecord { ckont = mvar
582 , cstate = new'
583 , cdata = u }
584 return $ handleEOF conkey u mvar new'
585
586
587getPacket :: Handle -> IO ByteString
588getPacket h = do hWaitForInput h (-1)
589 hGetNonBlocking h 1024
590
591
592
593-- | 'ConnectionThreads' is an interface to a pair of threads
594-- that are reading and writing a 'Handle'.
595data ConnectionThreads = ConnectionThreads
596 { threadsWriter :: TMVar (Maybe ByteString)
597 , threadsChannel :: TChan ByteString
598 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
599 , threadsPing :: PingMachine
600 }
601
602-- | This spawns the reader and writer threads and returns a newly
603-- constructed 'ConnectionThreads' object.
604connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
605connectionThreads h pinglogic = do
606
607 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
608
609 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
610 readerThread <- forkIO $ do
611 myThreadId >>= flip labelThread ("readerThread")
612 let finished e = do
613 hClose h
614 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
615 -- let _ = fmap ioeGetErrorType e -- type hint
616 let _ = fmap what e where what (SomeException _) = undefined
617 atomically $ do tryTakeTMVar outs
618 putTMVar outs Nothing -- quit writer
619 putTMVar doner ()
620 handle (finished . Just) $ do
621 pingBump pinglogic -- start the ping timer
622 fix $ \loop -> do
623 packet <- getPacket h
624 -- warn $ "read: " <> S.take 60 packet
625 atomically $ writeTChan incomming packet
626 pingBump pinglogic
627 -- warn $ "bumped: " <> S.take 60 packet
628 isEof <- hIsEOF h
629 if isEof then finished Nothing else loop
630
631 writerThread <- forkIO . fix $ \loop -> do
632 myThreadId >>= flip labelThread ("writerThread")
633 let finished = do -- warn $ "finished write"
634 -- hClose h -- quit reader
635 throwTo readerThread (ErrorCall "EOF")
636 atomically $ putTMVar donew ()
637 mb <- atomically $ readTMVar outs
638 case mb of Just bs -> handle (\(SomeException e)->finished)
639 (do -- warn $ "writing: " <> S.take 60 bs
640 S.hPutStr h bs
641 -- warn $ "wrote: " <> S.take 60 bs
642 atomically $ takeTMVar outs
643 loop)
644 Nothing -> finished
645
646 let wait = do readTMVar donew
647 readTMVar doner
648 return ()
649 return ConnectionThreads { threadsWriter = outs
650 , threadsChannel = incomming
651 , threadsWait = wait
652 , threadsPing = pinglogic }
653
654
655-- | 'threadsWrite' writes the given 'ByteString' to the
656-- 'ConnectionThreads' object. It blocks until the ByteString
657-- is written and 'True' is returned, or the connection is
658-- interrupted and 'False' is returned.
659threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
660threadsWrite c bs = atomically $
661 orElse (const False `fmap` threadsWait c)
662 (const True `fmap` putTMVar (threadsWriter c) (Just bs))
663
664-- | 'threadsClose' signals for the 'ConnectionThreads' object
665-- to quit and close the associated 'Handle'. This operation
666-- is non-blocking, follow it with 'threadsWait' if you want
667-- to wait for the operation to complete.
668threadsClose :: ConnectionThreads -> STM ()
669threadsClose c = do
670 let mvar = threadsWriter c
671 v <- tryReadTMVar mvar
672 case v of
673 Just Nothing -> return () -- already closed
674 _ -> putTMVar mvar Nothing
675
676-- | 'threadsRead' blocks until a 'ByteString' is available which
677-- is returned to the caller, or the connection is interrupted and
678-- 'Nothing' is returned.
679threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
680threadsRead c = atomically $
681 orElse (const Nothing `fmap` threadsWait c)
682 (Just `fmap` readTChan (threadsChannel c))
683
684-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
685-- or to a pair of 'ConnectionThreads' objects that are considered as one
686-- connection.
687data ConnectionState =
688 SaneConnection ConnectionThreads
689 -- ^ ordinary read/write connection
690 | WriteOnlyConnection ConnectionThreads
691 | ReadOnlyConnection ConnectionThreads
692 | ConnectionPair ConnectionThreads ConnectionThreads
693 -- ^ Two 'ConnectionThreads' objects, read operations use the
694 -- first, write operations use the second.
695
696
697
698connWrite :: ConnectionState -> ByteString -> IO Bool
699connWrite (ReadOnlyConnection _) bs = return False
700connWrite conn bs = threadsWrite c bs
701 where
702 c = case conn of SaneConnection c -> c
703 WriteOnlyConnection c -> c
704 ConnectionPair _ c -> c
705
706
707mapConn :: Bool ->
708 (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
709mapConn both action c =
710 case c of
711 SaneConnection rw -> action rw
712 ReadOnlyConnection r -> action r
713 WriteOnlyConnection w -> action w
714 ConnectionPair r w -> do
715 rem <- orElse (const w `fmap` action r)
716 (const r `fmap` action w)
717 when both $ action rem
718
719connClose :: ConnectionState -> STM ()
720connClose c = mapConn True threadsClose c
721
722connWait :: ConnectionState -> STM ()
723connWait c = doit -- mapConn False threadsWait c
724 where
725 action = threadsWait
726 doit =
727 case c of
728 SaneConnection rw -> action rw
729 ReadOnlyConnection r -> action r
730 WriteOnlyConnection w -> action w
731 ConnectionPair r w -> do
732 rem <- orElse (const w `fmap` action r)
733 (const r `fmap` action w)
734 threadsClose rem
735
736connPingTimer :: ConnectionState -> PingMachine
737connPingTimer c =
738 case c of
739 SaneConnection rw -> threadsPing rw
740 ReadOnlyConnection r -> threadsPing r
741 WriteOnlyConnection w -> threadsPing w -- should be disabled.
742 ConnectionPair r w -> threadsPing r
743
744connCancelPing :: ConnectionState -> IO ()
745connCancelPing c = pingCancel (connPingTimer c)
746
747connWaitPing :: ConnectionState -> STM PingEvent
748connWaitPing c = pingWait (connPingTimer c)
749
750connFlush :: ConnectionState -> STM ()
751connFlush c =
752 case c of
753 SaneConnection rw -> waitChan rw
754 ReadOnlyConnection r -> waitChan r
755 WriteOnlyConnection w -> return ()
756 ConnectionPair r w -> waitChan r
757 where
758 waitChan t = do
759 b <- isEmptyTChan (threadsChannel t)
760 when (not b) retry
761
762bshow :: Show a => a -> ByteString
763bshow e = S.pack . show $ e
764
765warn :: ByteString -> IO ()
766warn str = S.hPutStrLn stderr str >> hFlush stderr
767
768debugNoise :: Monad m => t -> m ()
769debugNoise str = return ()
770
771data TCPStatus = Resolving | AwaitingRead | AwaitingWrite
772
773tcpManager :: ( Show k, Ord k, Ord conkey ) =>
774 (conkey -> (SockAddr, ConnectionParameters conkey u, Miliseconds))
775 -> (String -> Maybe k)
776 -> (k -> IO (Maybe conkey))
777 -> Server conkey u releaseKey x
778 -> IO (Manager TCPStatus k)
779tcpManager grokKey s2k resolvKey sv = do
780 rmap <- atomically $ newTVar Map.empty -- Map k (Maybe conkey)
781 nullping <- forkPingMachine 0 0
782 return Manager {
783 setPolicy = \k -> \case
784 TryingToConnect -> join $ atomically $ do
785 r <- readTVar rmap
786 case Map.lookup k r of
787 Just {} -> return $ return () -- Connection already in progress.
788 Nothing -> do
789 modifyTVar' rmap $ Map.insert k Nothing
790 return $ void $ forkIO $ do
791 myThreadId >>= flip labelThread ("resolve."++show k)
792 mconkey <- resolvKey k
793 case mconkey of
794 Nothing -> atomically $ modifyTVar' rmap $ Map.delete k
795 Just conkey -> do
796 control sv $ case grokKey conkey of
797 (saddr,params,ms) -> ConnectWithEndlessRetry saddr params ms
798 OpenToConnect -> hPutStrLn stderr "TODO: TCP OpenToConnect"
799 RefusingToConnect -> hPutStrLn stderr "TODO: TCP RefusingToConnect"
800 , connections = do
801 c <- readTVar $ conmap sv
802 fmap (exportConnection nullping c) <$> readTVar rmap
803 , stringToKey = s2k
804 , showProgress = \case
805 Resolving -> "resolving"
806 AwaitingRead -> "awaiting inbound"
807 AwaitingWrite -> "awaiting outbound"
808 , showKey = show
809 }
810
811exportConnection :: Ord conkey => PingMachine -> Map conkey (ConnectionRecord u) -> Maybe conkey -> G.Connection TCPStatus
812exportConnection nullping conmap mkey = G.Connection
813 { G.connStatus = case mkey of
814 Nothing -> return $ G.Dormant
815 Just conkey -> case Map.lookup conkey conmap of
816 Nothing -> return $ G.InProgress Resolving
817 Just (ConnectionRecord ckont cstate cdata) -> return $ case cstate of
818 SaneConnection {} -> G.Established
819 ConnectionPair {} -> G.Established
820 ReadOnlyConnection {} -> G.InProgress AwaitingWrite
821 WriteOnlyConnection {} -> G.InProgress AwaitingRead
822 , G.connPolicy = return TryingToConnect
823 , G.connPingLogic = case mkey >>= flip Map.lookup conmap of
824 Nothing -> nullping
825 Just (ConnectionRecord _ cstate _) -> connPingTimer cstate
826 }
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
index 6d6d3bd7..5a0ed20e 100644
--- a/Presence/XMPPServer.hs
+++ b/Presence/XMPPServer.hs
@@ -34,7 +34,7 @@ module XMPPServer
34import ConnectionKey 34import ConnectionKey
35import qualified Control.Concurrent.STM.UpdateStream as Slotted 35import qualified Control.Concurrent.STM.UpdateStream as Slotted
36import Nesting 36import Nesting
37import Server 37import Connection.Tcp
38import EventUtil 38import EventUtil
39import ControlMaybe 39import ControlMaybe
40import LockedChan 40import LockedChan