summaryrefslogtreecommitdiff
path: root/dht/src/Network/Tox/AggregateSession.hs
diff options
context:
space:
mode:
Diffstat (limited to 'dht/src/Network/Tox/AggregateSession.hs')
-rw-r--r--dht/src/Network/Tox/AggregateSession.hs27
1 files changed, 15 insertions, 12 deletions
diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs
index 9a784291..d1f42e91 100644
--- a/dht/src/Network/Tox/AggregateSession.hs
+++ b/dht/src/Network/Tox/AggregateSession.hs
@@ -24,6 +24,7 @@ module Network.Tox.AggregateSession
24 24
25import Control.Concurrent.STM 25import Control.Concurrent.STM
26import Control.Concurrent.STM.TMChan 26import Control.Concurrent.STM.TMChan
27import Control.Exception
27import Control.Monad 28import Control.Monad
28import Data.Dependent.Sum 29import Data.Dependent.Sum
29import Data.Function 30import Data.Function
@@ -114,7 +115,7 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it.
114 | DoRequestMissing -- ^ Detect and request lost packets. 115 | DoRequestMissing -- ^ Detect and request lost packets.
115 deriving Enum 116 deriving Enum
116 117
117-- | This call loops until the provided sesison is closed or times out. It 118-- | This call loops until the provided session is closed or times out. It
118-- monitors the provided (non-empty) priority queue for scheduled tasks (see 119-- monitors the provided (non-empty) priority queue for scheduled tasks (see
119-- 'KeepAliveEvents') to perform for the connection. 120-- 'KeepAliveEvents') to perform for the connection.
120keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () 121keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO ()
@@ -142,23 +143,23 @@ keepAlive s q = do
142 now <- getPOSIXTime 143 now <- getPOSIXTime
143 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now 144 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now
144 145
145 re tm again e io = do 146 re tm e io = do
146 io 147 io
147 atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm 148 atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm
148 again
149 149
150 doEvent again now e = case e of 150 doEvent again now e = case e of
151 DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) 151 DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s)
152 sClose s 152 sClose s
153 DoAlive -> re (now + 10) again e doAlive 153 DoAlive -> re (now + 10) e doAlive >> again
154 DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals 154 DoRequestMissing -> re (now + 5{- toxcore uses 1sec -}) e doRequestMissing >> again
155 155
156 fix $ \again -> do 156 fix $ \again -> do
157 157
158 now <- getPOSIXTime 158 now <- getPOSIXTime
159 join $ atomically $ do 159 join $ atomically $ do
160 PSQ.findMin <$> readTVar q >>= \case 160 PSQ.findMin <$> readTVar q >>= \case
161 Nothing -> error "keepAlive: unexpected empty PSQ." 161 Nothing -> return $ do dput XUnexpected "keepAlive: unexpected empty PSQ."
162 sClose s
162 Just ( k :-> tm ) -> 163 Just ( k :-> tm ) ->
163 return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again 164 return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again
164 else doEvent again now (toEnum k) 165 else doEvent again now (toEnum k)
@@ -202,7 +203,7 @@ forkSession c s setStatus = forkIO $ do
202 atomically $ do setStatus Established 203 atomically $ do setStatus Established
203 sendPacket online 204 sendPacket online
204 bump 205 bump
205 beacon <- forkIO $ keepAlive s q 206 beacon <- forkIO $ keepAlive s q `finally` sClose s
206 awaitPacket $ \awaitNext x -> do 207 awaitPacket $ \awaitNext x -> do
207 bump 208 bump
208 case msgID x of 209 case msgID x of
@@ -223,7 +224,7 @@ forkSession c s setStatus = forkIO $ do
223-- one active session). 224-- one active session).
224addSession :: AggregateSession -> Session -> IO AddResult 225addSession :: AggregateSession -> Session -> IO AddResult
225addSession c s = do 226addSession c s = do
226 (result,mcon,replaced) <- atomically $ do 227 (result,mcon,rejected) <- atomically $ do
227 let them = sTheirUserKey s 228 let them = sTheirUserKey s
228 me = toPublic $ sOurKey s 229 me = toPublic $ sOurKey s
229 compat <- checkCompatible me them c 230 compat <- checkCompatible me them c
@@ -232,7 +233,7 @@ addSession c s = do
232 Just True -> AddedSession 233 Just True -> AddedSession
233 Just False -> RejectedSession 234 Just False -> RejectedSession
234 case result of 235 case result of
235 RejectedSession -> return (result,Nothing,Nothing) 236 RejectedSession -> return (result,Nothing,Just s)
236 _ -> do 237 _ -> do
237 statvar <- newTVar Dormant 238 statvar <- newTVar Dormant
238 imap <- readTVar (contactSession c) 239 imap <- readTVar (contactSession c)
@@ -240,9 +241,9 @@ addSession c s = do
240 s0 = IntMap.lookup (sSessionID s) imap 241 s0 = IntMap.lookup (sSessionID s) imap
241 imap' = IntMap.insert (sSessionID s) con imap 242 imap' = IntMap.insert (sSessionID s) con imap
242 writeTVar (contactSession c) imap' 243 writeTVar (contactSession c) imap'
243 return (result,Just con,s0) 244 return (result,Just con,singleSession <$> s0)
244 245
245 mapM_ (sClose . singleSession) replaced 246 mapM_ sClose rejected
246 forM_ (mcon :: Maybe SingleCon) $ \con -> 247 forM_ (mcon :: Maybe SingleCon) $ \con ->
247 forkSession c s $ \progress -> do 248 forkSession c s $ \progress -> do
248 status0 <- aggregateStatus c 249 status0 <- aggregateStatus c
@@ -313,7 +314,9 @@ closeAll :: AggregateSession -> IO ()
313closeAll c = join $ atomically $ do 314closeAll c = join $ atomically $ do
314 imap <- readTVar (contactSession c) 315 imap <- readTVar (contactSession c)
315 closeTMChan (contactChannel c) 316 closeTMChan (contactChannel c)
316 return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid 317 return $ forM_ (IntMap.toList imap) $ \(sid,SingleCon s _) -> do
318 sClose s
319 delSession c sid
317 320
318-- | Query the current status of the aggregate, there are three possible 321-- | Query the current status of the aggregate, there are three possible
319-- values: 322-- values: