diff options
author | joe <joe@jerkface.net> | 2017-11-16 21:01:52 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-16 21:01:52 -0500 |
commit | d0311af98728855128b6417a4a591186a42345f9 (patch) | |
tree | 0c0c7e7882f4f8bd52a702872f73c9502cb949e0 /Presence/Server.hs | |
parent | 0abae348d1bec63e964a52b2c5b513048225d4a4 (diff) |
Changed Connection event to provide XML.Event streams.
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r-- | Presence/Server.hs | 83 |
1 files changed, 37 insertions, 46 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index 5f85c262..46b32b81 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -24,6 +24,7 @@ module Server where | |||
24 | 24 | ||
25 | import Data.ByteString (ByteString,hGetNonBlocking) | 25 | import Data.ByteString (ByteString,hGetNonBlocking) |
26 | import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) | 26 | import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) |
27 | import Data.Conduit ( Source, Sink, Flush ) | ||
27 | #if MIN_VERSION_containers(0,5,0) | 28 | #if MIN_VERSION_containers(0,5,0) |
28 | import qualified Data.Map.Strict as Map | 29 | import qualified Data.Map.Strict as Map |
29 | import Data.Map.Strict (Map) | 30 | import Data.Map.Strict (Map) |
@@ -160,9 +161,7 @@ data InOrOut = In | Out | |||
160 | -- | These events may be read from 'serverEvent' TChannel. | 161 | -- | These events may be read from 'serverEvent' TChannel. |
161 | -- | 162 | -- |
162 | data ConnectionEvent b | 163 | data ConnectionEvent b |
163 | = Got b | 164 | = Connection (STM Bool) (Source IO b) (Sink (Flush b) IO ()) |
164 | -- ^ Arrival of data from a socket | ||
165 | | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) | ||
166 | -- ^ A new connection was established | 165 | -- ^ A new connection was established |
167 | | ConnectFailure SockAddr | 166 | | ConnectFailure SockAddr |
168 | -- ^ A 'Connect' command failed. | 167 | -- ^ A 'Connect' command failed. |
@@ -192,16 +191,16 @@ data ConnectionRecord u | |||
192 | 191 | ||
193 | -- | This object accepts commands and signals events and maintains | 192 | -- | This object accepts commands and signals events and maintains |
194 | -- the list of currently listening ports and established connections. | 193 | -- the list of currently listening ports and established connections. |
195 | data Server a u releaseKey | 194 | data Server a u releaseKey b |
196 | = Server { serverCommand :: TMVar (ServerInstruction a u) | 195 | = Server { serverCommand :: TMVar (ServerInstruction a u) |
197 | , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) | 196 | , serverEvent :: TChan ((a,u), ConnectionEvent b) |
198 | , serverReleaseKey :: releaseKey | 197 | , serverReleaseKey :: releaseKey |
199 | , conmap :: TVar (Map a (ConnectionRecord u)) | 198 | , conmap :: TVar (Map a (ConnectionRecord u)) |
200 | , listenmap :: TVar (Map PortNumber ServerHandle) | 199 | , listenmap :: TVar (Map PortNumber ServerHandle) |
201 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) | 200 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) |
202 | } | 201 | } |
203 | 202 | ||
204 | control :: Server a u releaseKey -> ServerInstruction a u -> IO () | 203 | control :: Server a u releaseKey b -> ServerInstruction a u -> IO () |
205 | control sv = atomically . putTMVar (serverCommand sv) | 204 | control sv = atomically . putTMVar (serverCommand sv) |
206 | 205 | ||
207 | type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) | 206 | type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) |
@@ -226,8 +225,12 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io | |||
226 | -- > let loop = do | 225 | -- > let loop = do |
227 | -- > (_,event) <- atomically $ readTChan (serverEvent sv) | 226 | -- > (_,event) <- atomically $ readTChan (serverEvent sv) |
228 | -- > case event of | 227 | -- > case event of |
229 | -- > Got bytes -> putStrLn $ "got: " ++ show bytes | 228 | -- > Connection getPingFlag readData writeData -> do |
230 | -- > _ -> return () | 229 | -- > forkIO $ do |
230 | -- > fix $ \readLoop -> do | ||
231 | -- > readData >>= mapM $ \bytes -> | ||
232 | -- > putStrLn $ "got: " ++ show bytes | ||
233 | -- > readLoop | ||
231 | -- > case event of EOF -> return () | 234 | -- > case event of EOF -> return () |
232 | -- > _ -> loop | 235 | -- > _ -> loop |
233 | -- > liftIO loop | 236 | -- > liftIO loop |
@@ -238,8 +241,10 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io | |||
238 | server :: | 241 | server :: |
239 | -- forall (m :: * -> *) a u conkey releaseKey. | 242 | -- forall (m :: * -> *) a u conkey releaseKey. |
240 | (Show conkey, MonadIO m, Ord conkey) => | 243 | (Show conkey, MonadIO m, Ord conkey) => |
241 | Allocate releaseKey m -> m (Server conkey u releaseKey) | 244 | Allocate releaseKey m |
242 | server allocate = do | 245 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) ) |
246 | -> m (Server conkey u releaseKey x) | ||
247 | server allocate sessionConduits = do | ||
243 | (key,cmds) <- allocate (atomically newEmptyTMVar) | 248 | (key,cmds) <- allocate (atomically newEmptyTMVar) |
244 | (atomically . flip putTMVar Quit) | 249 | (atomically . flip putTMVar Quit) |
245 | server <- liftIO . atomically $ do | 250 | server <- liftIO . atomically $ do |
@@ -288,6 +293,8 @@ server allocate = do | |||
288 | `fmap` atomically (readTVar $ listenmap server) | 293 | `fmap` atomically (readTVar $ listenmap server) |
289 | when (not listening) $ do | 294 | when (not listening) $ do |
290 | 295 | ||
296 | hPutStrLn stderr $ "Started listening on "++show port | ||
297 | |||
291 | let family = AF_INET6 | 298 | let family = AF_INET6 |
292 | address = case family of | 299 | address = case family of |
293 | AF_INET -> SockAddrInet port iNADDR_ANY | 300 | AF_INET -> SockAddrInet port iNADDR_ANY |
@@ -297,13 +304,14 @@ server allocate = do | |||
297 | { serverWarn = hPutStrLn stderr | 304 | { serverWarn = hPutStrLn stderr |
298 | , serverSession = \sock _ h -> do | 305 | , serverSession = \sock _ h -> do |
299 | (conkey,u) <- makeConnKey params sock | 306 | (conkey,u) <- makeConnKey params sock |
300 | _ <- newConnection server params conkey u h In | 307 | _ <- newConnection server sessionConduits params conkey u h In |
301 | return () | 308 | return () |
302 | } | 309 | } |
303 | 310 | ||
304 | atomically $ listenmap server `modifyTVar'` Map.insert port sserv | 311 | atomically $ listenmap server `modifyTVar'` Map.insert port sserv |
305 | 312 | ||
306 | doit server (Ignore port) = do | 313 | doit server (Ignore port) = do |
314 | hPutStrLn stderr $ "Stopping listen on "++show port | ||
307 | mb <- atomically $ do | 315 | mb <- atomically $ do |
308 | map <- readTVar $ listenmap server | 316 | map <- readTVar $ listenmap server |
309 | modifyTVar' (listenmap server) $ Map.delete port | 317 | modifyTVar' (listenmap server) $ Map.delete port |
@@ -342,7 +350,7 @@ server allocate = do | |||
342 | connect sock addr | 350 | connect sock addr |
343 | (conkey,u) <- makeConnKey params (restrictSocket sock) | 351 | (conkey,u) <- makeConnKey params (restrictSocket sock) |
344 | h <- socketToHandle sock ReadWriteMode | 352 | h <- socketToHandle sock ReadWriteMode |
345 | newConnection server params conkey u h Out | 353 | newConnection server sessionConduits params conkey u h Out |
346 | return () | 354 | return () |
347 | 355 | ||
348 | doit server (ConnectWithEndlessRetry addr params interval) = do | 356 | doit server (ConnectWithEndlessRetry addr params interval) = do |
@@ -380,7 +388,7 @@ server allocate = do | |||
380 | connect sock addr | 388 | connect sock addr |
381 | (conkey,u) <- makeConnKey params (restrictSocket sock) | 389 | (conkey,u) <- makeConnKey params (restrictSocket sock) |
382 | h <- socketToHandle sock ReadWriteMode | 390 | h <- socketToHandle sock ReadWriteMode |
383 | threads <- newConnection server params conkey u h Out | 391 | threads <- newConnection server sessionConduits params conkey u h Out |
384 | atomically $ do threadsWait threads | 392 | atomically $ do threadsWait threads |
385 | readTVar retryVar | 393 | readTVar retryVar |
386 | fin_utc <- getCurrentTime | 394 | fin_utc <- getCurrentTime |
@@ -428,30 +436,29 @@ socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | |||
428 | socketFamily (SockAddrUnix _) = AF_UNIX | 436 | socketFamily (SockAddrUnix _) = AF_UNIX |
429 | 437 | ||
430 | 438 | ||
431 | conevent :: ConnectionState -> ConnectionEvent b | 439 | conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) ) |
432 | conevent con = Connection pingflag read write | 440 | -> ConnectionState |
441 | -> ConnectionEvent x | ||
442 | conevent sessionConduits con = Connection pingflag read write | ||
433 | where | 443 | where |
434 | pingflag = swapTVar (pingFlag (connPingTimer con)) False | 444 | pingflag = swapTVar (pingFlag (connPingTimer con)) False |
435 | read = connRead con | 445 | (read,write) = sessionConduits (connRead con) (connWrite con) |
436 | write = connWrite con | ||
437 | 446 | ||
438 | newConnection :: Ord a | 447 | newConnection :: Ord a |
439 | => Server a u1 releaseKey | 448 | => Server a u1 releaseKey x |
449 | -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) ) | ||
440 | -> ConnectionParameters conkey u | 450 | -> ConnectionParameters conkey u |
441 | -> a | 451 | -> a |
442 | -> u1 | 452 | -> u1 |
443 | -> Handle | 453 | -> Handle |
444 | -> InOrOut | 454 | -> InOrOut |
445 | -> IO ConnectionThreads | 455 | -> IO ConnectionThreads |
446 | newConnection server params conkey u h inout = do | 456 | newConnection server sessionConduits params conkey u h inout = do |
447 | hSetBuffering h NoBuffering | 457 | hSetBuffering h NoBuffering |
448 | let (forward,idle_ms,timeout_ms) = | 458 | let (idle_ms,timeout_ms) = |
449 | case (inout,duplex params) of | 459 | case (inout,duplex params) of |
450 | (Out,False) -> ( const $ return () | 460 | (Out,False) -> ( 0, 0 ) |
451 | , 0 | 461 | _ -> ( pingInterval params |
452 | , 0 ) | ||
453 | _ -> ( announce . ((conkey,u),) . Got | ||
454 | , pingInterval params | ||
455 | , timeout params ) | 462 | , timeout params ) |
456 | 463 | ||
457 | new <- do pinglogic <- forkPingMachine idle_ms timeout_ms | 464 | new <- do pinglogic <- forkPingMachine idle_ms timeout_ms |
@@ -473,7 +480,7 @@ newConnection server params conkey u h inout = do | |||
473 | (newCon,e) <- return $ | 480 | (newCon,e) <- return $ |
474 | if duplex params | 481 | if duplex params |
475 | then let newcon = SaneConnection new | 482 | then let newcon = SaneConnection new |
476 | in ( newcon, ((conkey,u), conevent newcon) ) | 483 | in ( newcon, ((conkey,u), conevent sessionConduits newcon) ) |
477 | else ( case inout of | 484 | else ( case inout of |
478 | In -> ReadOnlyConnection new | 485 | In -> ReadOnlyConnection new |
479 | Out -> WriteOnlyConnection new | 486 | Out -> WriteOnlyConnection new |
@@ -496,22 +503,6 @@ newConnection server params conkey u h inout = do | |||
496 | kont <- updateConMap conkey u new what | 503 | kont <- updateConMap conkey u new what |
497 | putTMVar started () | 504 | putTMVar started () |
498 | return kont | 505 | return kont |
499 | #ifdef TEST | ||
500 | -- enable this for 'Got' events | ||
501 | forkIO $ do -- inout==In || duplex params then forkIO $ do | ||
502 | warn $ "waiting gots thread: " <> bshow (conkey,inout) | ||
503 | atomically $ takeTMVar started | ||
504 | -- pingBump pinglogic -- start the ping timer | ||
505 | fix $ \loop -> do | ||
506 | -- warn $ "read thread: " <> bshow (conkey,inout) | ||
507 | mb <- threadsRead new | ||
508 | -- pingBump pinglogic | ||
509 | warn $ "got: " <> bshow (mb,(conkey,inout)) | ||
510 | maybe (return ()) | ||
511 | (atomically . forward >=> const loop) | ||
512 | mb | ||
513 | warn $ "quit-gots: " <> bshow (conkey,inout) | ||
514 | #endif | ||
515 | return new | 506 | return new |
516 | where | 507 | where |
517 | 508 | ||
@@ -562,17 +553,17 @@ newConnection server params conkey u h inout = do | |||
562 | announce ((conkey,u),EOF) | 553 | announce ((conkey,u),EOF) |
563 | connClose replaced | 554 | connClose replaced |
564 | let newcon = SaneConnection new | 555 | let newcon = SaneConnection new |
565 | announce $ ((conkey,u),conevent newcon) | 556 | announce $ ((conkey,u),conevent sessionConduits newcon) |
566 | return $ newcon | 557 | return $ newcon |
567 | else | 558 | else |
568 | case replaced of | 559 | case replaced of |
569 | WriteOnlyConnection w | inout==In -> | 560 | WriteOnlyConnection w | inout==In -> |
570 | do let newcon = ConnectionPair new w | 561 | do let newcon = ConnectionPair new w |
571 | announce ((conkey,u),conevent newcon) | 562 | announce ((conkey,u),conevent sessionConduits newcon) |
572 | return newcon | 563 | return newcon |
573 | ReadOnlyConnection r | inout==Out -> | 564 | ReadOnlyConnection r | inout==Out -> |
574 | do let newcon = ConnectionPair r new | 565 | do let newcon = ConnectionPair r new |
575 | announce ((conkey,u),conevent newcon) | 566 | announce ((conkey,u),conevent sessionConduits newcon) |
576 | return newcon | 567 | return newcon |
577 | _ -> do -- connFlush todo | 568 | _ -> do -- connFlush todo |
578 | announce ((conkey,u0), EOF) | 569 | announce ((conkey,u0), EOF) |