summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-09-08 01:31:56 -0400
committerJoe Crayne <joe@jerkface.net>2018-09-08 04:40:10 -0400
commit55c5a979b3b25e1b7a13b1361c5f9cf1222f1653 (patch)
treebd2a09f55bfc9bc1a8fc8093c6300f61429a7cf1 /src
parentfc871868c05c152c47e716e0f5271d62276ceebb (diff)
AggregateSession combines related NetCrypto sessions into one
Diffstat (limited to 'src')
-rw-r--r--src/Network/Tox/AggregateSession.hs323
1 files changed, 323 insertions, 0 deletions
diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs
new file mode 100644
index 00000000..d4497c48
--- /dev/null
+++ b/src/Network/Tox/AggregateSession.hs
@@ -0,0 +1,323 @@
1-- | This module aggregates all sessions to the same remote Tox contact into a
2-- single online/offline presence. This allows multiple lossless links to the
3-- same identity at different addresses, or even to the same address.
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE LambdaCase #-}
6{-# LANGUAGE PatternSynonyms #-}
7module Network.Tox.AggregateSession
8 ( AggregateSession
9 , newAggregateSession
10 , aggregateStatus
11 , checkCompatible
12 , AddResult(..)
13 , addSession
14 , DelResult(..)
15 , delSession
16 , closeAll
17 , awaitAny
18 , dispatchMessage
19 ) where
20
21
22import Control.Concurrent.STM
23import Control.Concurrent.STM.TMChan
24import Control.Concurrent.Supply
25import Control.Monad
26import Data.Function
27import qualified Data.IntMap.Strict as IntMap
28 ;import Data.IntMap.Strict (IntMap)
29import Data.List
30import Data.Time.Clock.POSIX
31
32#ifdef THREAD_DEBUG
33import Control.Concurrent.Lifted.Instrument
34#else
35import Control.Concurrent.Lifted
36import GHC.Conc (labelThread)
37#endif
38
39import Connection (Status (..))
40import Crypto.Tox (PublicKey, toPublic)
41import Data.Wrapper.PSQInt as PSQ
42import DPut
43import Network.QueryResponse
44import Network.Tox.Crypto.Transport (CryptoMessage (..), pattern KillPacket,
45 pattern ONLINE, pattern PING,
46 pattern PacketRequest)
47import Network.Tox.DHT.Transport (key2id)
48import Network.Tox.NodeId (ToxProgress (..))
49import Network.Tox.Crypto.Handlers
50
51type Session = NetCryptoSession
52
53-- | For each component session, we track the current status.
54data SingleCon = SingleCon
55 { singleSession :: Session -- ^ A component session.
56 , singleStatus :: TVar (Status ToxProgress) -- ^ Either 'AwaitingSessionPacket' or 'Established'.
57 }
58
59-- | A collection of sessions between the same local and remote identities.
60data AggregateSession = AggregateSession
61 { -- | The set of component sessions indexed by their ID.
62 contactSession :: TVar (IntMap SingleCon)
63 -- | Each inbound packets is written to this channel with the session ID
64 -- from which it came originally.
65 , contactChannel :: TMChan (Int,CryptoMessage)
66 -- | The set of 'Established' sessions IDs.
67 , contactEstablished :: TVar (IntMap ())
68 -- | Callback for state-change notifications.
69 , notifyState :: AggregateSession -> Session -> Status ToxProgress -> STM ()
70 }
71
72
73-- | Create a new empty aggregate session. The argument is a callback to
74-- receive notifications when the new session changes status. There are three
75-- possible status values:
76--
77-- [ Dormant ] - No pending or established sessions.
78--
79-- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are
80-- fully established.
81--
82-- [ Established ] - At least one session is fully established and we can
83-- send and receive packets via this aggregate.
84--
85-- The 'Session' object is provided to the callback so that it can determine the
86-- current remote and local identities for this AggregateSession. It may not even
87-- be Established, so do not use it to send or receive packets.
88newAggregateSession :: (AggregateSession -> Session -> Status ToxProgress -> STM ())
89 -> STM AggregateSession
90newAggregateSession notify = do
91 vimap <- newTVar IntMap.empty
92 chan <- newTMChan
93 vemap <- newTVar IntMap.empty
94 return AggregateSession
95 { contactSession = vimap
96 , contactChannel = chan
97 , contactEstablished = vemap
98 , notifyState = notify
99 }
100
101-- | Information returned from 'addSession'. Note that a value other than
102-- 'RejectedSession' does not mean there is any 'Established' session in the
103-- Aggregate. Sessions are in 'AwaitingSessionPacket' state until a single
104-- packet is received from the remote end.
105data AddResult = FirstSession -- ^ Initial connection with this contact.
106 | AddedSession -- ^ Added another connection to active session.
107 | RejectedSession -- ^ Failed to add session (wrong contact / closed session).
108
109-- | The 'keepAlive' thread juggles three scheduled tasks.
110data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it.
111 | DoAlive -- ^ Send a the keep-alive becon for a session.
112 | DoRequestMissing -- ^ Detect and request lost packets.
113 deriving Enum
114
115-- | This function forks a thread to read all packets from the provided
116-- 'Session' and forward them to 'contactChannel' for a containing
117-- 'AggregateSession'
118forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId
119forkSession c s setStatus = forkIO $ do
120 myThreadId >>= flip labelThread
121 (intercalate "." ["s"
122 , take 8 $ show $ key2id $ ncTheirPublicKey s
123 , show $ sSessionID s])
124 tmchan <- atomically $ do
125 tmchan <- newTMChan
126 supply <- readTVar (listenerIDSupply $ ncAllSessions s)
127 let (listenerId,supply') = freshId supply
128 writeTVar (listenerIDSupply $ ncAllSessions s) supply'
129 modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan))
130 return tmchan
131
132 let sendPacket :: CryptoMessage -> STM ()
133 sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg)
134
135 inPrint e = dput XNetCrypto $ shows (sSessionID s,ncSockAddr s) $ " --> " ++ e
136
137 onPacket body loop Nothing = return ()
138 onPacket body loop (Just (Left e)) = inPrint e >> loop
139 onPacket body loop (Just (Right x)) = body loop x
140
141 awaitPacket body = fix $ (.) (fmap Right <$> atomically (readTMChan tmchan) >>=)
142 $ onPacket body
143
144 atomically $ setStatus $ InProgress AwaitingSessionPacket
145 atomically $ setStatus Established
146 awaitPacket $ \loop x -> do
147 case msgID x of
148 KillPacket -> return ()
149 _ -> atomically (sendPacket x) >> loop
150
151 atomically $ setStatus Dormant
152
153
154sSessionID :: Session -> Int
155sSessionID s = fromIntegral $ ncSessionId s
156
157-- | Add a new session (in 'AwaitingSessionPacket' state) to the
158-- 'AggregateSession'. If the supplied session is not compatible because it is
159-- between the wrong ToxIDs or because the AggregateSession is closed,
160-- 'RejectedSession' will be returned. Otherwise, the operation is successful.
161--
162-- The status-change callback may be triggered by this call as the aggregate
163-- may transition from 'Dormant' (empty) to 'AwaitingSessionPacket' (at least
164-- one active session).
165addSession :: AggregateSession -> Session -> IO AddResult
166addSession c s = do
167 (result,mcon,replaced) <- atomically $ do
168 let them = ncTheirPublicKey s
169 me = ncMyPublicKey s
170 compat <- checkCompatible me them c
171 let result = case compat of
172 Nothing -> FirstSession
173 Just True -> AddedSession
174 Just False -> RejectedSession
175 case result of
176 RejectedSession -> return (result,Nothing,Nothing)
177 _ -> do
178 statvar <- newTVar Dormant
179 imap <- readTVar (contactSession c)
180 let con = SingleCon s statvar
181 s0 = IntMap.lookup (sSessionID s) imap
182 imap' = IntMap.insert (sSessionID s) con imap
183 writeTVar (contactSession c) imap'
184 return (result,Just con,s0)
185
186 mapM_ (destroySession . singleSession) replaced
187 forM_ mcon $ \con ->
188 forkSession c s $ \progress -> do
189 writeTVar (singleStatus con) progress
190 emap <- readTVar (contactEstablished c)
191 emap' <- case progress of
192 Established -> do
193 when (IntMap.null emap) $ notifyState c c s Established
194 return $ IntMap.insert (sSessionID s) () emap
195 _ -> do
196 let emap' = IntMap.delete (sSessionID s) emap
197 when (IntMap.null emap' && not (IntMap.null emap)) $ do
198 imap <- readTVar (contactSession c)
199 notifyState c c s
200 $ if IntMap.null imap then Dormant
201 else InProgress AwaitingSessionPacket
202 return emap'
203 writeTVar (contactEstablished c) emap'
204 return result
205-- | Information returned from 'delSession'.
206data DelResult = NoSession -- ^ Contact is completely disconnected.
207 | DeletedSession -- ^ Connection removed but session remains active.
208
209-- | Close and remove the componenent session corresponding to the provided
210-- Session ID.
211--
212-- The status-change callback may be triggered as the aggregate may may
213-- transition to 'Dormant' (empty) or 'AwaitingSessionPacket' (if the last
214-- 'Established' session is closed).
215delSession :: AggregateSession -> Int -> IO DelResult
216delSession c sid = do
217 (con, r) <- atomically $ do
218 imap <- readTVar (contactSession c)
219 emap <- readTVar (contactEstablished c)
220 let emap' = IntMap.delete sid emap
221 imap' = IntMap.delete sid imap
222 case IntMap.toList emap of
223 (sid0,_):_ | IntMap.null emap'
224 , let s = singleSession $ imap IntMap.! sid0
225 -> notifyState c c s
226 $ if IntMap.null imap' then Dormant
227 else InProgress AwaitingSessionPacket
228 _ -> return ()
229 writeTVar (contactSession c) imap'
230 writeTVar (contactEstablished c) emap'
231 return ( IntMap.lookup sid imap, IntMap.null imap')
232 mapM_ (destroySession . singleSession) con
233 return $ if r then NoSession
234 else DeletedSession
235
236
237-- | Send a packet to one or all of the component sessions in the aggregate.
238dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID.
239 -> CryptoMessage -> IO ()
240dispatchMessage c msid msg = join $ atomically $ do
241 imap <- readTVar (contactSession c)
242 let go = case msid of Nothing -> forM_ imap
243 Just sid -> forM_ (IntMap.lookup sid imap)
244 return $ go $ \con -> do
245 outq <- atomically $ do
246 mbOutq <- readTVar (ncOutgoingQueue $ singleSession con)
247 case mbOutq of
248 HaveHandshake outq -> return outq
249 NeedHandshake -> retry
250 extra <- nqToWireIO outq
251 r <- atomically $ do
252 rTry <- tryAppendQueueOutgoing extra outq msg
253 case rTry of
254 OGFull -> retry
255 OGSuccess x -> return (OGSuccess x)
256 OGEncodeFail -> return OGEncodeFail
257 case r of
258 OGSuccess x -> case ncSockAddr (singleSession con) of
259 HaveDHTKey saddr -> sendSessionPacket (ncAllSessions $ singleSession con) saddr x
260 _ -> return ()
261 OGEncodeFail -> dput XMisc ("FAILURE to Encode Outgoing: " ++ show msg)
262 _ -> return ()
263
264
265-- | Retry until:
266--
267-- * a packet arrives (with component session ID) arrives.
268--
269-- * the 'AggregateSession' is closed with 'closeAll'.
270awaitAny :: AggregateSession -> STM (Maybe (Int,CryptoMessage))
271awaitAny c = readTMChan (contactChannel c)
272
273-- | Close all connections associated with the aggregate. No new sessions will
274-- be accempted after this, and the notify callback will be informed that we've
275-- transitioned to 'Dormant'.
276closeAll :: AggregateSession -> IO ()
277closeAll c = join $ atomically $ do
278 imap <- readTVar (contactSession c)
279 closeTMChan (contactChannel c)
280 return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid
281
282-- | Query the current status of the aggregate, there are three possible
283-- values:
284--
285-- [ Dormant ] - No pending or established sessions.
286--
287-- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are
288-- fully established.
289--
290-- [ Established ] - At least one session is fully established and we can
291-- send and receive packets via this aggregate.
292--
293aggregateStatus :: AggregateSession -> STM (Status ToxProgress)
294aggregateStatus c = do
295 isclosed <- isClosedTMChan (contactChannel c)
296 imap <- readTVar (contactSession c)
297 emap <- readTVar (contactEstablished c)
298 return $ case () of
299 _ | isclosed -> Dormant
300 | not (IntMap.null emap) -> Established
301 | not (IntMap.null imap) -> InProgress AwaitingSessionPacket
302 | otherwise -> Dormant
303
304
305-- | Query whether the supplied ToxID keys are compatible with this aggregate.
306--
307-- [ Nothing ] Any keys would be compatible because there is not yet any
308-- sessions in progress.
309--
310-- [ Just True ] The supplied keys match the session in progress.
311--
312-- [ Just False ] The supplied keys are incompatible.
313checkCompatible :: PublicKey -- ^ Local Tox key (for which we know the secret).
314 -> PublicKey -- ^ Remote Tox key.
315 -> AggregateSession -> STM (Maybe Bool)
316checkCompatible me them c = do
317 isclosed <- isClosedTMChan (contactChannel c)
318 imap <- readTVar (contactSession c)
319 return $ case IntMap.elems imap of
320 _ | isclosed -> Just False -- All keys are incompatible (closed).
321 con:_ -> Just $ ncTheirPublicKey (singleSession con) == them
322 && (ncMyPublicKey $ singleSession con) == me
323 [] -> Nothing