diff options
author | Joe Crayne <joe@jerkface.net> | 2018-09-08 06:37:10 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-11-03 10:23:45 -0400 |
commit | 36cd21f0b42c09cbcf3a215afbcd754cc37d1c4e (patch) | |
tree | 548a3c6eb5c03692327f561a6d5afbcf3c1d5f4e /src/Network/Tox | |
parent | 0c7768ba8eb62a6a74176f737a1c9c42308d5a8c (diff) |
Switched to new session tracker.
Diffstat (limited to 'src/Network/Tox')
-rw-r--r-- | src/Network/Tox/AggregateSession.hs | 127 |
1 files changed, 83 insertions, 44 deletions
diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs index edb897e0..1dd10eef 100644 --- a/src/Network/Tox/AggregateSession.hs +++ b/src/Network/Tox/AggregateSession.hs | |||
@@ -22,7 +22,6 @@ module Network.Tox.AggregateSession | |||
22 | 22 | ||
23 | import Control.Concurrent.STM | 23 | import Control.Concurrent.STM |
24 | import Control.Concurrent.STM.TMChan | 24 | import Control.Concurrent.STM.TMChan |
25 | import Control.Concurrent.Supply | ||
26 | import Control.Monad | 25 | import Control.Monad |
27 | import Data.Function | 26 | import Data.Function |
28 | import qualified Data.IntMap.Strict as IntMap | 27 | import qualified Data.IntMap.Strict as IntMap |
@@ -47,9 +46,7 @@ import Network.Tox.Crypto.Transport (CryptoMessage (..), pattern KillPacket, | |||
47 | pattern PacketRequest) | 46 | pattern PacketRequest) |
48 | import Network.Tox.DHT.Transport (key2id) | 47 | import Network.Tox.DHT.Transport (key2id) |
49 | import Network.Tox.NodeId (ToxProgress (..)) | 48 | import Network.Tox.NodeId (ToxProgress (..)) |
50 | import Network.Tox.Crypto.Handlers | 49 | import Network.Tox.Session |
51 | |||
52 | type Session = NetCryptoSession | ||
53 | 50 | ||
54 | -- | For each component session, we track the current status. | 51 | -- | For each component session, we track the current status. |
55 | data SingleCon = SingleCon | 52 | data SingleCon = SingleCon |
@@ -113,47 +110,94 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | |||
113 | | DoRequestMissing -- ^ Detect and request lost packets. | 110 | | DoRequestMissing -- ^ Detect and request lost packets. |
114 | deriving Enum | 111 | deriving Enum |
115 | 112 | ||
116 | -- | This function forks a thread to read all packets from the provided | 113 | -- | This call loops until the provided sesison is closed or times out. It |
117 | -- 'Session' and forward them to 'contactChannel' for a containing | 114 | -- monitors the provided (non-empty) priority queue for scheduled tasks (see |
118 | -- 'AggregateSession' | 115 | -- 'KeepAliveEvents') to perform for the connection. |
116 | keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () | ||
117 | keepAlive s q = do | ||
118 | myThreadId >>= flip labelThread | ||
119 | (intercalate "." ["beacon" | ||
120 | , take 8 $ show $ key2id $ sTheirUserKey s | ||
121 | , show $ sSessionID s]) | ||
122 | |||
123 | let outPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e | ||
124 | |||
125 | doAlive = do | ||
126 | -- outPrint $ "Beacon" | ||
127 | sendMessage (sTransport s) () (OneByte PING) | ||
128 | |||
129 | doRequestMissing = do | ||
130 | (ns,nmin) <- sMissingInbound s | ||
131 | -- outPrint $ "PacketRequest " ++ show (nmin,ns) | ||
132 | sendMessage (sTransport s) () (RequestResend PacketRequest ns) | ||
133 | |||
134 | re tm again e io = do | ||
135 | io | ||
136 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm | ||
137 | again | ||
138 | |||
139 | doEvent again now e = case e of | ||
140 | DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) | ||
141 | sClose s | ||
142 | DoAlive -> re (now + 10) again e doAlive | ||
143 | DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals | ||
144 | |||
145 | fix $ \again -> do | ||
146 | |||
147 | now <- getPOSIXTime | ||
148 | join $ atomically $ do | ||
149 | Just ( k :-> tm ) <- PSQ.findMin <$> readTVar q | ||
150 | return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again | ||
151 | else doEvent again now (toEnum k) | ||
152 | |||
153 | -- | This function forks two threads: the 'keepAlive' beacon-sending thread and | ||
154 | -- a thread to read all packets from the provided 'Session' and forward them to | ||
155 | -- 'contactChannel' for a containing 'AggregateSession' | ||
119 | forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId | 156 | forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId |
120 | forkSession c s setStatus = forkIO $ do | 157 | forkSession c s setStatus = forkIO $ do |
121 | myThreadId >>= flip labelThread | 158 | myThreadId >>= flip labelThread |
122 | (intercalate "." ["s" | 159 | (intercalate "." ["s" |
123 | , take 8 $ show $ key2id $ ncTheirPublicKey s | 160 | , take 8 $ show $ key2id $ sTheirUserKey s |
124 | , show $ sSessionID s]) | 161 | , show $ sSessionID s]) |
125 | tmchan <- atomically $ do | 162 | |
126 | tmchan <- newTMChan | 163 | q <- atomically $ newTVar $ fromList |
127 | supply <- readTVar (listenerIDSupply $ ncAllSessions s) | 164 | [ fromEnum DoAlive :-> 0 |
128 | let (listenerId,supply') = freshId supply | 165 | , fromEnum DoRequestMissing :-> 0 |
129 | writeTVar (listenerIDSupply $ ncAllSessions s) supply' | 166 | ] |
130 | modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan)) | ||
131 | return tmchan | ||
132 | 167 | ||
133 | let sendPacket :: CryptoMessage -> STM () | 168 | let sendPacket :: CryptoMessage -> STM () |
134 | sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) | 169 | sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) |
135 | 170 | ||
136 | inPrint e = dput XNetCrypto $ shows (sSessionID s,ncSockAddr s) $ " --> " ++ e | 171 | inPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " --> " ++ e |
172 | |||
173 | bump = do | ||
174 | -- inPrint $ "BUMP: " ++ show (sSessionID s) | ||
175 | now <- getPOSIXTime | ||
176 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) | ||
137 | 177 | ||
138 | onPacket body loop Nothing = return () | 178 | onPacket body loop Nothing = return () |
139 | onPacket body loop (Just (Left e)) = inPrint e >> loop | 179 | onPacket body loop (Just (Left e)) = inPrint e >> loop |
140 | onPacket body loop (Just (Right x)) = body loop x | 180 | onPacket body loop (Just (Right x)) = body loop x |
141 | 181 | ||
142 | awaitPacket body = fix $ (.) (fmap Right <$> atomically (readTMChan tmchan) >>=) | 182 | awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body |
143 | $ onPacket body | ||
144 | 183 | ||
145 | atomically $ setStatus $ InProgress AwaitingSessionPacket | 184 | atomically $ setStatus $ InProgress AwaitingSessionPacket |
146 | atomically $ setStatus Established | 185 | awaitPacket $ \_ (online,()) -> do |
147 | awaitPacket $ \loop x -> do | 186 | when (msgID online /= ONLINE) $ do |
148 | case msgID x of | 187 | inPrint $ "Unexpected initial packet: " ++ show (msgID online) |
149 | KillPacket -> return () | 188 | atomically $ do setStatus Established |
150 | _ -> atomically (sendPacket x) >> loop | 189 | sendPacket online |
151 | 190 | bump | |
152 | atomically $ setStatus Dormant | 191 | beacon <- forkIO $ keepAlive s q |
153 | 192 | awaitPacket $ \awaitNext (x,()) -> do | |
154 | 193 | bump | |
155 | sSessionID :: Session -> Int | 194 | case msgID x of |
156 | sSessionID s = fromIntegral $ ncSessionId s | 195 | PING -> return () |
196 | KillPacket -> sClose s | ||
197 | _ -> atomically $ sendPacket x | ||
198 | awaitNext | ||
199 | atomically $ setStatus Dormant | ||
200 | killThread beacon | ||
157 | 201 | ||
158 | -- | Add a new session (in 'AwaitingSessionPacket' state) to the | 202 | -- | Add a new session (in 'AwaitingSessionPacket' state) to the |
159 | -- 'AggregateSession'. If the supplied session is not compatible because it is | 203 | -- 'AggregateSession'. If the supplied session is not compatible because it is |
@@ -166,8 +210,8 @@ sSessionID s = fromIntegral $ ncSessionId s | |||
166 | addSession :: AggregateSession -> Session -> IO AddResult | 210 | addSession :: AggregateSession -> Session -> IO AddResult |
167 | addSession c s = do | 211 | addSession c s = do |
168 | (result,mcon,replaced) <- atomically $ do | 212 | (result,mcon,replaced) <- atomically $ do |
169 | let them = ncTheirPublicKey s | 213 | let them = sTheirUserKey s |
170 | me = ncMyPublicKey s | 214 | me = toPublic $ sOurKey s |
171 | compat <- checkCompatible me them c | 215 | compat <- checkCompatible me them c |
172 | let result = case compat of | 216 | let result = case compat of |
173 | Nothing -> FirstSession | 217 | Nothing -> FirstSession |
@@ -184,7 +228,7 @@ addSession c s = do | |||
184 | writeTVar (contactSession c) imap' | 228 | writeTVar (contactSession c) imap' |
185 | return (result,Just con,s0) | 229 | return (result,Just con,s0) |
186 | 230 | ||
187 | mapM_ (destroySession . singleSession) replaced | 231 | mapM_ (sClose . singleSession) replaced |
188 | forM_ mcon $ \con -> | 232 | forM_ mcon $ \con -> |
189 | forkSession c s $ \progress -> do | 233 | forkSession c s $ \progress -> do |
190 | writeTVar (singleStatus con) progress | 234 | writeTVar (singleStatus con) progress |
@@ -203,6 +247,7 @@ addSession c s = do | |||
203 | return emap' | 247 | return emap' |
204 | writeTVar (contactEstablished c) emap' | 248 | writeTVar (contactEstablished c) emap' |
205 | return result | 249 | return result |
250 | |||
206 | -- | Information returned from 'delSession'. | 251 | -- | Information returned from 'delSession'. |
207 | data DelResult = NoSession -- ^ Contact is completely disconnected. | 252 | data DelResult = NoSession -- ^ Contact is completely disconnected. |
208 | | DeletedSession -- ^ Connection removed but session remains active. | 253 | | DeletedSession -- ^ Connection removed but session remains active. |
@@ -230,11 +275,10 @@ delSession c sid = do | |||
230 | writeTVar (contactSession c) imap' | 275 | writeTVar (contactSession c) imap' |
231 | writeTVar (contactEstablished c) emap' | 276 | writeTVar (contactEstablished c) emap' |
232 | return ( IntMap.lookup sid imap, IntMap.null imap') | 277 | return ( IntMap.lookup sid imap, IntMap.null imap') |
233 | mapM_ (destroySession . singleSession) con | 278 | mapM_ (sClose . singleSession) con |
234 | return $ if r then NoSession | 279 | return $ if r then NoSession |
235 | else DeletedSession | 280 | else DeletedSession |
236 | 281 | ||
237 | |||
238 | -- | Send a packet to one or all of the component sessions in the aggregate. | 282 | -- | Send a packet to one or all of the component sessions in the aggregate. |
239 | dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. | 283 | dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. |
240 | -> CryptoMessage -> IO () | 284 | -> CryptoMessage -> IO () |
@@ -242,11 +286,7 @@ dispatchMessage c msid msg = join $ atomically $ do | |||
242 | imap <- readTVar (contactSession c) | 286 | imap <- readTVar (contactSession c) |
243 | let go = case msid of Nothing -> forM_ imap | 287 | let go = case msid of Nothing -> forM_ imap |
244 | Just sid -> forM_ (IntMap.lookup sid imap) | 288 | Just sid -> forM_ (IntMap.lookup sid imap) |
245 | return $ go $ \con -> do | 289 | return $ go $ \con -> sendMessage (sTransport $ singleSession con) () msg |
246 | eResult <- sendLossless (transportCrypto (ncAllSessions (singleSession con))) (singleSession con) msg | ||
247 | case eResult of | ||
248 | Left msg -> dput XJabber msg | ||
249 | Right pkt -> dput XJabber ("sendLossLess SUCCESS: " ++ show pkt) | ||
250 | 290 | ||
251 | -- | Retry until: | 291 | -- | Retry until: |
252 | -- | 292 | -- |
@@ -287,7 +327,6 @@ aggregateStatus c = do | |||
287 | | not (IntMap.null imap) -> InProgress AwaitingSessionPacket | 327 | | not (IntMap.null imap) -> InProgress AwaitingSessionPacket |
288 | | otherwise -> Dormant | 328 | | otherwise -> Dormant |
289 | 329 | ||
290 | |||
291 | -- | Query whether the supplied ToxID keys are compatible with this aggregate. | 330 | -- | Query whether the supplied ToxID keys are compatible with this aggregate. |
292 | -- | 331 | -- |
293 | -- [ Nothing ] Any keys would be compatible because there is not yet any | 332 | -- [ Nothing ] Any keys would be compatible because there is not yet any |
@@ -304,8 +343,8 @@ checkCompatible me them c = do | |||
304 | imap <- readTVar (contactSession c) | 343 | imap <- readTVar (contactSession c) |
305 | return $ case IntMap.elems imap of | 344 | return $ case IntMap.elems imap of |
306 | _ | isclosed -> Just False -- All keys are incompatible (closed). | 345 | _ | isclosed -> Just False -- All keys are incompatible (closed). |
307 | con:_ -> Just $ ncTheirPublicKey (singleSession con) == them | 346 | con:_ -> Just $ sTheirUserKey (singleSession con) == them |
308 | && (ncMyPublicKey $ singleSession con) == me | 347 | && toPublic (sOurKey $ singleSession con) == me |
309 | [] -> Nothing | 348 | [] -> Nothing |
310 | 349 | ||
311 | -- | Returns the local and remote keys that are compatible with this aggregate. | 350 | -- | Returns the local and remote keys that are compatible with this aggregate. |
@@ -317,6 +356,6 @@ compatibleKeys c = do | |||
317 | imap <- readTVar (contactSession c) | 356 | imap <- readTVar (contactSession c) |
318 | return $ case IntMap.elems imap of | 357 | return $ case IntMap.elems imap of |
319 | _ | isclosed -> Nothing -- none. | 358 | _ | isclosed -> Nothing -- none. |
320 | con:_ -> Just ( ncMyPublicKey $ singleSession con | 359 | con:_ -> Just ( toPublic (sOurKey $ singleSession con) |
321 | , ncTheirPublicKey (singleSession con)) | 360 | , sTheirUserKey (singleSession con)) |
322 | [] -> Nothing -- any. | 361 | [] -> Nothing -- any. |