summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Presence/Server.hs107
-rw-r--r--xmppServer.hs102
2 files changed, 130 insertions, 79 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 5e54ff27..1141f0b0 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -60,14 +60,16 @@ import Network.BSD
60 ( getProtocolNumber 60 ( getProtocolNumber
61 ) 61 )
62import Debug.Trace 62import Debug.Trace
63import Data.Time.Clock (UTCTime,getCurrentTime) 63import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime)
64import Data.Time.Format (formatTime) 64import Data.Time.Format (formatTime)
65import System.Locale (defaultTimeLocale) 65import System.Locale (defaultTimeLocale)
66 66
67todo = error "unimplemented" 67todo = error "unimplemented"
68 68
69type TimeOut = Int -- ^ miliseconds 69type Microseconds = Int
70type PingInterval = Int -- ^ miliseconds 70type Miliseconds = Int
71type TimeOut = Miliseconds
72type PingInterval = Miliseconds
71 73
72-- | This object is passed with the 'Listen' and 'Connect' 74-- | This object is passed with the 'Listen' and 'Connect'
73-- instructions in order to control the behavior of the 75-- instructions in order to control the behavior of the
@@ -128,6 +130,10 @@ data ServerInstruction conkey
128 -- ^ listen for incomming connections 130 -- ^ listen for incomming connections
129 | Connect SockAddr (ConnectionParameters conkey) 131 | Connect SockAddr (ConnectionParameters conkey)
130 -- ^ connect to addresses 132 -- ^ connect to addresses
133 | ConnectWithEndlessRetry SockAddr
134 (ConnectionParameters conkey)
135 Miliseconds
136 -- ^ keep retrying the connection
131 | Ignore PortNumber 137 | Ignore PortNumber
132 -- ^ stop listening on specified port 138 -- ^ stop listening on specified port
133 | Send conkey ByteString 139 | Send conkey ByteString
@@ -178,6 +184,7 @@ data Server a
178 , serverReleaseKey :: ReleaseKey 184 , serverReleaseKey :: ReleaseKey
179 , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) 185 , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState))
180 , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) 186 , listenmap :: TVar (Map PortNumber (ThreadId,Socket))
187 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay))
181 } 188 }
182 189
183-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' 190-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
@@ -210,11 +217,13 @@ server = do
210 tchan <- newTChan 217 tchan <- newTChan
211 conmap <- newTVar Map.empty 218 conmap <- newTVar Map.empty
212 listenmap<- newTVar Map.empty 219 listenmap<- newTVar Map.empty
220 retrymap <- newTVar Map.empty
213 return Server { serverCommand = cmds 221 return Server { serverCommand = cmds
214 , serverEvent = tchan 222 , serverEvent = tchan
215 , serverReleaseKey = key 223 , serverReleaseKey = key
216 , conmap = conmap 224 , conmap = conmap
217 , listenmap = listenmap 225 , listenmap = listenmap
226 , retrymap = retrymap
218 } 227 }
219 liftIO $ do 228 liftIO $ do
220 forkIO $ fix $ \loop -> do 229 forkIO $ fix $ \loop -> do
@@ -230,6 +239,10 @@ server = do
230 closeAll server = liftIO $ do 239 closeAll server = liftIO $ do
231 listening <- atomically . readTVar $ listenmap server 240 listening <- atomically . readTVar $ listenmap server
232 mapM_ killListener (Map.elems listening) 241 mapM_ killListener (Map.elems listening)
242 let stopRetry (v,d) = do atomically $ writeTVar v False
243 interruptDelay d
244 retriers <- atomically . readTVar $ retrymap server
245 mapM_ stopRetry (Map.elems retriers)
233 cons <- atomically . readTVar $ conmap server 246 cons <- atomically . readTVar $ conmap server
234 atomically $ mapM_ (connClose . snd) (Map.elems cons) 247 atomically $ mapM_ (connClose . snd) (Map.elems cons)
235 atomically $ mapM_ (connWait . snd) (Map.elems cons) 248 atomically $ mapM_ (connWait . snd) (Map.elems cons)
@@ -278,7 +291,7 @@ server = do
278 doit server (Connect addr params) = liftIO $ do 291 doit server (Connect addr params) = liftIO $ do
279 void . forkIO $ do 292 void . forkIO $ do
280 proto <- getProtocolNumber "tcp" 293 proto <- getProtocolNumber "tcp"
281 bracketOnError 294 sock <- bracketOnError
282 (socket (socketFamily addr) Stream proto) 295 (socket (socketFamily addr) Stream proto)
283 (\sock -> do -- only done if there's an error 296 (\sock -> do -- only done if there's an error
284 -- Weird hack: puting the would-be peer address 297 -- Weird hack: puting the would-be peer address
@@ -288,12 +301,66 @@ server = do
288 atomically 301 atomically
289 $ writeTChan (serverEvent server) 302 $ writeTChan (serverEvent server)
290 $ (conkey,ConnectFailure addr)) 303 $ (conkey,ConnectFailure addr))
291 $ \sock -> do 304 $ \sock -> do connect sock addr
292 connect sock addr 305 return sock
293 me <- getSocketName sock 306 me <- getSocketName sock
294 conkey <- makeConnKey params (sock,me) 307 conkey <- makeConnKey params (sock,me)
295 h <- socketToHandle sock ReadWriteMode 308 h <- socketToHandle sock ReadWriteMode
296 newConnection server params conkey h Out 309 newConnection server params conkey h Out
310 return ()
311
312 doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do
313 proto <- getProtocolNumber "tcp"
314 void . forkIO $ do
315 resultVar <- atomically newEmptyTMVar
316 timer <- interruptableDelay
317 (retryVar,action) <- atomically $ do
318 let noop = return ()
319 map <- readTVar (retrymap server)
320 let mb = Map.lookup addr map
321 action <-
322 maybe (return noop)
323 (\(v,d) -> do writeTVar v False
324 return $ interruptDelay d)
325 mb
326 v <- newTVar True
327 writeTVar (retrymap server) (Map.insert addr (v,timer) map)
328 return (v,action)
329 action
330 fix $ \retryLoop -> do
331 utc <- getCurrentTime
332 forkIO $ do
333 sock <- bracketOnError
334 (socket (socketFamily addr) Stream proto)
335 (\sock -> do -- only done if there's an error
336 -- Weird hack: puting the would-be peer address
337 -- instead of local socketName
338 conkey <- makeConnKey params (sock,addr) -- XXX: ?
339 sClose sock
340 atomically $ do
341 writeTChan (serverEvent server)
342 $ (conkey,ConnectFailure addr)
343 retry <- readTVar retryVar
344 putTMVar resultVar retry)
345 $ \sock -> do connect sock addr
346 return sock
347 me <- getSocketName sock
348 conkey <- makeConnKey params (sock,me)
349 h <- socketToHandle sock ReadWriteMode
350 threads <- newConnection server params conkey h Out
351 atomically $ do threadsWait threads
352 retry <- readTVar retryVar
353 putTMVar resultVar retry
354 retry <- atomically $ takeTMVar resultVar
355 fin_utc <- getCurrentTime
356 when retry $ do
357 let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
358 expected = fromIntegral interval
359 when (retry && elapsed < expected) $ do
360 warn $ "Waiting to retry " <> bshow addr
361 startDelay timer (round $ 1000 * (expected-elapsed))
362 warn $ "retry " <> bshow (retry,addr)
363 when retry $ retryLoop
297 364
298 365
299-- INTERNAL ---------------------------------------------------------- 366-- INTERNAL ----------------------------------------------------------
@@ -397,7 +464,7 @@ newConnection server params conkey h inout = do
397 mb 464 mb
398 warn $ "quit-gots: " <> bshow (conkey,inout) 465 warn $ "quit-gots: " <> bshow (conkey,inout)
399#endif 466#endif
400 return () 467 return new
401 where 468 where
402 469
403 announce e = writeTChan (serverEvent server) e 470 announce e = writeTChan (serverEvent server) e
@@ -754,3 +821,29 @@ pingWait me = do
754 return e 821 return e
755 822
756 823
824data InterruptableDelay = InterruptableDelay
825 { delayThread :: TMVar ThreadId
826 }
827
828interruptableDelay :: IO InterruptableDelay
829interruptableDelay = do
830 fmap InterruptableDelay
831 $ atomically newEmptyTMVar
832startDelay :: InterruptableDelay -> Microseconds -> IO ()
833startDelay d interval = do
834 thread <- myThreadId
835 atomically $ putTMVar (delayThread d) thread
836 handle (\(ErrorCall _)-> warn $ "delay interrupted" ) $ do
837 threadDelay interval
838 void . atomically $ takeTMVar (delayThread d)
839
840interruptDelay :: InterruptableDelay -> IO ()
841interruptDelay d = do
842 mthread <- atomically $ do
843 tryTakeTMVar (delayThread d)
844 flip (maybe $ return ()) mthread $ \thread -> do
845 throwTo thread (ErrorCall "Interrupted delay")
846 atomically $ do
847 putTMVar (delayThread d) thread
848
849
diff --git a/xmppServer.hs b/xmppServer.hs
index 4fbb775b..987e7dbc 100644
--- a/xmppServer.hs
+++ b/xmppServer.hs
@@ -27,14 +27,12 @@ import Data.Maybe (catMaybes,fromJust)
27import Data.Monoid ( (<>) ) 27import Data.Monoid ( (<>) )
28import Data.Text (Text) 28import Data.Text (Text)
29import qualified Data.Text as Text (pack) 29import qualified Data.Text as Text (pack)
30import qualified Data.Map as Map
31 30
32import qualified Control.Concurrent.STM.UpdateStream as Slotted 31import qualified Control.Concurrent.STM.UpdateStream as Slotted
33import ControlMaybe 32import ControlMaybe
34import Nesting 33import Nesting
35import EventUtil 34import EventUtil
36import Server 35import Server
37import Data.Time.Clock (UTCTime,getCurrentTime)
38 36
39addrToText :: SockAddr -> Text 37addrToText :: SockAddr -> Text
40addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) 38addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr)
@@ -288,66 +286,49 @@ forkConnection k pingflag src snk stanzas = do
288 wlog $ "end reader fork: " ++ show k 286 wlog $ "end reader fork: " ++ show k
289 return output 287 return output
290 288
289data ConnectionKey
290 = PeerKey { callBackAddress :: SockAddr }
291 | ClientKey { localAddress :: SockAddr }
292 deriving (Show, Ord, Eq)
293
294{-
295data Peer = Peer
296 { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
297 , peerState :: TVar PeerState
298 }
299data PeerState
300 = PeerPendingConnect UTCTime
301 | PeerPendingAccept UTCTime
302 | PeerConnected (TChan Stanza)
303-}
304
305peerKey (sock,addr) = do
306 peer <-
307 sIsConnected sock >>= \c ->
308 if c then getPeerName sock -- addr is normally socketName
309 else return addr -- Weird hack: addr is would-be peer name
310 return $ PeerKey (peer `withPort` fromIntegral peerport)
311
312clientKey (sock,addr) = return $ ClientKey addr
313
291monitor sv params = do 314monitor sv params = do
292 chan <- return $ serverEvent sv 315 chan <- return $ serverEvent sv
293 stanzas <- atomically newTChan 316 stanzas <- atomically newTChan
294 peersVar <- atomically $ newTVar Map.empty
295 let doConnect utc k = do
296 peers <- readTVar peersVar
297 let mb = Map.lookup k peers
298 maybe (do false <- newTVar False
299 pending <- newTVar (PeerPendingConnect utc)
300 let v = Peer { peerWanted = false
301 , peerState = pending }
302 writeTVar (peersVar) $ Map.insert k v peers)
303 (\peer ->
304 writeTVar (peerState peer)
305 $ PeerPendingConnect utc)
306 mb
307 fix $ \loop -> do 317 fix $ \loop -> do
308 action <- atomically $ foldr1 orElse 318 action <- atomically $ foldr1 orElse
309 [ readTChan chan >>= \(k,e) -> return $ do 319 [ readTChan chan >>= \(k,e) -> return $ do
310 case e of 320 case e of
311 Connection pingflag conread conwrite -> do 321 Connection pingflag conread conwrite -> do
312 wlog $ tomsg k "Connection" 322 wlog $ tomsg k "Connection"
313 let (xsrc,xsnk) = xmlStream conread conwrite 323 let (xsrc,xsnk) = xmlStream conread conwrite
314 outs <- forkConnection k pingflag xsrc xsnk stanzas 324 forkConnection k pingflag xsrc xsnk stanzas
315 atomically $ do 325 return ()
316 peers <- readTVar peersVar
317 let mb = Map.lookup k peers
318 maybe (do false <- newTVar True -- False -- TODO: should be False
319 connected <- newTVar (PeerConnected outs)
320 let v = Peer { peerWanted = false
321 , peerState = connected }
322 writeTVar (peersVar) $ Map.insert k v peers)
323 (\peer -> do
324 writeTVar (peerWanted peer) True -- TODO REMOVE
325 writeTVar (peerState peer)
326 $ PeerConnected outs)
327 mb
328 return ()
329 ConnectFailure addr -> do 326 ConnectFailure addr -> do
330 wlog $ tomsg k "ConnectFailure" 327 wlog $ tomsg k "ConnectFailure"
331 action <- atomically $ do
332 peers <- readTVar peersVar
333 let mb = Map.lookup k peers
334 maybe (return $ return ())
335 (\peer -> do
336 wanted <- readTVar (peerWanted peer)
337 if wanted then return $ do
338 utc <- getCurrentTime
339 control sv (Connect addr params)
340 wlog $ tomsg k "Retry"
341 atomically $ doConnect utc k
342 else return $ return ())
343 mb
344 action
345 EOF -> wlog $ tomsg k "EOF" 328 EOF -> wlog $ tomsg k "EOF"
346 HalfConnection In -> do 329 HalfConnection In -> do
347 wlog $ tomsg k "ReadOnly" 330 wlog $ tomsg k "ReadOnly"
348 utc <- getCurrentTime
349 control sv (Connect (callBackAddress k) params) 331 control sv (Connect (callBackAddress k) params)
350 atomically $ doConnect utc k
351 HalfConnection Out -> wlog $ tomsg k "WriteOnly" 332 HalfConnection Out -> wlog $ tomsg k "WriteOnly"
352 RequiresPing -> wlog $ tomsg k "RequiresPing" 333 RequiresPing -> wlog $ tomsg k "RequiresPing"
353 _ -> return () 334 _ -> return ()
@@ -363,29 +344,6 @@ monitor sv params = do
363 where 344 where
364 _ = str :: String 345 _ = str :: String
365 346
366data ConnectionKey
367 = PeerKey { callBackAddress :: SockAddr }
368 | ClientKey { localAddress :: SockAddr }
369 deriving (Show, Ord, Eq)
370
371data Peer = Peer
372 { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
373 , peerState :: TVar PeerState
374 }
375data PeerState
376 = PeerPendingConnect UTCTime
377 | PeerPendingAccept UTCTime
378 | PeerConnected (TChan Stanza)
379
380peerKey (sock,addr) = do
381 peer <-
382 sIsConnected sock >>= \c ->
383 if c then getPeerName sock -- addr is normally socketName
384 else return addr -- Weird hack: addr is would-be peer name
385 return $ PeerKey (peer `withPort` fromIntegral peerport)
386
387clientKey (sock,addr) = return $ ClientKey addr
388
389 347
390peerport = 5269 348peerport = 5269
391clientport = 5222 349clientport = 5222
@@ -404,7 +362,7 @@ main = runResourceT $ do
404 (Just testaddr0) 362 (Just testaddr0)
405 (Just "5269") 363 (Just "5269")
406 putStrLn $ "Connecting to "++show testaddr 364 putStrLn $ "Connecting to "++show testaddr
407 control sv (Connect testaddr peer_params) 365 control sv (ConnectWithEndlessRetry testaddr peer_params 2000)
408 forkIO $ monitor sv peer_params 366 forkIO $ monitor sv peer_params
409 control sv (Listen peerport peer_params) 367 control sv (Listen peerport peer_params)
410 -- control sv (Listen clientport client_params) 368 -- control sv (Listen clientport client_params)