summaryrefslogtreecommitdiff
path: root/src/Network/Tox
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-09-08 06:37:10 -0400
committerJoe Crayne <joe@jerkface.net>2018-11-03 10:23:45 -0400
commit36cd21f0b42c09cbcf3a215afbcd754cc37d1c4e (patch)
tree548a3c6eb5c03692327f561a6d5afbcf3c1d5f4e /src/Network/Tox
parent0c7768ba8eb62a6a74176f737a1c9c42308d5a8c (diff)
Switched to new session tracker.
Diffstat (limited to 'src/Network/Tox')
-rw-r--r--src/Network/Tox/AggregateSession.hs127
1 files changed, 83 insertions, 44 deletions
diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs
index edb897e0..1dd10eef 100644
--- a/src/Network/Tox/AggregateSession.hs
+++ b/src/Network/Tox/AggregateSession.hs
@@ -22,7 +22,6 @@ module Network.Tox.AggregateSession
22 22
23import Control.Concurrent.STM 23import Control.Concurrent.STM
24import Control.Concurrent.STM.TMChan 24import Control.Concurrent.STM.TMChan
25import Control.Concurrent.Supply
26import Control.Monad 25import Control.Monad
27import Data.Function 26import Data.Function
28import qualified Data.IntMap.Strict as IntMap 27import qualified Data.IntMap.Strict as IntMap
@@ -47,9 +46,7 @@ import Network.Tox.Crypto.Transport (CryptoMessage (..), pattern KillPacket,
47 pattern PacketRequest) 46 pattern PacketRequest)
48import Network.Tox.DHT.Transport (key2id) 47import Network.Tox.DHT.Transport (key2id)
49import Network.Tox.NodeId (ToxProgress (..)) 48import Network.Tox.NodeId (ToxProgress (..))
50import Network.Tox.Crypto.Handlers 49import Network.Tox.Session
51
52type Session = NetCryptoSession
53 50
54-- | For each component session, we track the current status. 51-- | For each component session, we track the current status.
55data SingleCon = SingleCon 52data SingleCon = SingleCon
@@ -113,47 +110,94 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it.
113 | DoRequestMissing -- ^ Detect and request lost packets. 110 | DoRequestMissing -- ^ Detect and request lost packets.
114 deriving Enum 111 deriving Enum
115 112
116-- | This function forks a thread to read all packets from the provided 113-- | This call loops until the provided sesison is closed or times out. It
117-- 'Session' and forward them to 'contactChannel' for a containing 114-- monitors the provided (non-empty) priority queue for scheduled tasks (see
118-- 'AggregateSession' 115-- 'KeepAliveEvents') to perform for the connection.
116keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO ()
117keepAlive s q = do
118 myThreadId >>= flip labelThread
119 (intercalate "." ["beacon"
120 , take 8 $ show $ key2id $ sTheirUserKey s
121 , show $ sSessionID s])
122
123 let outPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e
124
125 doAlive = do
126 -- outPrint $ "Beacon"
127 sendMessage (sTransport s) () (OneByte PING)
128
129 doRequestMissing = do
130 (ns,nmin) <- sMissingInbound s
131 -- outPrint $ "PacketRequest " ++ show (nmin,ns)
132 sendMessage (sTransport s) () (RequestResend PacketRequest ns)
133
134 re tm again e io = do
135 io
136 atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm
137 again
138
139 doEvent again now e = case e of
140 DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s)
141 sClose s
142 DoAlive -> re (now + 10) again e doAlive
143 DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals
144
145 fix $ \again -> do
146
147 now <- getPOSIXTime
148 join $ atomically $ do
149 Just ( k :-> tm ) <- PSQ.findMin <$> readTVar q
150 return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again
151 else doEvent again now (toEnum k)
152
153-- | This function forks two threads: the 'keepAlive' beacon-sending thread and
154-- a thread to read all packets from the provided 'Session' and forward them to
155-- 'contactChannel' for a containing 'AggregateSession'
119forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId 156forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId
120forkSession c s setStatus = forkIO $ do 157forkSession c s setStatus = forkIO $ do
121 myThreadId >>= flip labelThread 158 myThreadId >>= flip labelThread
122 (intercalate "." ["s" 159 (intercalate "." ["s"
123 , take 8 $ show $ key2id $ ncTheirPublicKey s 160 , take 8 $ show $ key2id $ sTheirUserKey s
124 , show $ sSessionID s]) 161 , show $ sSessionID s])
125 tmchan <- atomically $ do 162
126 tmchan <- newTMChan 163 q <- atomically $ newTVar $ fromList
127 supply <- readTVar (listenerIDSupply $ ncAllSessions s) 164 [ fromEnum DoAlive :-> 0
128 let (listenerId,supply') = freshId supply 165 , fromEnum DoRequestMissing :-> 0
129 writeTVar (listenerIDSupply $ ncAllSessions s) supply' 166 ]
130 modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan))
131 return tmchan
132 167
133 let sendPacket :: CryptoMessage -> STM () 168 let sendPacket :: CryptoMessage -> STM ()
134 sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) 169 sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg)
135 170
136 inPrint e = dput XNetCrypto $ shows (sSessionID s,ncSockAddr s) $ " --> " ++ e 171 inPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " --> " ++ e
172
173 bump = do
174 -- inPrint $ "BUMP: " ++ show (sSessionID s)
175 now <- getPOSIXTime
176 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15)
137 177
138 onPacket body loop Nothing = return () 178 onPacket body loop Nothing = return ()
139 onPacket body loop (Just (Left e)) = inPrint e >> loop 179 onPacket body loop (Just (Left e)) = inPrint e >> loop
140 onPacket body loop (Just (Right x)) = body loop x 180 onPacket body loop (Just (Right x)) = body loop x
141 181
142 awaitPacket body = fix $ (.) (fmap Right <$> atomically (readTMChan tmchan) >>=) 182 awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body
143 $ onPacket body
144 183
145 atomically $ setStatus $ InProgress AwaitingSessionPacket 184 atomically $ setStatus $ InProgress AwaitingSessionPacket
146 atomically $ setStatus Established 185 awaitPacket $ \_ (online,()) -> do
147 awaitPacket $ \loop x -> do 186 when (msgID online /= ONLINE) $ do
148 case msgID x of 187 inPrint $ "Unexpected initial packet: " ++ show (msgID online)
149 KillPacket -> return () 188 atomically $ do setStatus Established
150 _ -> atomically (sendPacket x) >> loop 189 sendPacket online
151 190 bump
152 atomically $ setStatus Dormant 191 beacon <- forkIO $ keepAlive s q
153 192 awaitPacket $ \awaitNext (x,()) -> do
154 193 bump
155sSessionID :: Session -> Int 194 case msgID x of
156sSessionID s = fromIntegral $ ncSessionId s 195 PING -> return ()
196 KillPacket -> sClose s
197 _ -> atomically $ sendPacket x
198 awaitNext
199 atomically $ setStatus Dormant
200 killThread beacon
157 201
158-- | Add a new session (in 'AwaitingSessionPacket' state) to the 202-- | Add a new session (in 'AwaitingSessionPacket' state) to the
159-- 'AggregateSession'. If the supplied session is not compatible because it is 203-- 'AggregateSession'. If the supplied session is not compatible because it is
@@ -166,8 +210,8 @@ sSessionID s = fromIntegral $ ncSessionId s
166addSession :: AggregateSession -> Session -> IO AddResult 210addSession :: AggregateSession -> Session -> IO AddResult
167addSession c s = do 211addSession c s = do
168 (result,mcon,replaced) <- atomically $ do 212 (result,mcon,replaced) <- atomically $ do
169 let them = ncTheirPublicKey s 213 let them = sTheirUserKey s
170 me = ncMyPublicKey s 214 me = toPublic $ sOurKey s
171 compat <- checkCompatible me them c 215 compat <- checkCompatible me them c
172 let result = case compat of 216 let result = case compat of
173 Nothing -> FirstSession 217 Nothing -> FirstSession
@@ -184,7 +228,7 @@ addSession c s = do
184 writeTVar (contactSession c) imap' 228 writeTVar (contactSession c) imap'
185 return (result,Just con,s0) 229 return (result,Just con,s0)
186 230
187 mapM_ (destroySession . singleSession) replaced 231 mapM_ (sClose . singleSession) replaced
188 forM_ mcon $ \con -> 232 forM_ mcon $ \con ->
189 forkSession c s $ \progress -> do 233 forkSession c s $ \progress -> do
190 writeTVar (singleStatus con) progress 234 writeTVar (singleStatus con) progress
@@ -203,6 +247,7 @@ addSession c s = do
203 return emap' 247 return emap'
204 writeTVar (contactEstablished c) emap' 248 writeTVar (contactEstablished c) emap'
205 return result 249 return result
250
206-- | Information returned from 'delSession'. 251-- | Information returned from 'delSession'.
207data DelResult = NoSession -- ^ Contact is completely disconnected. 252data DelResult = NoSession -- ^ Contact is completely disconnected.
208 | DeletedSession -- ^ Connection removed but session remains active. 253 | DeletedSession -- ^ Connection removed but session remains active.
@@ -230,11 +275,10 @@ delSession c sid = do
230 writeTVar (contactSession c) imap' 275 writeTVar (contactSession c) imap'
231 writeTVar (contactEstablished c) emap' 276 writeTVar (contactEstablished c) emap'
232 return ( IntMap.lookup sid imap, IntMap.null imap') 277 return ( IntMap.lookup sid imap, IntMap.null imap')
233 mapM_ (destroySession . singleSession) con 278 mapM_ (sClose . singleSession) con
234 return $ if r then NoSession 279 return $ if r then NoSession
235 else DeletedSession 280 else DeletedSession
236 281
237
238-- | Send a packet to one or all of the component sessions in the aggregate. 282-- | Send a packet to one or all of the component sessions in the aggregate.
239dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. 283dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID.
240 -> CryptoMessage -> IO () 284 -> CryptoMessage -> IO ()
@@ -242,11 +286,7 @@ dispatchMessage c msid msg = join $ atomically $ do
242 imap <- readTVar (contactSession c) 286 imap <- readTVar (contactSession c)
243 let go = case msid of Nothing -> forM_ imap 287 let go = case msid of Nothing -> forM_ imap
244 Just sid -> forM_ (IntMap.lookup sid imap) 288 Just sid -> forM_ (IntMap.lookup sid imap)
245 return $ go $ \con -> do 289 return $ go $ \con -> sendMessage (sTransport $ singleSession con) () msg
246 eResult <- sendLossless (transportCrypto (ncAllSessions (singleSession con))) (singleSession con) msg
247 case eResult of
248 Left msg -> dput XJabber msg
249 Right pkt -> dput XJabber ("sendLossLess SUCCESS: " ++ show pkt)
250 290
251-- | Retry until: 291-- | Retry until:
252-- 292--
@@ -287,7 +327,6 @@ aggregateStatus c = do
287 | not (IntMap.null imap) -> InProgress AwaitingSessionPacket 327 | not (IntMap.null imap) -> InProgress AwaitingSessionPacket
288 | otherwise -> Dormant 328 | otherwise -> Dormant
289 329
290
291-- | Query whether the supplied ToxID keys are compatible with this aggregate. 330-- | Query whether the supplied ToxID keys are compatible with this aggregate.
292-- 331--
293-- [ Nothing ] Any keys would be compatible because there is not yet any 332-- [ Nothing ] Any keys would be compatible because there is not yet any
@@ -304,8 +343,8 @@ checkCompatible me them c = do
304 imap <- readTVar (contactSession c) 343 imap <- readTVar (contactSession c)
305 return $ case IntMap.elems imap of 344 return $ case IntMap.elems imap of
306 _ | isclosed -> Just False -- All keys are incompatible (closed). 345 _ | isclosed -> Just False -- All keys are incompatible (closed).
307 con:_ -> Just $ ncTheirPublicKey (singleSession con) == them 346 con:_ -> Just $ sTheirUserKey (singleSession con) == them
308 && (ncMyPublicKey $ singleSession con) == me 347 && toPublic (sOurKey $ singleSession con) == me
309 [] -> Nothing 348 [] -> Nothing
310 349
311-- | Returns the local and remote keys that are compatible with this aggregate. 350-- | Returns the local and remote keys that are compatible with this aggregate.
@@ -317,6 +356,6 @@ compatibleKeys c = do
317 imap <- readTVar (contactSession c) 356 imap <- readTVar (contactSession c)
318 return $ case IntMap.elems imap of 357 return $ case IntMap.elems imap of
319 _ | isclosed -> Nothing -- none. 358 _ | isclosed -> Nothing -- none.
320 con:_ -> Just ( ncMyPublicKey $ singleSession con 359 con:_ -> Just ( toPublic (sOurKey $ singleSession con)
321 , ncTheirPublicKey (singleSession con)) 360 , sTheirUserKey (singleSession con))
322 [] -> Nothing -- any. 361 [] -> Nothing -- any.