summaryrefslogtreecommitdiff
path: root/Presence/Server.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-09 21:57:30 -0500
committerjoe <joe@jerkface.net>2014-02-09 21:57:30 -0500
commit1b0182eae555aeb2952e40a522bd5215ae0fc6d9 (patch)
treedc3fa54ad0930309d39b6c7b0f6d2eb211f951a2 /Presence/Server.hs
parent716dffcc5f0c21d39c08e512f0dd51950d1bc482 (diff)
new Server code, xmppServer demo
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r--Presence/Server.hs640
1 files changed, 640 insertions, 0 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
new file mode 100644
index 00000000..da0cda17
--- /dev/null
+++ b/Presence/Server.hs
@@ -0,0 +1,640 @@
1{-# OPTIONS_HADDOCK prune #-}
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE StandaloneDeriving #-}
4{-# LANGUAGE OverloadedStrings #-}
5{-# LANGUAGE TupleSections #-}
6-----------------------------------------------------------------------------
7-- |
8-- Module : Server
9--
10-- Maintainer : joe@jerkface.net
11-- Stability : experimental
12--
13-- A TCP client/server library.
14--
15-- TODO:
16--
17-- * interface tweaks
18--
19module Server where
20
21import Data.ByteString (ByteString,hGetNonBlocking)
22import qualified Data.ByteString.Char8 as S ( hPutStrLn, hPutStr, pack)
23#if MIN_VERSION_containers(0,5,0)
24import qualified Data.Map.Strict as Map
25import Data.Map.Strict (Map)
26#else
27import qualified Data.Map as Map
28import Data.Map (Map)
29#endif
30import Data.Monoid ( (<>) )
31import Control.Concurrent
32import Control.Concurrent.STM
33-- import Control.Concurrent.STM.TMVar
34-- import Control.Concurrent.STM.TChan
35-- import Control.Concurrent.STM.Delay
36import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,ErrorCall(..))
37import Control.Monad
38import Control.Monad.Fix
39-- import Control.Monad.STM
40import Control.Monad.Trans.Resource
41import Control.Monad.IO.Class (MonadIO (liftIO))
42import System.IO.Error (ioeGetErrorType)
43import System.IO
44 ( IOMode(..)
45 , hSetBuffering
46 , BufferMode(..)
47 , hWaitForInput
48 , hClose
49 , hIsEOF
50 , stderr
51 , Handle
52 , hFlush
53 )
54import Network.Socket
55import Network.BSD
56 ( getProtocolNumber
57 )
58import Debug.Trace
59
60todo = error "unimplemented"
61
62type TimeOut = Int -- ^ miliseconds
63type PingInterval = Int -- ^ miliseconds
64
65-- | This object is passed with the 'Listen' and 'Connect'
66-- instructions in order to control the behavior of the
67-- connections that are established. It is parameterized
68-- by a user-suplied type @conkey@ that is used as a lookup
69-- key for connections.
70data ConnectionParameters conkey =
71 ConnectionParameters
72 { pingInterval :: PingInterval
73 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
74 -- event is signaled.
75 , timeout :: TimeOut
76 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
77 -- that are necessary for the connection to be considered
78 -- lost and signalling 'EOF'.
79 , makeConnKey :: (Socket,SockAddr) -> IO conkey
80 -- ^ This action creates a lookup key for a new connection. If 'duplex'
81 -- is 'True' and the result is already assocatied with an established
82 -- connection, then an 'EOF' will be forced before the the new
83 -- connection becomes active.
84 --
85 , duplex :: Bool
86 -- ^ If True, then the connection will be treated as a normal
87 -- two-way socket. Otherwise, a readable socket is established
88 -- with 'Listen' and a writable socket is established with
89 -- 'Connect' and they are associated when 'makeConnKey' yields
90 -- same value for each.
91 }
92
93-- | Use this function to select appropriate default values for
94-- 'ConnectionParameters' other than 'makeConnKey'.
95--
96-- Current defaults:
97--
98-- * 'pingInterval' = 28000
99--
100-- * 'timeout' = 2000
101--
102-- * 'duplex' = True
103--
104connectionDefaults
105 :: ((Socket, SockAddr) -> IO conkey) -> ConnectionParameters conkey
106connectionDefaults f = ConnectionParameters
107 { pingInterval = 28000
108 , timeout = 2000
109 , makeConnKey = f
110 , duplex = True
111 }
112
113-- | Instructions for a 'Server' object
114--
115-- To issue a command, put it into the 'serverCommand' TMVar.
116data ServerInstruction conkey
117 = Quit
118 -- ^ kill the server. This command is automatically issued when
119 -- the server is released.
120 | Listen PortNumber (ConnectionParameters conkey)
121 -- ^ listen for incomming connections
122 | Connect SockAddr (ConnectionParameters conkey)
123 -- ^ connect to addresses
124 | Ignore PortNumber
125 -- ^ stop listening on specified port
126 | Send conkey ByteString
127 -- ^ send bytes to an established connection
128
129#ifdef TEST
130deriving instance Show conkey => Show (ServerInstruction conkey)
131instance Show (a -> b) where show _ = "<function>"
132deriving instance Show conkey => Show (ConnectionParameters conkey)
133#endif
134
135-- | This type specifies which which half of a half-duplex
136-- connection is of interest.
137data InOrOut = In | Out
138 deriving (Enum,Eq,Ord,Show,Read)
139
140-- | These events may be read from 'serverEvent' TChannel.
141--
142data ConnectionEvent b
143 = Got b
144 -- ^ Arrival of data from a socket
145 | Connection
146 -- ^ A new connection was established
147 | HalfConnection InOrOut
148 -- ^ Half of a half-duplex connection is avaliable.
149 | EOF
150 -- ^ A connection was terminated
151 | RequiresPing
152 -- ^ 'pingInterval' miliseconds of idle was experienced
153
154deriving instance Show b => Show (ConnectionEvent b)
155deriving instance Eq b => Eq (ConnectionEvent b)
156
157-- | This object accepts commands and signals events and maintains
158-- the list of currently listening ports and established connections.
159data Server a
160 = Server { serverCommand :: TMVar (ServerInstruction a)
161 , serverEvent :: TChan (a, ConnectionEvent ByteString)
162 , serverReleaseKey :: ReleaseKey
163 , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState))
164 , listenmap :: TVar (Map PortNumber (ThreadId,Socket))
165 }
166
167-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
168-- to ensure proper cleanup. For example,
169--
170-- > import Server
171-- > import Control.Monad.Trans.Resource (runResourceT)
172-- > import Control.Monad.IO.Class (liftIO)
173-- > import Control.Monad.STM (atomically)
174-- > import Control.Concurrent.STM.TMVar (putTMVar)
175-- > import Control.Concurrent.STM.TChan (readTChan)
176-- >
177-- > main = runResourceT $ do
178-- > sv <- server
179-- > let params = connectionDefaults (return . snd)
180-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
181-- > let loop = do
182-- > (_,event) <- atomically $ readTChan (serverEvent sv)
183-- > case event of
184-- > Got bytes -> putStrLn $ "got: " ++ show bytes
185-- > _ -> return ()
186-- > case event of EOF -> return ()
187-- > _ -> loop
188-- > liftIO loop
189server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a)
190server = do
191 (key,cmds) <- allocate (atomically newEmptyTMVar)
192 (atomically . flip putTMVar Quit)
193 server <- liftIO . atomically $ do
194 tchan <- newTChan
195 conmap <- newTVar Map.empty
196 listenmap<- newTVar Map.empty
197 return Server { serverCommand = cmds
198 , serverEvent = tchan
199 , serverReleaseKey = key
200 , conmap = conmap
201 , listenmap = listenmap
202 }
203 liftIO $ do
204 forkIO $ fix $ \loop -> do
205 instr <- atomically $ takeTMVar cmds
206 -- warn $ "instr = " <> bshow instr
207 let again = do doit server instr
208 -- warn $ "finished " <> bshow instr
209 loop
210 case instr of Quit -> closeAll server
211 _ -> again
212 return server
213 where
214 closeAll server = liftIO $ do
215 listening <- atomically . readTVar $ listenmap server
216 mapM_ killListener (Map.elems listening)
217 cons <- atomically . readTVar $ conmap server
218 atomically $ mapM_ (connClose . snd) (Map.elems cons)
219 atomically $ mapM_ (connWait . snd) (Map.elems cons)
220 atomically $ writeTVar (conmap server) Map.empty
221
222
223 doit server (Listen port params) = liftIO $ do
224
225 listening <- Map.member port
226 `fmap` atomically (readTVar $ listenmap server)
227 when (not listening) $ do
228
229 let family = AF_INET6
230
231 sock <- socket family Stream 0
232 setSocketOption sock ReuseAddr 1
233 let address =
234 case family of
235 AF_INET -> SockAddrInet port iNADDR_ANY
236 AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0
237 fix $ \loop -> do
238 handle (\(SomeException e)-> do
239 warn $ "BIND-ERROR:"<>bshow address <> " " <> bshow e
240 threadDelay 5000000
241 loop)
242 $ bindSocket sock address
243 listen sock 2
244 thread <- forkIO $ acceptLoop server params sock
245 atomically $ listenmap server `modifyTVar'` Map.insert port (thread,sock)
246
247 doit server (Ignore port) = liftIO $ do
248 mb <- atomically $ do
249 map <- readTVar $ listenmap server
250 modifyTVar' (listenmap server) $ Map.delete port
251 return $ Map.lookup port map
252 maybe (return ()) killListener $ mb
253
254 doit server (Send con bs) = liftIO $ do -- . void . forkIO $ do
255 map <- atomically $ readTVar (conmap server)
256 let post False = (trace ("cant send: "++show bs) $ return ())
257 post True = return ()
258 maybe (post False)
259 (post <=< flip connWrite bs . snd)
260 $ Map.lookup con map
261
262 doit server (Connect addr params) = liftIO $ do
263 void . forkIO $ do
264 proto <- getProtocolNumber "tcp"
265 sock <- bracketOnError
266 (socket (socketFamily addr) Stream proto)
267 (sClose . trace "connect-error" ) -- only done if there's an error
268 $ \sock -> do connect sock addr
269 return sock
270 me <- getSocketName sock
271 conkey <- makeConnKey params (sock,me)
272 h <- socketToHandle sock ReadWriteMode
273 newConnection server params conkey h Out
274
275
276-- INTERNAL ----------------------------------------------------------
277
278{-
279hWriteUntilNothing h outs =
280 fix $ \loop -> do
281 mb <- atomically $ takeTMVar outs
282 case mb of Just bs -> do S.hPutStrLn h bs
283 warn $ "wrote " <> bs
284 loop
285 Nothing -> do warn $ "wrote Nothing"
286 hClose h
287
288connRead :: ConnectionState -> IO (Maybe ByteString)
289connRead (WriteOnlyConnection w) = do
290 atomically $ discardContents (threadsChannel w)
291 return Nothing
292connRead conn = do
293 c <- atomically $ getThreads
294 threadsRead c
295 where
296 getThreads =
297 case conn of SaneConnection c -> return c
298 ReadOnlyConnection c -> return c
299 ConnectionPair c w -> do
300 discardContents (threadsChannel w)
301 return c
302-}
303
304socketFamily (SockAddrInet _ _) = AF_INET
305socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
306socketFamily (SockAddrUnix _) = AF_UNIX
307
308killListener (thread,sock) = do sClose sock
309 -- killThread thread
310
311
312newConnection server params conkey h inout = do
313 hSetBuffering h NoBuffering
314 new <- connectionThreads h
315 started <- atomically $ newEmptyTMVar
316 kontvar <- atomically newEmptyTMVar
317 forkIO $ do
318 getkont <- atomically $ takeTMVar kontvar
319 kont <- atomically getkont
320 kont
321
322 pinglogic <- pingMachine (pingInterval params) (timeout params)
323 atomically $ do
324 current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
325 case current of
326 Nothing -> do
327 (newCon,e) <- return $
328 if duplex params
329 then ( SaneConnection new, (conkey, Connection) )
330 else ( case inout of
331 In -> ReadOnlyConnection new
332 Out -> WriteOnlyConnection new
333 , (conkey, HalfConnection inout) )
334 modifyTVar' (conmap server) $ Map.insert conkey (kontvar,newCon)
335 announce e
336 putTMVar kontvar $ return $ do
337 atomically $ putTMVar started ()
338 handleEOF conkey kontvar newCon pinglogic
339 Just what@(mvar,_) -> do
340 putTMVar kontvar $ return $ return ()
341 putTMVar mvar $ do
342 kont <- updateConMap conkey new what pinglogic
343 putTMVar started ()
344 return kont
345 forkIO $ do -- inout==In || duplex params then forkIO $ do
346 -- warn $ "waiting read thread: " <> bshow (conkey,inout)
347 let forward =
348 case (inout,duplex params) of
349 (Out,True) -> const $ return ()
350 _ -> announce . (conkey,) . Got
351 atomically $ takeTMVar started
352 pingBump pinglogic -- start the ping timer
353 fix $ \loop -> do
354 -- warn $ "read thread: " <> bshow (conkey,inout)
355 mb <- threadsRead new
356 pingBump pinglogic
357 -- warn $ "got: " <> bshow (mb,(conkey,inout))
358 maybe (return ())
359 (atomically . forward >=> const loop)
360 mb
361 return ()
362 where
363
364 announce e = writeTChan (serverEvent server) e
365
366 handleEOF conkey mvar newCon pingTimer = do
367 action <- atomically . foldr1 orElse $
368 [ takeTMVar mvar >>= id -- passed continuation
369 , connWait newCon >> return eof
370 , pingWait pingTimer >>= return . sendPing
371 ]
372 action :: IO ()
373 where
374 eof = do
375 -- warn $ "EOF " <>bshow conkey
376 pingCancel pingTimer -- force cleanup of ping thread
377 atomically $ do connFlush newCon
378 announce (conkey,EOF)
379 modifyTVar' (conmap server)
380 $ Map.delete conkey
381 -- warn $ "fin-EOF "<>bshow conkey
382
383 sendPing PingTimeOut = do atomically (connClose newCon)
384 eof
385 sendPing PingIdle = do
386 atomically . announce $ (conkey,RequiresPing)
387 handleEOF conkey mvar newCon pingTimer
388
389
390 updateConMap conkey new (mvar,replaced) pingTimer = do
391 new' <-
392 if duplex params then do
393 announce (conkey,EOF)
394 connClose replaced
395 announce $ (conkey,Connection)
396 return $ SaneConnection new
397 else
398 case replaced of
399 WriteOnlyConnection w | inout==In ->
400 do announce (conkey,Connection)
401 return $ ConnectionPair new w
402 ReadOnlyConnection r | inout==Out ->
403 do announce (conkey,Connection)
404 return $ ConnectionPair r new
405 _ -> do -- connFlush todo
406 announce (conkey, EOF)
407 connClose replaced
408 announce (conkey, HalfConnection inout)
409 return $ case inout of
410 In -> ReadOnlyConnection new
411 Out -> WriteOnlyConnection new
412 modifyTVar' (conmap server) $ Map.insert conkey (mvar,new')
413 return $ handleEOF conkey mvar new' pingTimer
414
415acceptLoop server params sock = handle (acceptException server params sock) $ do
416 con <- accept sock
417 conkey <- makeConnKey params con
418 h <- socketToHandle (fst con) ReadWriteMode
419 newConnection server params conkey h In
420 acceptLoop server params sock
421
422acceptException server params sock ioerror = do
423 sClose sock
424 case show (ioeGetErrorType ioerror) of
425 "resource exhausted" -> do -- try again
426 warn ("acceptLoop: resource exhasted")
427 threadDelay 500000
428 acceptLoop server params sock
429 "invalid argument" -> do -- quit on closed socket
430 return ()
431 message -> do -- unexpected exception
432 warn ("acceptLoop: "<>bshow message)
433 return ()
434
435
436
437getPacket h = do hWaitForInput h (-1)
438 hGetNonBlocking h 1024
439
440
441
442-- | 'ConnectionThreads' is an interface to a pair of threads
443-- that are reading and writing a 'Handle'.
444data ConnectionThreads = ConnectionThreads
445 { threadsWriter :: TMVar (Maybe ByteString)
446 , threadsChannel :: TChan ByteString
447 , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close
448 }
449
450-- | This spawns the reader and writer threads and returns a newly
451-- constructed 'ConnectionThreads' object.
452connectionThreads :: Handle -> IO ConnectionThreads
453connectionThreads h = do
454
455 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
456 writerThread <- forkIO . fix $ \loop -> do
457 let finished = do -- warn $ "finished write"
458 hClose h -- quit reader
459 atomically $ putTMVar donew ()
460 mb <- atomically $ readTMVar outs
461 case mb of Just bs -> handle (\(SomeException e)->finished)
462 (do S.hPutStr h bs
463 atomically $ takeTMVar outs
464 loop)
465 Nothing -> finished
466
467 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
468 readerThread <- forkIO $ do
469 let finished e = do
470 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
471 let _ = fmap ioeGetErrorType e -- type hint
472 atomically $ do putTMVar outs Nothing -- quit writer
473 putTMVar doner ()
474 handle (finished . Just) . fix $ \loop -> do
475 packet <- getPacket h
476 atomically $ writeTChan incomming packet
477 isEof <- liftIO $ hIsEOF h
478 if isEof then finished Nothing else loop
479
480 let wait = do readTMVar donew
481 readTMVar doner
482 return ()
483 return ConnectionThreads { threadsWriter = outs
484 , threadsChannel = incomming
485 , threadsWait = wait }
486
487
488-- | 'threadsWrite' writes the given 'ByteString' to the
489-- 'ConnectionThreads' object. It blocks until the ByteString
490-- is written and 'True' is returned, or the connection is
491-- interrupted and 'False' is returned.
492threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
493threadsWrite c bs = atomically $
494 orElse (const False `fmap` threadsWait c)
495 (const True `fmap` putTMVar (threadsWriter c) (Just bs))
496
497-- | 'threadsClose' signals for the 'ConnectionThreads' object
498-- to quit and close the associated 'Handle'. This operation
499-- is non-blocking, follow it with 'threadsWait' if you want
500-- to wait for the operation to complete.
501threadsClose :: ConnectionThreads -> STM ()
502threadsClose c = do
503 let mvar = threadsWriter c
504 v <- tryReadTMVar mvar
505 case v of
506 Just Nothing -> return () -- already closed
507 _ -> putTMVar mvar Nothing
508
509-- | 'threadsRead' blocks until a 'ByteString' is available which
510-- is returned to the caller, or the connection is interrupted and
511-- 'Nothing' is returned.
512threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
513threadsRead c = atomically $
514 orElse (const Nothing `fmap` threadsWait c)
515 (Just `fmap` readTChan (threadsChannel c))
516
517-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
518-- or to a pair of 'ConnectionThreads' objects that are considered as one
519-- connection.
520data ConnectionState =
521 SaneConnection ConnectionThreads
522 -- ^ ordinary read/write connection
523 | WriteOnlyConnection ConnectionThreads
524 | ReadOnlyConnection ConnectionThreads
525 | ConnectionPair ConnectionThreads ConnectionThreads
526 -- ^ Two 'ConnectionThreads' objects, read operations use the
527 -- first, write operations use the second.
528
529
530
531connWrite :: ConnectionState -> ByteString -> IO Bool
532connWrite (ReadOnlyConnection _) bs = return False
533connWrite conn bs = threadsWrite c bs
534 where
535 c = case conn of SaneConnection c -> c
536 WriteOnlyConnection c -> c
537 ConnectionPair _ c -> c
538
539
540mapConn :: Bool ->
541 (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
542mapConn both action c =
543 case c of
544 SaneConnection rw -> action rw
545 ReadOnlyConnection r -> action r
546 WriteOnlyConnection w -> action w
547 ConnectionPair r w -> do
548 rem <- orElse (const w `fmap` action r)
549 (const r `fmap` action w)
550 when both $ action rem
551
552connClose :: ConnectionState -> STM ()
553connClose c = mapConn True threadsClose c
554
555connWait :: ConnectionState -> STM ()
556connWait c = mapConn False threadsWait c
557
558connFlush c =
559 case c of
560 SaneConnection rw -> waitChan rw
561 ReadOnlyConnection r -> waitChan r
562 WriteOnlyConnection w -> return ()
563 ConnectionPair r w -> waitChan r
564 where
565 waitChan t = do
566 b <- isEmptyTChan (threadsChannel t)
567 when (not b) retry
568
569bshow e = S.pack . show $ e
570warn str = S.hPutStrLn stderr str >> hFlush stderr
571
572
573data PingEvent = PingIdle | PingTimeOut
574
575data PingMachine = PingMachine
576 { pingIdle :: PingInterval
577 , pingTimeOut :: TimeOut
578 , pingDelay :: TMVar (Int,PingEvent)
579 , pingEvent :: TMVar PingEvent
580 , pingStarted :: TVar Bool -- True when a threadDelay is running
581 , pingThread :: ThreadId
582 }
583
584pingMachine :: PingInterval -> TimeOut -> IO PingMachine
585pingMachine idle timeout = do
586 me <- do
587 (delayVar,eventVar,startedVar) <- atomically $ do
588 d <- newEmptyTMVar
589 e <- newEmptyTMVar
590 s <- newTVar False
591 return (d,e,s)
592 return PingMachine { pingIdle = idle
593 , pingTimeOut = timeout
594 , pingDelay = delayVar
595 , pingEvent = eventVar
596 , pingStarted = startedVar
597 , pingThread = undefined }
598 thread <- forkIO . when (pingIdle me /=0) . fix $
599 \loop -> do
600 (delay,event) <- atomically $ takeTMVar (pingDelay me)
601 when (delay /= 0) $ do
602 handle (\(ErrorCall _)-> do
603 atomically $ writeTVar (pingStarted me) False
604 loop)
605 (do atomically $ writeTVar (pingStarted me) True
606 threadDelay delay
607 atomically $ writeTVar (pingStarted me) False
608 atomically $ putTMVar (pingEvent me) event
609 case event of PingTimeOut -> return ()
610 PingIdle -> loop)
611
612 return me { pingThread = thread }
613
614pingCancel :: PingMachine -> IO ()
615pingCancel me = do
616 b <- atomically $ do
617 tryTakeTMVar (pingDelay me) -- no hang
618 putTMVar (pingDelay me) (0,PingTimeOut)
619 readTVar (pingStarted me)
620 when b $ throwTo (pingThread me) $ ErrorCall ""
621
622pingBump :: PingMachine -> IO ()
623pingBump me = do
624 b <- atomically $ do
625 when (pingIdle me /= 0) $
626 putTMVar (pingDelay me) (1000*pingIdle me,PingIdle)
627 readTVar (pingStarted me)
628 when b $ throwTo (pingThread me) $ ErrorCall ""
629
630pingWait :: PingMachine -> STM PingEvent
631pingWait me = do
632 e <- takeTMVar (pingEvent me)
633 case e of
634 PingIdle -> putTMVar (pingDelay me)
635 (1000*pingTimeOut me,PingTimeOut)
636 PingTimeOut -> putTMVar (pingDelay me)
637 (0,PingTimeOut)
638 return e
639
640