diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/Tox/AggregateSession.hs | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs new file mode 100644 index 00000000..d4497c48 --- /dev/null +++ b/src/Network/Tox/AggregateSession.hs | |||
@@ -0,0 +1,323 @@ | |||
1 | -- | This module aggregates all sessions to the same remote Tox contact into a | ||
2 | -- single online/offline presence. This allows multiple lossless links to the | ||
3 | -- same identity at different addresses, or even to the same address. | ||
4 | {-# LANGUAGE CPP #-} | ||
5 | {-# LANGUAGE LambdaCase #-} | ||
6 | {-# LANGUAGE PatternSynonyms #-} | ||
7 | module Network.Tox.AggregateSession | ||
8 | ( AggregateSession | ||
9 | , newAggregateSession | ||
10 | , aggregateStatus | ||
11 | , checkCompatible | ||
12 | , AddResult(..) | ||
13 | , addSession | ||
14 | , DelResult(..) | ||
15 | , delSession | ||
16 | , closeAll | ||
17 | , awaitAny | ||
18 | , dispatchMessage | ||
19 | ) where | ||
20 | |||
21 | |||
22 | import Control.Concurrent.STM | ||
23 | import Control.Concurrent.STM.TMChan | ||
24 | import Control.Concurrent.Supply | ||
25 | import Control.Monad | ||
26 | import Data.Function | ||
27 | import qualified Data.IntMap.Strict as IntMap | ||
28 | ;import Data.IntMap.Strict (IntMap) | ||
29 | import Data.List | ||
30 | import Data.Time.Clock.POSIX | ||
31 | |||
32 | #ifdef THREAD_DEBUG | ||
33 | import Control.Concurrent.Lifted.Instrument | ||
34 | #else | ||
35 | import Control.Concurrent.Lifted | ||
36 | import GHC.Conc (labelThread) | ||
37 | #endif | ||
38 | |||
39 | import Connection (Status (..)) | ||
40 | import Crypto.Tox (PublicKey, toPublic) | ||
41 | import Data.Wrapper.PSQInt as PSQ | ||
42 | import DPut | ||
43 | import Network.QueryResponse | ||
44 | import Network.Tox.Crypto.Transport (CryptoMessage (..), pattern KillPacket, | ||
45 | pattern ONLINE, pattern PING, | ||
46 | pattern PacketRequest) | ||
47 | import Network.Tox.DHT.Transport (key2id) | ||
48 | import Network.Tox.NodeId (ToxProgress (..)) | ||
49 | import Network.Tox.Crypto.Handlers | ||
50 | |||
51 | type Session = NetCryptoSession | ||
52 | |||
53 | -- | For each component session, we track the current status. | ||
54 | data SingleCon = SingleCon | ||
55 | { singleSession :: Session -- ^ A component session. | ||
56 | , singleStatus :: TVar (Status ToxProgress) -- ^ Either 'AwaitingSessionPacket' or 'Established'. | ||
57 | } | ||
58 | |||
59 | -- | A collection of sessions between the same local and remote identities. | ||
60 | data AggregateSession = AggregateSession | ||
61 | { -- | The set of component sessions indexed by their ID. | ||
62 | contactSession :: TVar (IntMap SingleCon) | ||
63 | -- | Each inbound packets is written to this channel with the session ID | ||
64 | -- from which it came originally. | ||
65 | , contactChannel :: TMChan (Int,CryptoMessage) | ||
66 | -- | The set of 'Established' sessions IDs. | ||
67 | , contactEstablished :: TVar (IntMap ()) | ||
68 | -- | Callback for state-change notifications. | ||
69 | , notifyState :: AggregateSession -> Session -> Status ToxProgress -> STM () | ||
70 | } | ||
71 | |||
72 | |||
73 | -- | Create a new empty aggregate session. The argument is a callback to | ||
74 | -- receive notifications when the new session changes status. There are three | ||
75 | -- possible status values: | ||
76 | -- | ||
77 | -- [ Dormant ] - No pending or established sessions. | ||
78 | -- | ||
79 | -- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are | ||
80 | -- fully established. | ||
81 | -- | ||
82 | -- [ Established ] - At least one session is fully established and we can | ||
83 | -- send and receive packets via this aggregate. | ||
84 | -- | ||
85 | -- The 'Session' object is provided to the callback so that it can determine the | ||
86 | -- current remote and local identities for this AggregateSession. It may not even | ||
87 | -- be Established, so do not use it to send or receive packets. | ||
88 | newAggregateSession :: (AggregateSession -> Session -> Status ToxProgress -> STM ()) | ||
89 | -> STM AggregateSession | ||
90 | newAggregateSession notify = do | ||
91 | vimap <- newTVar IntMap.empty | ||
92 | chan <- newTMChan | ||
93 | vemap <- newTVar IntMap.empty | ||
94 | return AggregateSession | ||
95 | { contactSession = vimap | ||
96 | , contactChannel = chan | ||
97 | , contactEstablished = vemap | ||
98 | , notifyState = notify | ||
99 | } | ||
100 | |||
101 | -- | Information returned from 'addSession'. Note that a value other than | ||
102 | -- 'RejectedSession' does not mean there is any 'Established' session in the | ||
103 | -- Aggregate. Sessions are in 'AwaitingSessionPacket' state until a single | ||
104 | -- packet is received from the remote end. | ||
105 | data AddResult = FirstSession -- ^ Initial connection with this contact. | ||
106 | | AddedSession -- ^ Added another connection to active session. | ||
107 | | RejectedSession -- ^ Failed to add session (wrong contact / closed session). | ||
108 | |||
109 | -- | The 'keepAlive' thread juggles three scheduled tasks. | ||
110 | data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | ||
111 | | DoAlive -- ^ Send a the keep-alive becon for a session. | ||
112 | | DoRequestMissing -- ^ Detect and request lost packets. | ||
113 | deriving Enum | ||
114 | |||
115 | -- | This function forks a thread to read all packets from the provided | ||
116 | -- 'Session' and forward them to 'contactChannel' for a containing | ||
117 | -- 'AggregateSession' | ||
118 | forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId | ||
119 | forkSession c s setStatus = forkIO $ do | ||
120 | myThreadId >>= flip labelThread | ||
121 | (intercalate "." ["s" | ||
122 | , take 8 $ show $ key2id $ ncTheirPublicKey s | ||
123 | , show $ sSessionID s]) | ||
124 | tmchan <- atomically $ do | ||
125 | tmchan <- newTMChan | ||
126 | supply <- readTVar (listenerIDSupply $ ncAllSessions s) | ||
127 | let (listenerId,supply') = freshId supply | ||
128 | writeTVar (listenerIDSupply $ ncAllSessions s) supply' | ||
129 | modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan)) | ||
130 | return tmchan | ||
131 | |||
132 | let sendPacket :: CryptoMessage -> STM () | ||
133 | sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) | ||
134 | |||
135 | inPrint e = dput XNetCrypto $ shows (sSessionID s,ncSockAddr s) $ " --> " ++ e | ||
136 | |||
137 | onPacket body loop Nothing = return () | ||
138 | onPacket body loop (Just (Left e)) = inPrint e >> loop | ||
139 | onPacket body loop (Just (Right x)) = body loop x | ||
140 | |||
141 | awaitPacket body = fix $ (.) (fmap Right <$> atomically (readTMChan tmchan) >>=) | ||
142 | $ onPacket body | ||
143 | |||
144 | atomically $ setStatus $ InProgress AwaitingSessionPacket | ||
145 | atomically $ setStatus Established | ||
146 | awaitPacket $ \loop x -> do | ||
147 | case msgID x of | ||
148 | KillPacket -> return () | ||
149 | _ -> atomically (sendPacket x) >> loop | ||
150 | |||
151 | atomically $ setStatus Dormant | ||
152 | |||
153 | |||
154 | sSessionID :: Session -> Int | ||
155 | sSessionID s = fromIntegral $ ncSessionId s | ||
156 | |||
157 | -- | Add a new session (in 'AwaitingSessionPacket' state) to the | ||
158 | -- 'AggregateSession'. If the supplied session is not compatible because it is | ||
159 | -- between the wrong ToxIDs or because the AggregateSession is closed, | ||
160 | -- 'RejectedSession' will be returned. Otherwise, the operation is successful. | ||
161 | -- | ||
162 | -- The status-change callback may be triggered by this call as the aggregate | ||
163 | -- may transition from 'Dormant' (empty) to 'AwaitingSessionPacket' (at least | ||
164 | -- one active session). | ||
165 | addSession :: AggregateSession -> Session -> IO AddResult | ||
166 | addSession c s = do | ||
167 | (result,mcon,replaced) <- atomically $ do | ||
168 | let them = ncTheirPublicKey s | ||
169 | me = ncMyPublicKey s | ||
170 | compat <- checkCompatible me them c | ||
171 | let result = case compat of | ||
172 | Nothing -> FirstSession | ||
173 | Just True -> AddedSession | ||
174 | Just False -> RejectedSession | ||
175 | case result of | ||
176 | RejectedSession -> return (result,Nothing,Nothing) | ||
177 | _ -> do | ||
178 | statvar <- newTVar Dormant | ||
179 | imap <- readTVar (contactSession c) | ||
180 | let con = SingleCon s statvar | ||
181 | s0 = IntMap.lookup (sSessionID s) imap | ||
182 | imap' = IntMap.insert (sSessionID s) con imap | ||
183 | writeTVar (contactSession c) imap' | ||
184 | return (result,Just con,s0) | ||
185 | |||
186 | mapM_ (destroySession . singleSession) replaced | ||
187 | forM_ mcon $ \con -> | ||
188 | forkSession c s $ \progress -> do | ||
189 | writeTVar (singleStatus con) progress | ||
190 | emap <- readTVar (contactEstablished c) | ||
191 | emap' <- case progress of | ||
192 | Established -> do | ||
193 | when (IntMap.null emap) $ notifyState c c s Established | ||
194 | return $ IntMap.insert (sSessionID s) () emap | ||
195 | _ -> do | ||
196 | let emap' = IntMap.delete (sSessionID s) emap | ||
197 | when (IntMap.null emap' && not (IntMap.null emap)) $ do | ||
198 | imap <- readTVar (contactSession c) | ||
199 | notifyState c c s | ||
200 | $ if IntMap.null imap then Dormant | ||
201 | else InProgress AwaitingSessionPacket | ||
202 | return emap' | ||
203 | writeTVar (contactEstablished c) emap' | ||
204 | return result | ||
205 | -- | Information returned from 'delSession'. | ||
206 | data DelResult = NoSession -- ^ Contact is completely disconnected. | ||
207 | | DeletedSession -- ^ Connection removed but session remains active. | ||
208 | |||
209 | -- | Close and remove the componenent session corresponding to the provided | ||
210 | -- Session ID. | ||
211 | -- | ||
212 | -- The status-change callback may be triggered as the aggregate may may | ||
213 | -- transition to 'Dormant' (empty) or 'AwaitingSessionPacket' (if the last | ||
214 | -- 'Established' session is closed). | ||
215 | delSession :: AggregateSession -> Int -> IO DelResult | ||
216 | delSession c sid = do | ||
217 | (con, r) <- atomically $ do | ||
218 | imap <- readTVar (contactSession c) | ||
219 | emap <- readTVar (contactEstablished c) | ||
220 | let emap' = IntMap.delete sid emap | ||
221 | imap' = IntMap.delete sid imap | ||
222 | case IntMap.toList emap of | ||
223 | (sid0,_):_ | IntMap.null emap' | ||
224 | , let s = singleSession $ imap IntMap.! sid0 | ||
225 | -> notifyState c c s | ||
226 | $ if IntMap.null imap' then Dormant | ||
227 | else InProgress AwaitingSessionPacket | ||
228 | _ -> return () | ||
229 | writeTVar (contactSession c) imap' | ||
230 | writeTVar (contactEstablished c) emap' | ||
231 | return ( IntMap.lookup sid imap, IntMap.null imap') | ||
232 | mapM_ (destroySession . singleSession) con | ||
233 | return $ if r then NoSession | ||
234 | else DeletedSession | ||
235 | |||
236 | |||
237 | -- | Send a packet to one or all of the component sessions in the aggregate. | ||
238 | dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. | ||
239 | -> CryptoMessage -> IO () | ||
240 | dispatchMessage c msid msg = join $ atomically $ do | ||
241 | imap <- readTVar (contactSession c) | ||
242 | let go = case msid of Nothing -> forM_ imap | ||
243 | Just sid -> forM_ (IntMap.lookup sid imap) | ||
244 | return $ go $ \con -> do | ||
245 | outq <- atomically $ do | ||
246 | mbOutq <- readTVar (ncOutgoingQueue $ singleSession con) | ||
247 | case mbOutq of | ||
248 | HaveHandshake outq -> return outq | ||
249 | NeedHandshake -> retry | ||
250 | extra <- nqToWireIO outq | ||
251 | r <- atomically $ do | ||
252 | rTry <- tryAppendQueueOutgoing extra outq msg | ||
253 | case rTry of | ||
254 | OGFull -> retry | ||
255 | OGSuccess x -> return (OGSuccess x) | ||
256 | OGEncodeFail -> return OGEncodeFail | ||
257 | case r of | ||
258 | OGSuccess x -> case ncSockAddr (singleSession con) of | ||
259 | HaveDHTKey saddr -> sendSessionPacket (ncAllSessions $ singleSession con) saddr x | ||
260 | _ -> return () | ||
261 | OGEncodeFail -> dput XMisc ("FAILURE to Encode Outgoing: " ++ show msg) | ||
262 | _ -> return () | ||
263 | |||
264 | |||
265 | -- | Retry until: | ||
266 | -- | ||
267 | -- * a packet arrives (with component session ID) arrives. | ||
268 | -- | ||
269 | -- * the 'AggregateSession' is closed with 'closeAll'. | ||
270 | awaitAny :: AggregateSession -> STM (Maybe (Int,CryptoMessage)) | ||
271 | awaitAny c = readTMChan (contactChannel c) | ||
272 | |||
273 | -- | Close all connections associated with the aggregate. No new sessions will | ||
274 | -- be accempted after this, and the notify callback will be informed that we've | ||
275 | -- transitioned to 'Dormant'. | ||
276 | closeAll :: AggregateSession -> IO () | ||
277 | closeAll c = join $ atomically $ do | ||
278 | imap <- readTVar (contactSession c) | ||
279 | closeTMChan (contactChannel c) | ||
280 | return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid | ||
281 | |||
282 | -- | Query the current status of the aggregate, there are three possible | ||
283 | -- values: | ||
284 | -- | ||
285 | -- [ Dormant ] - No pending or established sessions. | ||
286 | -- | ||
287 | -- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are | ||
288 | -- fully established. | ||
289 | -- | ||
290 | -- [ Established ] - At least one session is fully established and we can | ||
291 | -- send and receive packets via this aggregate. | ||
292 | -- | ||
293 | aggregateStatus :: AggregateSession -> STM (Status ToxProgress) | ||
294 | aggregateStatus c = do | ||
295 | isclosed <- isClosedTMChan (contactChannel c) | ||
296 | imap <- readTVar (contactSession c) | ||
297 | emap <- readTVar (contactEstablished c) | ||
298 | return $ case () of | ||
299 | _ | isclosed -> Dormant | ||
300 | | not (IntMap.null emap) -> Established | ||
301 | | not (IntMap.null imap) -> InProgress AwaitingSessionPacket | ||
302 | | otherwise -> Dormant | ||
303 | |||
304 | |||
305 | -- | Query whether the supplied ToxID keys are compatible with this aggregate. | ||
306 | -- | ||
307 | -- [ Nothing ] Any keys would be compatible because there is not yet any | ||
308 | -- sessions in progress. | ||
309 | -- | ||
310 | -- [ Just True ] The supplied keys match the session in progress. | ||
311 | -- | ||
312 | -- [ Just False ] The supplied keys are incompatible. | ||
313 | checkCompatible :: PublicKey -- ^ Local Tox key (for which we know the secret). | ||
314 | -> PublicKey -- ^ Remote Tox key. | ||
315 | -> AggregateSession -> STM (Maybe Bool) | ||
316 | checkCompatible me them c = do | ||
317 | isclosed <- isClosedTMChan (contactChannel c) | ||
318 | imap <- readTVar (contactSession c) | ||
319 | return $ case IntMap.elems imap of | ||
320 | _ | isclosed -> Just False -- All keys are incompatible (closed). | ||
321 | con:_ -> Just $ ncTheirPublicKey (singleSession con) == them | ||
322 | && (ncMyPublicKey $ singleSession con) == me | ||
323 | [] -> Nothing | ||