diff options
-rw-r--r-- | Presence/Server.hs | 107 | ||||
-rw-r--r-- | xmppServer.hs | 102 |
2 files changed, 130 insertions, 79 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index 5e54ff27..1141f0b0 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -60,14 +60,16 @@ import Network.BSD | |||
60 | ( getProtocolNumber | 60 | ( getProtocolNumber |
61 | ) | 61 | ) |
62 | import Debug.Trace | 62 | import Debug.Trace |
63 | import Data.Time.Clock (UTCTime,getCurrentTime) | 63 | import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) |
64 | import Data.Time.Format (formatTime) | 64 | import Data.Time.Format (formatTime) |
65 | import System.Locale (defaultTimeLocale) | 65 | import System.Locale (defaultTimeLocale) |
66 | 66 | ||
67 | todo = error "unimplemented" | 67 | todo = error "unimplemented" |
68 | 68 | ||
69 | type TimeOut = Int -- ^ miliseconds | 69 | type Microseconds = Int |
70 | type PingInterval = Int -- ^ miliseconds | 70 | type Miliseconds = Int |
71 | type TimeOut = Miliseconds | ||
72 | type PingInterval = Miliseconds | ||
71 | 73 | ||
72 | -- | This object is passed with the 'Listen' and 'Connect' | 74 | -- | This object is passed with the 'Listen' and 'Connect' |
73 | -- instructions in order to control the behavior of the | 75 | -- instructions in order to control the behavior of the |
@@ -128,6 +130,10 @@ data ServerInstruction conkey | |||
128 | -- ^ listen for incomming connections | 130 | -- ^ listen for incomming connections |
129 | | Connect SockAddr (ConnectionParameters conkey) | 131 | | Connect SockAddr (ConnectionParameters conkey) |
130 | -- ^ connect to addresses | 132 | -- ^ connect to addresses |
133 | | ConnectWithEndlessRetry SockAddr | ||
134 | (ConnectionParameters conkey) | ||
135 | Miliseconds | ||
136 | -- ^ keep retrying the connection | ||
131 | | Ignore PortNumber | 137 | | Ignore PortNumber |
132 | -- ^ stop listening on specified port | 138 | -- ^ stop listening on specified port |
133 | | Send conkey ByteString | 139 | | Send conkey ByteString |
@@ -178,6 +184,7 @@ data Server a | |||
178 | , serverReleaseKey :: ReleaseKey | 184 | , serverReleaseKey :: ReleaseKey |
179 | , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) | 185 | , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) |
180 | , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) | 186 | , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) |
187 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) | ||
181 | } | 188 | } |
182 | 189 | ||
183 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | 190 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' |
@@ -210,11 +217,13 @@ server = do | |||
210 | tchan <- newTChan | 217 | tchan <- newTChan |
211 | conmap <- newTVar Map.empty | 218 | conmap <- newTVar Map.empty |
212 | listenmap<- newTVar Map.empty | 219 | listenmap<- newTVar Map.empty |
220 | retrymap <- newTVar Map.empty | ||
213 | return Server { serverCommand = cmds | 221 | return Server { serverCommand = cmds |
214 | , serverEvent = tchan | 222 | , serverEvent = tchan |
215 | , serverReleaseKey = key | 223 | , serverReleaseKey = key |
216 | , conmap = conmap | 224 | , conmap = conmap |
217 | , listenmap = listenmap | 225 | , listenmap = listenmap |
226 | , retrymap = retrymap | ||
218 | } | 227 | } |
219 | liftIO $ do | 228 | liftIO $ do |
220 | forkIO $ fix $ \loop -> do | 229 | forkIO $ fix $ \loop -> do |
@@ -230,6 +239,10 @@ server = do | |||
230 | closeAll server = liftIO $ do | 239 | closeAll server = liftIO $ do |
231 | listening <- atomically . readTVar $ listenmap server | 240 | listening <- atomically . readTVar $ listenmap server |
232 | mapM_ killListener (Map.elems listening) | 241 | mapM_ killListener (Map.elems listening) |
242 | let stopRetry (v,d) = do atomically $ writeTVar v False | ||
243 | interruptDelay d | ||
244 | retriers <- atomically . readTVar $ retrymap server | ||
245 | mapM_ stopRetry (Map.elems retriers) | ||
233 | cons <- atomically . readTVar $ conmap server | 246 | cons <- atomically . readTVar $ conmap server |
234 | atomically $ mapM_ (connClose . snd) (Map.elems cons) | 247 | atomically $ mapM_ (connClose . snd) (Map.elems cons) |
235 | atomically $ mapM_ (connWait . snd) (Map.elems cons) | 248 | atomically $ mapM_ (connWait . snd) (Map.elems cons) |
@@ -278,7 +291,7 @@ server = do | |||
278 | doit server (Connect addr params) = liftIO $ do | 291 | doit server (Connect addr params) = liftIO $ do |
279 | void . forkIO $ do | 292 | void . forkIO $ do |
280 | proto <- getProtocolNumber "tcp" | 293 | proto <- getProtocolNumber "tcp" |
281 | bracketOnError | 294 | sock <- bracketOnError |
282 | (socket (socketFamily addr) Stream proto) | 295 | (socket (socketFamily addr) Stream proto) |
283 | (\sock -> do -- only done if there's an error | 296 | (\sock -> do -- only done if there's an error |
284 | -- Weird hack: puting the would-be peer address | 297 | -- Weird hack: puting the would-be peer address |
@@ -288,12 +301,66 @@ server = do | |||
288 | atomically | 301 | atomically |
289 | $ writeTChan (serverEvent server) | 302 | $ writeTChan (serverEvent server) |
290 | $ (conkey,ConnectFailure addr)) | 303 | $ (conkey,ConnectFailure addr)) |
291 | $ \sock -> do | 304 | $ \sock -> do connect sock addr |
292 | connect sock addr | 305 | return sock |
293 | me <- getSocketName sock | 306 | me <- getSocketName sock |
294 | conkey <- makeConnKey params (sock,me) | 307 | conkey <- makeConnKey params (sock,me) |
295 | h <- socketToHandle sock ReadWriteMode | 308 | h <- socketToHandle sock ReadWriteMode |
296 | newConnection server params conkey h Out | 309 | newConnection server params conkey h Out |
310 | return () | ||
311 | |||
312 | doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do | ||
313 | proto <- getProtocolNumber "tcp" | ||
314 | void . forkIO $ do | ||
315 | resultVar <- atomically newEmptyTMVar | ||
316 | timer <- interruptableDelay | ||
317 | (retryVar,action) <- atomically $ do | ||
318 | let noop = return () | ||
319 | map <- readTVar (retrymap server) | ||
320 | let mb = Map.lookup addr map | ||
321 | action <- | ||
322 | maybe (return noop) | ||
323 | (\(v,d) -> do writeTVar v False | ||
324 | return $ interruptDelay d) | ||
325 | mb | ||
326 | v <- newTVar True | ||
327 | writeTVar (retrymap server) (Map.insert addr (v,timer) map) | ||
328 | return (v,action) | ||
329 | action | ||
330 | fix $ \retryLoop -> do | ||
331 | utc <- getCurrentTime | ||
332 | forkIO $ do | ||
333 | sock <- bracketOnError | ||
334 | (socket (socketFamily addr) Stream proto) | ||
335 | (\sock -> do -- only done if there's an error | ||
336 | -- Weird hack: puting the would-be peer address | ||
337 | -- instead of local socketName | ||
338 | conkey <- makeConnKey params (sock,addr) -- XXX: ? | ||
339 | sClose sock | ||
340 | atomically $ do | ||
341 | writeTChan (serverEvent server) | ||
342 | $ (conkey,ConnectFailure addr) | ||
343 | retry <- readTVar retryVar | ||
344 | putTMVar resultVar retry) | ||
345 | $ \sock -> do connect sock addr | ||
346 | return sock | ||
347 | me <- getSocketName sock | ||
348 | conkey <- makeConnKey params (sock,me) | ||
349 | h <- socketToHandle sock ReadWriteMode | ||
350 | threads <- newConnection server params conkey h Out | ||
351 | atomically $ do threadsWait threads | ||
352 | retry <- readTVar retryVar | ||
353 | putTMVar resultVar retry | ||
354 | retry <- atomically $ takeTMVar resultVar | ||
355 | fin_utc <- getCurrentTime | ||
356 | when retry $ do | ||
357 | let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc) | ||
358 | expected = fromIntegral interval | ||
359 | when (retry && elapsed < expected) $ do | ||
360 | warn $ "Waiting to retry " <> bshow addr | ||
361 | startDelay timer (round $ 1000 * (expected-elapsed)) | ||
362 | warn $ "retry " <> bshow (retry,addr) | ||
363 | when retry $ retryLoop | ||
297 | 364 | ||
298 | 365 | ||
299 | -- INTERNAL ---------------------------------------------------------- | 366 | -- INTERNAL ---------------------------------------------------------- |
@@ -397,7 +464,7 @@ newConnection server params conkey h inout = do | |||
397 | mb | 464 | mb |
398 | warn $ "quit-gots: " <> bshow (conkey,inout) | 465 | warn $ "quit-gots: " <> bshow (conkey,inout) |
399 | #endif | 466 | #endif |
400 | return () | 467 | return new |
401 | where | 468 | where |
402 | 469 | ||
403 | announce e = writeTChan (serverEvent server) e | 470 | announce e = writeTChan (serverEvent server) e |
@@ -754,3 +821,29 @@ pingWait me = do | |||
754 | return e | 821 | return e |
755 | 822 | ||
756 | 823 | ||
824 | data InterruptableDelay = InterruptableDelay | ||
825 | { delayThread :: TMVar ThreadId | ||
826 | } | ||
827 | |||
828 | interruptableDelay :: IO InterruptableDelay | ||
829 | interruptableDelay = do | ||
830 | fmap InterruptableDelay | ||
831 | $ atomically newEmptyTMVar | ||
832 | startDelay :: InterruptableDelay -> Microseconds -> IO () | ||
833 | startDelay d interval = do | ||
834 | thread <- myThreadId | ||
835 | atomically $ putTMVar (delayThread d) thread | ||
836 | handle (\(ErrorCall _)-> warn $ "delay interrupted" ) $ do | ||
837 | threadDelay interval | ||
838 | void . atomically $ takeTMVar (delayThread d) | ||
839 | |||
840 | interruptDelay :: InterruptableDelay -> IO () | ||
841 | interruptDelay d = do | ||
842 | mthread <- atomically $ do | ||
843 | tryTakeTMVar (delayThread d) | ||
844 | flip (maybe $ return ()) mthread $ \thread -> do | ||
845 | throwTo thread (ErrorCall "Interrupted delay") | ||
846 | atomically $ do | ||
847 | putTMVar (delayThread d) thread | ||
848 | |||
849 | |||
diff --git a/xmppServer.hs b/xmppServer.hs index 4fbb775b..987e7dbc 100644 --- a/xmppServer.hs +++ b/xmppServer.hs | |||
@@ -27,14 +27,12 @@ import Data.Maybe (catMaybes,fromJust) | |||
27 | import Data.Monoid ( (<>) ) | 27 | import Data.Monoid ( (<>) ) |
28 | import Data.Text (Text) | 28 | import Data.Text (Text) |
29 | import qualified Data.Text as Text (pack) | 29 | import qualified Data.Text as Text (pack) |
30 | import qualified Data.Map as Map | ||
31 | 30 | ||
32 | import qualified Control.Concurrent.STM.UpdateStream as Slotted | 31 | import qualified Control.Concurrent.STM.UpdateStream as Slotted |
33 | import ControlMaybe | 32 | import ControlMaybe |
34 | import Nesting | 33 | import Nesting |
35 | import EventUtil | 34 | import EventUtil |
36 | import Server | 35 | import Server |
37 | import Data.Time.Clock (UTCTime,getCurrentTime) | ||
38 | 36 | ||
39 | addrToText :: SockAddr -> Text | 37 | addrToText :: SockAddr -> Text |
40 | addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) | 38 | addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) |
@@ -288,66 +286,49 @@ forkConnection k pingflag src snk stanzas = do | |||
288 | wlog $ "end reader fork: " ++ show k | 286 | wlog $ "end reader fork: " ++ show k |
289 | return output | 287 | return output |
290 | 288 | ||
289 | data ConnectionKey | ||
290 | = PeerKey { callBackAddress :: SockAddr } | ||
291 | | ClientKey { localAddress :: SockAddr } | ||
292 | deriving (Show, Ord, Eq) | ||
293 | |||
294 | {- | ||
295 | data Peer = Peer | ||
296 | { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis | ||
297 | , peerState :: TVar PeerState | ||
298 | } | ||
299 | data PeerState | ||
300 | = PeerPendingConnect UTCTime | ||
301 | | PeerPendingAccept UTCTime | ||
302 | | PeerConnected (TChan Stanza) | ||
303 | -} | ||
304 | |||
305 | peerKey (sock,addr) = do | ||
306 | peer <- | ||
307 | sIsConnected sock >>= \c -> | ||
308 | if c then getPeerName sock -- addr is normally socketName | ||
309 | else return addr -- Weird hack: addr is would-be peer name | ||
310 | return $ PeerKey (peer `withPort` fromIntegral peerport) | ||
311 | |||
312 | clientKey (sock,addr) = return $ ClientKey addr | ||
313 | |||
291 | monitor sv params = do | 314 | monitor sv params = do |
292 | chan <- return $ serverEvent sv | 315 | chan <- return $ serverEvent sv |
293 | stanzas <- atomically newTChan | 316 | stanzas <- atomically newTChan |
294 | peersVar <- atomically $ newTVar Map.empty | ||
295 | let doConnect utc k = do | ||
296 | peers <- readTVar peersVar | ||
297 | let mb = Map.lookup k peers | ||
298 | maybe (do false <- newTVar False | ||
299 | pending <- newTVar (PeerPendingConnect utc) | ||
300 | let v = Peer { peerWanted = false | ||
301 | , peerState = pending } | ||
302 | writeTVar (peersVar) $ Map.insert k v peers) | ||
303 | (\peer -> | ||
304 | writeTVar (peerState peer) | ||
305 | $ PeerPendingConnect utc) | ||
306 | mb | ||
307 | fix $ \loop -> do | 317 | fix $ \loop -> do |
308 | action <- atomically $ foldr1 orElse | 318 | action <- atomically $ foldr1 orElse |
309 | [ readTChan chan >>= \(k,e) -> return $ do | 319 | [ readTChan chan >>= \(k,e) -> return $ do |
310 | case e of | 320 | case e of |
311 | Connection pingflag conread conwrite -> do | 321 | Connection pingflag conread conwrite -> do |
312 | wlog $ tomsg k "Connection" | 322 | wlog $ tomsg k "Connection" |
313 | let (xsrc,xsnk) = xmlStream conread conwrite | 323 | let (xsrc,xsnk) = xmlStream conread conwrite |
314 | outs <- forkConnection k pingflag xsrc xsnk stanzas | 324 | forkConnection k pingflag xsrc xsnk stanzas |
315 | atomically $ do | 325 | return () |
316 | peers <- readTVar peersVar | ||
317 | let mb = Map.lookup k peers | ||
318 | maybe (do false <- newTVar True -- False -- TODO: should be False | ||
319 | connected <- newTVar (PeerConnected outs) | ||
320 | let v = Peer { peerWanted = false | ||
321 | , peerState = connected } | ||
322 | writeTVar (peersVar) $ Map.insert k v peers) | ||
323 | (\peer -> do | ||
324 | writeTVar (peerWanted peer) True -- TODO REMOVE | ||
325 | writeTVar (peerState peer) | ||
326 | $ PeerConnected outs) | ||
327 | mb | ||
328 | return () | ||
329 | ConnectFailure addr -> do | 326 | ConnectFailure addr -> do |
330 | wlog $ tomsg k "ConnectFailure" | 327 | wlog $ tomsg k "ConnectFailure" |
331 | action <- atomically $ do | ||
332 | peers <- readTVar peersVar | ||
333 | let mb = Map.lookup k peers | ||
334 | maybe (return $ return ()) | ||
335 | (\peer -> do | ||
336 | wanted <- readTVar (peerWanted peer) | ||
337 | if wanted then return $ do | ||
338 | utc <- getCurrentTime | ||
339 | control sv (Connect addr params) | ||
340 | wlog $ tomsg k "Retry" | ||
341 | atomically $ doConnect utc k | ||
342 | else return $ return ()) | ||
343 | mb | ||
344 | action | ||
345 | EOF -> wlog $ tomsg k "EOF" | 328 | EOF -> wlog $ tomsg k "EOF" |
346 | HalfConnection In -> do | 329 | HalfConnection In -> do |
347 | wlog $ tomsg k "ReadOnly" | 330 | wlog $ tomsg k "ReadOnly" |
348 | utc <- getCurrentTime | ||
349 | control sv (Connect (callBackAddress k) params) | 331 | control sv (Connect (callBackAddress k) params) |
350 | atomically $ doConnect utc k | ||
351 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" | 332 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" |
352 | RequiresPing -> wlog $ tomsg k "RequiresPing" | 333 | RequiresPing -> wlog $ tomsg k "RequiresPing" |
353 | _ -> return () | 334 | _ -> return () |
@@ -363,29 +344,6 @@ monitor sv params = do | |||
363 | where | 344 | where |
364 | _ = str :: String | 345 | _ = str :: String |
365 | 346 | ||
366 | data ConnectionKey | ||
367 | = PeerKey { callBackAddress :: SockAddr } | ||
368 | | ClientKey { localAddress :: SockAddr } | ||
369 | deriving (Show, Ord, Eq) | ||
370 | |||
371 | data Peer = Peer | ||
372 | { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis | ||
373 | , peerState :: TVar PeerState | ||
374 | } | ||
375 | data PeerState | ||
376 | = PeerPendingConnect UTCTime | ||
377 | | PeerPendingAccept UTCTime | ||
378 | | PeerConnected (TChan Stanza) | ||
379 | |||
380 | peerKey (sock,addr) = do | ||
381 | peer <- | ||
382 | sIsConnected sock >>= \c -> | ||
383 | if c then getPeerName sock -- addr is normally socketName | ||
384 | else return addr -- Weird hack: addr is would-be peer name | ||
385 | return $ PeerKey (peer `withPort` fromIntegral peerport) | ||
386 | |||
387 | clientKey (sock,addr) = return $ ClientKey addr | ||
388 | |||
389 | 347 | ||
390 | peerport = 5269 | 348 | peerport = 5269 |
391 | clientport = 5222 | 349 | clientport = 5222 |
@@ -404,7 +362,7 @@ main = runResourceT $ do | |||
404 | (Just testaddr0) | 362 | (Just testaddr0) |
405 | (Just "5269") | 363 | (Just "5269") |
406 | putStrLn $ "Connecting to "++show testaddr | 364 | putStrLn $ "Connecting to "++show testaddr |
407 | control sv (Connect testaddr peer_params) | 365 | control sv (ConnectWithEndlessRetry testaddr peer_params 2000) |
408 | forkIO $ monitor sv peer_params | 366 | forkIO $ monitor sv peer_params |
409 | control sv (Listen peerport peer_params) | 367 | control sv (Listen peerport peer_params) |
410 | -- control sv (Listen clientport client_params) | 368 | -- control sv (Listen clientport client_params) |