summaryrefslogtreecommitdiff
path: root/Presence/Server.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-16 21:01:52 -0500
committerjoe <joe@jerkface.net>2017-11-16 21:01:52 -0500
commitd0311af98728855128b6417a4a591186a42345f9 (patch)
tree0c0c7e7882f4f8bd52a702872f73c9502cb949e0 /Presence/Server.hs
parent0abae348d1bec63e964a52b2c5b513048225d4a4 (diff)
Changed Connection event to provide XML.Event streams.
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r--Presence/Server.hs83
1 files changed, 37 insertions, 46 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 5f85c262..46b32b81 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -24,6 +24,7 @@ module Server where
24 24
25import Data.ByteString (ByteString,hGetNonBlocking) 25import Data.ByteString (ByteString,hGetNonBlocking)
26import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) 26import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
27import Data.Conduit ( Source, Sink, Flush )
27#if MIN_VERSION_containers(0,5,0) 28#if MIN_VERSION_containers(0,5,0)
28import qualified Data.Map.Strict as Map 29import qualified Data.Map.Strict as Map
29import Data.Map.Strict (Map) 30import Data.Map.Strict (Map)
@@ -160,9 +161,7 @@ data InOrOut = In | Out
160-- | These events may be read from 'serverEvent' TChannel. 161-- | These events may be read from 'serverEvent' TChannel.
161-- 162--
162data ConnectionEvent b 163data ConnectionEvent b
163 = Got b 164 = Connection (STM Bool) (Source IO b) (Sink (Flush b) IO ())
164 -- ^ Arrival of data from a socket
165 | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool)
166 -- ^ A new connection was established 165 -- ^ A new connection was established
167 | ConnectFailure SockAddr 166 | ConnectFailure SockAddr
168 -- ^ A 'Connect' command failed. 167 -- ^ A 'Connect' command failed.
@@ -192,16 +191,16 @@ data ConnectionRecord u
192 191
193-- | This object accepts commands and signals events and maintains 192-- | This object accepts commands and signals events and maintains
194-- the list of currently listening ports and established connections. 193-- the list of currently listening ports and established connections.
195data Server a u releaseKey 194data Server a u releaseKey b
196 = Server { serverCommand :: TMVar (ServerInstruction a u) 195 = Server { serverCommand :: TMVar (ServerInstruction a u)
197 , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) 196 , serverEvent :: TChan ((a,u), ConnectionEvent b)
198 , serverReleaseKey :: releaseKey 197 , serverReleaseKey :: releaseKey
199 , conmap :: TVar (Map a (ConnectionRecord u)) 198 , conmap :: TVar (Map a (ConnectionRecord u))
200 , listenmap :: TVar (Map PortNumber ServerHandle) 199 , listenmap :: TVar (Map PortNumber ServerHandle)
201 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) 200 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
202 } 201 }
203 202
204control :: Server a u releaseKey -> ServerInstruction a u -> IO () 203control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
205control sv = atomically . putTMVar (serverCommand sv) 204control sv = atomically . putTMVar (serverCommand sv)
206 205
207type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) 206type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
@@ -226,8 +225,12 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io
226-- > let loop = do 225-- > let loop = do
227-- > (_,event) <- atomically $ readTChan (serverEvent sv) 226-- > (_,event) <- atomically $ readTChan (serverEvent sv)
228-- > case event of 227-- > case event of
229-- > Got bytes -> putStrLn $ "got: " ++ show bytes 228-- > Connection getPingFlag readData writeData -> do
230-- > _ -> return () 229-- > forkIO $ do
230-- > fix $ \readLoop -> do
231-- > readData >>= mapM $ \bytes ->
232-- > putStrLn $ "got: " ++ show bytes
233-- > readLoop
231-- > case event of EOF -> return () 234-- > case event of EOF -> return ()
232-- > _ -> loop 235-- > _ -> loop
233-- > liftIO loop 236-- > liftIO loop
@@ -238,8 +241,10 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io
238server :: 241server ::
239 -- forall (m :: * -> *) a u conkey releaseKey. 242 -- forall (m :: * -> *) a u conkey releaseKey.
240 (Show conkey, MonadIO m, Ord conkey) => 243 (Show conkey, MonadIO m, Ord conkey) =>
241 Allocate releaseKey m -> m (Server conkey u releaseKey) 244 Allocate releaseKey m
242server allocate = do 245 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
246 -> m (Server conkey u releaseKey x)
247server allocate sessionConduits = do
243 (key,cmds) <- allocate (atomically newEmptyTMVar) 248 (key,cmds) <- allocate (atomically newEmptyTMVar)
244 (atomically . flip putTMVar Quit) 249 (atomically . flip putTMVar Quit)
245 server <- liftIO . atomically $ do 250 server <- liftIO . atomically $ do
@@ -288,6 +293,8 @@ server allocate = do
288 `fmap` atomically (readTVar $ listenmap server) 293 `fmap` atomically (readTVar $ listenmap server)
289 when (not listening) $ do 294 when (not listening) $ do
290 295
296 hPutStrLn stderr $ "Started listening on "++show port
297
291 let family = AF_INET6 298 let family = AF_INET6
292 address = case family of 299 address = case family of
293 AF_INET -> SockAddrInet port iNADDR_ANY 300 AF_INET -> SockAddrInet port iNADDR_ANY
@@ -297,13 +304,14 @@ server allocate = do
297 { serverWarn = hPutStrLn stderr 304 { serverWarn = hPutStrLn stderr
298 , serverSession = \sock _ h -> do 305 , serverSession = \sock _ h -> do
299 (conkey,u) <- makeConnKey params sock 306 (conkey,u) <- makeConnKey params sock
300 _ <- newConnection server params conkey u h In 307 _ <- newConnection server sessionConduits params conkey u h In
301 return () 308 return ()
302 } 309 }
303 310
304 atomically $ listenmap server `modifyTVar'` Map.insert port sserv 311 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
305 312
306 doit server (Ignore port) = do 313 doit server (Ignore port) = do
314 hPutStrLn stderr $ "Stopping listen on "++show port
307 mb <- atomically $ do 315 mb <- atomically $ do
308 map <- readTVar $ listenmap server 316 map <- readTVar $ listenmap server
309 modifyTVar' (listenmap server) $ Map.delete port 317 modifyTVar' (listenmap server) $ Map.delete port
@@ -342,7 +350,7 @@ server allocate = do
342 connect sock addr 350 connect sock addr
343 (conkey,u) <- makeConnKey params (restrictSocket sock) 351 (conkey,u) <- makeConnKey params (restrictSocket sock)
344 h <- socketToHandle sock ReadWriteMode 352 h <- socketToHandle sock ReadWriteMode
345 newConnection server params conkey u h Out 353 newConnection server sessionConduits params conkey u h Out
346 return () 354 return ()
347 355
348 doit server (ConnectWithEndlessRetry addr params interval) = do 356 doit server (ConnectWithEndlessRetry addr params interval) = do
@@ -380,7 +388,7 @@ server allocate = do
380 connect sock addr 388 connect sock addr
381 (conkey,u) <- makeConnKey params (restrictSocket sock) 389 (conkey,u) <- makeConnKey params (restrictSocket sock)
382 h <- socketToHandle sock ReadWriteMode 390 h <- socketToHandle sock ReadWriteMode
383 threads <- newConnection server params conkey u h Out 391 threads <- newConnection server sessionConduits params conkey u h Out
384 atomically $ do threadsWait threads 392 atomically $ do threadsWait threads
385 readTVar retryVar 393 readTVar retryVar
386 fin_utc <- getCurrentTime 394 fin_utc <- getCurrentTime
@@ -428,30 +436,29 @@ socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
428socketFamily (SockAddrUnix _) = AF_UNIX 436socketFamily (SockAddrUnix _) = AF_UNIX
429 437
430 438
431conevent :: ConnectionState -> ConnectionEvent b 439conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
432conevent con = Connection pingflag read write 440 -> ConnectionState
441 -> ConnectionEvent x
442conevent sessionConduits con = Connection pingflag read write
433 where 443 where
434 pingflag = swapTVar (pingFlag (connPingTimer con)) False 444 pingflag = swapTVar (pingFlag (connPingTimer con)) False
435 read = connRead con 445 (read,write) = sessionConduits (connRead con) (connWrite con)
436 write = connWrite con
437 446
438newConnection :: Ord a 447newConnection :: Ord a
439 => Server a u1 releaseKey 448 => Server a u1 releaseKey x
449 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
440 -> ConnectionParameters conkey u 450 -> ConnectionParameters conkey u
441 -> a 451 -> a
442 -> u1 452 -> u1
443 -> Handle 453 -> Handle
444 -> InOrOut 454 -> InOrOut
445 -> IO ConnectionThreads 455 -> IO ConnectionThreads
446newConnection server params conkey u h inout = do 456newConnection server sessionConduits params conkey u h inout = do
447 hSetBuffering h NoBuffering 457 hSetBuffering h NoBuffering
448 let (forward,idle_ms,timeout_ms) = 458 let (idle_ms,timeout_ms) =
449 case (inout,duplex params) of 459 case (inout,duplex params) of
450 (Out,False) -> ( const $ return () 460 (Out,False) -> ( 0, 0 )
451 , 0 461 _ -> ( pingInterval params
452 , 0 )
453 _ -> ( announce . ((conkey,u),) . Got
454 , pingInterval params
455 , timeout params ) 462 , timeout params )
456 463
457 new <- do pinglogic <- forkPingMachine idle_ms timeout_ms 464 new <- do pinglogic <- forkPingMachine idle_ms timeout_ms
@@ -473,7 +480,7 @@ newConnection server params conkey u h inout = do
473 (newCon,e) <- return $ 480 (newCon,e) <- return $
474 if duplex params 481 if duplex params
475 then let newcon = SaneConnection new 482 then let newcon = SaneConnection new
476 in ( newcon, ((conkey,u), conevent newcon) ) 483 in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
477 else ( case inout of 484 else ( case inout of
478 In -> ReadOnlyConnection new 485 In -> ReadOnlyConnection new
479 Out -> WriteOnlyConnection new 486 Out -> WriteOnlyConnection new
@@ -496,22 +503,6 @@ newConnection server params conkey u h inout = do
496 kont <- updateConMap conkey u new what 503 kont <- updateConMap conkey u new what
497 putTMVar started () 504 putTMVar started ()
498 return kont 505 return kont
499#ifdef TEST
500 -- enable this for 'Got' events
501 forkIO $ do -- inout==In || duplex params then forkIO $ do
502 warn $ "waiting gots thread: " <> bshow (conkey,inout)
503 atomically $ takeTMVar started
504 -- pingBump pinglogic -- start the ping timer
505 fix $ \loop -> do
506 -- warn $ "read thread: " <> bshow (conkey,inout)
507 mb <- threadsRead new
508 -- pingBump pinglogic
509 warn $ "got: " <> bshow (mb,(conkey,inout))
510 maybe (return ())
511 (atomically . forward >=> const loop)
512 mb
513 warn $ "quit-gots: " <> bshow (conkey,inout)
514#endif
515 return new 506 return new
516 where 507 where
517 508
@@ -562,17 +553,17 @@ newConnection server params conkey u h inout = do
562 announce ((conkey,u),EOF) 553 announce ((conkey,u),EOF)
563 connClose replaced 554 connClose replaced
564 let newcon = SaneConnection new 555 let newcon = SaneConnection new
565 announce $ ((conkey,u),conevent newcon) 556 announce $ ((conkey,u),conevent sessionConduits newcon)
566 return $ newcon 557 return $ newcon
567 else 558 else
568 case replaced of 559 case replaced of
569 WriteOnlyConnection w | inout==In -> 560 WriteOnlyConnection w | inout==In ->
570 do let newcon = ConnectionPair new w 561 do let newcon = ConnectionPair new w
571 announce ((conkey,u),conevent newcon) 562 announce ((conkey,u),conevent sessionConduits newcon)
572 return newcon 563 return newcon
573 ReadOnlyConnection r | inout==Out -> 564 ReadOnlyConnection r | inout==Out ->
574 do let newcon = ConnectionPair r new 565 do let newcon = ConnectionPair r new
575 announce ((conkey,u),conevent newcon) 566 announce ((conkey,u),conevent sessionConduits newcon)
576 return newcon 567 return newcon
577 _ -> do -- connFlush todo 568 _ -> do -- connFlush todo
578 announce ((conkey,u0), EOF) 569 announce ((conkey,u0), EOF)