diff options
Diffstat (limited to 'dht/src/Network/Tox/AggregateSession.hs')
-rw-r--r-- | dht/src/Network/Tox/AggregateSession.hs | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs index 9a784291..d1f42e91 100644 --- a/dht/src/Network/Tox/AggregateSession.hs +++ b/dht/src/Network/Tox/AggregateSession.hs | |||
@@ -24,6 +24,7 @@ module Network.Tox.AggregateSession | |||
24 | 24 | ||
25 | import Control.Concurrent.STM | 25 | import Control.Concurrent.STM |
26 | import Control.Concurrent.STM.TMChan | 26 | import Control.Concurrent.STM.TMChan |
27 | import Control.Exception | ||
27 | import Control.Monad | 28 | import Control.Monad |
28 | import Data.Dependent.Sum | 29 | import Data.Dependent.Sum |
29 | import Data.Function | 30 | import Data.Function |
@@ -114,7 +115,7 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | |||
114 | | DoRequestMissing -- ^ Detect and request lost packets. | 115 | | DoRequestMissing -- ^ Detect and request lost packets. |
115 | deriving Enum | 116 | deriving Enum |
116 | 117 | ||
117 | -- | This call loops until the provided sesison is closed or times out. It | 118 | -- | This call loops until the provided session is closed or times out. It |
118 | -- monitors the provided (non-empty) priority queue for scheduled tasks (see | 119 | -- monitors the provided (non-empty) priority queue for scheduled tasks (see |
119 | -- 'KeepAliveEvents') to perform for the connection. | 120 | -- 'KeepAliveEvents') to perform for the connection. |
120 | keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () | 121 | keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () |
@@ -142,23 +143,23 @@ keepAlive s q = do | |||
142 | now <- getPOSIXTime | 143 | now <- getPOSIXTime |
143 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now | 144 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now |
144 | 145 | ||
145 | re tm again e io = do | 146 | re tm e io = do |
146 | io | 147 | io |
147 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm | 148 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm |
148 | again | ||
149 | 149 | ||
150 | doEvent again now e = case e of | 150 | doEvent again now e = case e of |
151 | DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) | 151 | DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) |
152 | sClose s | 152 | sClose s |
153 | DoAlive -> re (now + 10) again e doAlive | 153 | DoAlive -> re (now + 10) e doAlive >> again |
154 | DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals | 154 | DoRequestMissing -> re (now + 5{- toxcore uses 1sec -}) e doRequestMissing >> again |
155 | 155 | ||
156 | fix $ \again -> do | 156 | fix $ \again -> do |
157 | 157 | ||
158 | now <- getPOSIXTime | 158 | now <- getPOSIXTime |
159 | join $ atomically $ do | 159 | join $ atomically $ do |
160 | PSQ.findMin <$> readTVar q >>= \case | 160 | PSQ.findMin <$> readTVar q >>= \case |
161 | Nothing -> error "keepAlive: unexpected empty PSQ." | 161 | Nothing -> return $ do dput XUnexpected "keepAlive: unexpected empty PSQ." |
162 | sClose s | ||
162 | Just ( k :-> tm ) -> | 163 | Just ( k :-> tm ) -> |
163 | return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again | 164 | return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again |
164 | else doEvent again now (toEnum k) | 165 | else doEvent again now (toEnum k) |
@@ -202,7 +203,7 @@ forkSession c s setStatus = forkIO $ do | |||
202 | atomically $ do setStatus Established | 203 | atomically $ do setStatus Established |
203 | sendPacket online | 204 | sendPacket online |
204 | bump | 205 | bump |
205 | beacon <- forkIO $ keepAlive s q | 206 | beacon <- forkIO $ keepAlive s q `finally` sClose s |
206 | awaitPacket $ \awaitNext x -> do | 207 | awaitPacket $ \awaitNext x -> do |
207 | bump | 208 | bump |
208 | case msgID x of | 209 | case msgID x of |
@@ -223,7 +224,7 @@ forkSession c s setStatus = forkIO $ do | |||
223 | -- one active session). | 224 | -- one active session). |
224 | addSession :: AggregateSession -> Session -> IO AddResult | 225 | addSession :: AggregateSession -> Session -> IO AddResult |
225 | addSession c s = do | 226 | addSession c s = do |
226 | (result,mcon,replaced) <- atomically $ do | 227 | (result,mcon,rejected) <- atomically $ do |
227 | let them = sTheirUserKey s | 228 | let them = sTheirUserKey s |
228 | me = toPublic $ sOurKey s | 229 | me = toPublic $ sOurKey s |
229 | compat <- checkCompatible me them c | 230 | compat <- checkCompatible me them c |
@@ -232,7 +233,7 @@ addSession c s = do | |||
232 | Just True -> AddedSession | 233 | Just True -> AddedSession |
233 | Just False -> RejectedSession | 234 | Just False -> RejectedSession |
234 | case result of | 235 | case result of |
235 | RejectedSession -> return (result,Nothing,Nothing) | 236 | RejectedSession -> return (result,Nothing,Just s) |
236 | _ -> do | 237 | _ -> do |
237 | statvar <- newTVar Dormant | 238 | statvar <- newTVar Dormant |
238 | imap <- readTVar (contactSession c) | 239 | imap <- readTVar (contactSession c) |
@@ -240,9 +241,9 @@ addSession c s = do | |||
240 | s0 = IntMap.lookup (sSessionID s) imap | 241 | s0 = IntMap.lookup (sSessionID s) imap |
241 | imap' = IntMap.insert (sSessionID s) con imap | 242 | imap' = IntMap.insert (sSessionID s) con imap |
242 | writeTVar (contactSession c) imap' | 243 | writeTVar (contactSession c) imap' |
243 | return (result,Just con,s0) | 244 | return (result,Just con,singleSession <$> s0) |
244 | 245 | ||
245 | mapM_ (sClose . singleSession) replaced | 246 | mapM_ sClose rejected |
246 | forM_ (mcon :: Maybe SingleCon) $ \con -> | 247 | forM_ (mcon :: Maybe SingleCon) $ \con -> |
247 | forkSession c s $ \progress -> do | 248 | forkSession c s $ \progress -> do |
248 | status0 <- aggregateStatus c | 249 | status0 <- aggregateStatus c |
@@ -313,7 +314,9 @@ closeAll :: AggregateSession -> IO () | |||
313 | closeAll c = join $ atomically $ do | 314 | closeAll c = join $ atomically $ do |
314 | imap <- readTVar (contactSession c) | 315 | imap <- readTVar (contactSession c) |
315 | closeTMChan (contactChannel c) | 316 | closeTMChan (contactChannel c) |
316 | return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid | 317 | return $ forM_ (IntMap.toList imap) $ \(sid,SingleCon s _) -> do |
318 | sClose s | ||
319 | delSession c sid | ||
317 | 320 | ||
318 | -- | Query the current status of the aggregate, there are three possible | 321 | -- | Query the current status of the aggregate, there are three possible |
319 | -- values: | 322 | -- values: |