diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /src/Network/Tox/AggregateSession.hs | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'src/Network/Tox/AggregateSession.hs')
-rw-r--r-- | src/Network/Tox/AggregateSession.hs | 374 |
1 files changed, 0 insertions, 374 deletions
diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs deleted file mode 100644 index 8c728660..00000000 --- a/src/Network/Tox/AggregateSession.hs +++ /dev/null | |||
@@ -1,374 +0,0 @@ | |||
1 | -- | This module aggregates all sessions to the same remote Tox contact into a | ||
2 | -- single online/offline presence. This allows multiple lossless links to the | ||
3 | -- same identity at different addresses, or even to the same address. | ||
4 | {-# LANGUAGE CPP #-} | ||
5 | {-# LANGUAGE GADTs #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE PatternSynonyms #-} | ||
8 | module Network.Tox.AggregateSession | ||
9 | ( AggregateSession | ||
10 | , newAggregateSession | ||
11 | , aggregateStatus | ||
12 | , checkCompatible | ||
13 | , compatibleKeys | ||
14 | , AddResult(..) | ||
15 | , addSession | ||
16 | , DelResult(..) | ||
17 | , delSession | ||
18 | , closeAll | ||
19 | , awaitAny | ||
20 | , dispatchMessage | ||
21 | ) where | ||
22 | |||
23 | |||
24 | import Control.Concurrent.STM | ||
25 | import Control.Concurrent.STM.TMChan | ||
26 | import Control.Monad | ||
27 | import Data.Dependent.Sum | ||
28 | import Data.Function | ||
29 | import qualified Data.IntMap.Strict as IntMap | ||
30 | ;import Data.IntMap.Strict (IntMap) | ||
31 | import Data.List | ||
32 | import Data.Time.Clock.POSIX | ||
33 | import System.IO.Error | ||
34 | |||
35 | #ifdef THREAD_DEBUG | ||
36 | import Control.Concurrent.Lifted.Instrument | ||
37 | #else | ||
38 | import Control.Concurrent.Lifted | ||
39 | import GHC.Conc (labelThread) | ||
40 | #endif | ||
41 | |||
42 | import Connection (Status (..)) | ||
43 | import Crypto.Tox (PublicKey, toPublic) | ||
44 | import Data.Tox.Msg | ||
45 | import Data.Wrapper.PSQInt as PSQ | ||
46 | import DPut | ||
47 | import DebugTag | ||
48 | import Network.QueryResponse | ||
49 | import Network.Tox.Crypto.Transport | ||
50 | import Network.Tox.DHT.Transport (key2id) | ||
51 | import Network.Tox.NodeId (ToxProgress (..)) | ||
52 | import Network.Tox.Session | ||
53 | |||
54 | -- | For each component session, we track the current status. | ||
55 | data SingleCon = SingleCon | ||
56 | { singleSession :: Session -- ^ A component session. | ||
57 | , singleStatus :: TVar (Status ToxProgress) -- ^ Either 'AwaitingSessionPacket' or 'Established'. | ||
58 | } | ||
59 | |||
60 | -- | A collection of sessions between the same local and remote identities. | ||
61 | data AggregateSession = AggregateSession | ||
62 | { -- | The set of component sessions indexed by their ID. | ||
63 | contactSession :: TVar (IntMap SingleCon) | ||
64 | -- | Each inbound packets is written to this channel with the session ID | ||
65 | -- from which it came originally. | ||
66 | , contactChannel :: TMChan (Int,CryptoMessage) | ||
67 | -- | The set of 'Established' sessions IDs. | ||
68 | , contactEstablished :: TVar (IntMap ()) | ||
69 | -- | Callback for state-change notifications. | ||
70 | , notifyState :: AggregateSession -> Session -> Status ToxProgress -> STM () | ||
71 | } | ||
72 | |||
73 | |||
74 | -- | Create a new empty aggregate session. The argument is a callback to | ||
75 | -- receive notifications when the new session changes status. There are three | ||
76 | -- possible status values: | ||
77 | -- | ||
78 | -- [ Dormant ] - No pending or established sessions. | ||
79 | -- | ||
80 | -- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are | ||
81 | -- fully established. | ||
82 | -- | ||
83 | -- [ Established ] - At least one session is fully established and we can | ||
84 | -- send and receive packets via this aggregate. | ||
85 | -- | ||
86 | -- The 'Session' object is provided to the callback so that it can determine the | ||
87 | -- current remote and local identities for this AggregateSession. It may not even | ||
88 | -- be Established, so do not use it to send or receive packets. | ||
89 | newAggregateSession :: (AggregateSession -> Session -> Status ToxProgress -> STM ()) | ||
90 | -> STM AggregateSession | ||
91 | newAggregateSession notify = do | ||
92 | vimap <- newTVar IntMap.empty | ||
93 | chan <- newTMChan | ||
94 | vemap <- newTVar IntMap.empty | ||
95 | return AggregateSession | ||
96 | { contactSession = vimap | ||
97 | , contactChannel = chan | ||
98 | , contactEstablished = vemap | ||
99 | , notifyState = notify | ||
100 | } | ||
101 | |||
102 | -- | Information returned from 'addSession'. Note that a value other than | ||
103 | -- 'RejectedSession' does not mean there is any 'Established' session in the | ||
104 | -- Aggregate. Sessions are in 'AwaitingSessionPacket' state until a single | ||
105 | -- packet is received from the remote end. | ||
106 | data AddResult = FirstSession -- ^ Initial connection with this contact. | ||
107 | | AddedSession -- ^ Added another connection to active session. | ||
108 | | RejectedSession -- ^ Failed to add session (wrong contact / closed session). | ||
109 | |||
110 | -- | The 'keepAlive' thread juggles three scheduled tasks. | ||
111 | data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | ||
112 | | DoAlive -- ^ Send a the keep-alive becon for a session. | ||
113 | | DoRequestMissing -- ^ Detect and request lost packets. | ||
114 | deriving Enum | ||
115 | |||
116 | -- | This call loops until the provided sesison is closed or times out. It | ||
117 | -- monitors the provided (non-empty) priority queue for scheduled tasks (see | ||
118 | -- 'KeepAliveEvents') to perform for the connection. | ||
119 | keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () | ||
120 | keepAlive s q = do | ||
121 | myThreadId >>= flip labelThread | ||
122 | (intercalate "." ["beacon" | ||
123 | , take 8 $ show $ key2id $ sTheirUserKey s | ||
124 | , show $ sSessionID s]) | ||
125 | |||
126 | let -- outPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e | ||
127 | unexpected e = dput XUnexpected $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e | ||
128 | |||
129 | doAlive = do | ||
130 | -- outPrint $ "Beacon" | ||
131 | sendMessage (sTransport s) () (Pkt ALIVE ==> ()) | ||
132 | |||
133 | doRequestMissing = do | ||
134 | (ns,nmin) <- sMissingInbound s | ||
135 | -- outPrint $ "PacketRequest " ++ show (nmin,ns) | ||
136 | sendMessage (sTransport s) () (Pkt PacketRequest ==> MissingPackets ns) | ||
137 | `catchIOError` \e -> do | ||
138 | unexpected $ "PacketRequest " ++ take 200 (show (nmin,length ns,ns)) | ||
139 | unexpected $ "PacketRequest: " ++ show e | ||
140 | -- Quit thread by scheduling a timeout event. | ||
141 | now <- getPOSIXTime | ||
142 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now | ||
143 | |||
144 | re tm again e io = do | ||
145 | io | ||
146 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm | ||
147 | again | ||
148 | |||
149 | doEvent again now e = case e of | ||
150 | DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) | ||
151 | sClose s | ||
152 | DoAlive -> re (now + 10) again e doAlive | ||
153 | DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals | ||
154 | |||
155 | fix $ \again -> do | ||
156 | |||
157 | now <- getPOSIXTime | ||
158 | join $ atomically $ do | ||
159 | PSQ.findMin <$> readTVar q >>= \case | ||
160 | Nothing -> error "keepAlive: unexpected empty PSQ." | ||
161 | Just ( k :-> tm ) -> | ||
162 | return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again | ||
163 | else doEvent again now (toEnum k) | ||
164 | |||
165 | |||
166 | -- | This function forks two threads: the 'keepAlive' beacon-sending thread and | ||
167 | -- a thread to read all packets from the provided 'Session' and forward them to | ||
168 | -- 'contactChannel' for a containing 'AggregateSession' | ||
169 | forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId | ||
170 | forkSession c s setStatus = forkIO $ do | ||
171 | myThreadId >>= flip labelThread | ||
172 | (intercalate "." ["s" | ||
173 | , take 8 $ show $ key2id $ sTheirUserKey s | ||
174 | , show $ sSessionID s]) | ||
175 | |||
176 | q <- atomically $ newTVar $ fromList | ||
177 | [ fromEnum DoAlive :-> 0 | ||
178 | , fromEnum DoRequestMissing :-> 0 | ||
179 | ] | ||
180 | |||
181 | let sendPacket :: CryptoMessage -> STM () | ||
182 | sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) | ||
183 | |||
184 | inPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " --> " ++ e | ||
185 | |||
186 | bump = do | ||
187 | -- inPrint $ "BUMP: " ++ show (sSessionID s) | ||
188 | now <- getPOSIXTime | ||
189 | atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) | ||
190 | |||
191 | onPacket body loop Nothing = return () | ||
192 | onPacket body loop (Just (Left e)) = inPrint e >> loop | ||
193 | onPacket body loop (Just (Right x)) = body loop x | ||
194 | |||
195 | awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body | ||
196 | |||
197 | atomically $ setStatus $ InProgress AwaitingSessionPacket | ||
198 | awaitPacket $ \_ (online,()) -> do | ||
199 | when (msgID online /= M ONLINE) $ do | ||
200 | inPrint $ "Unexpected initial packet: " ++ show (msgID online) | ||
201 | atomically $ do setStatus Established | ||
202 | sendPacket online | ||
203 | bump | ||
204 | beacon <- forkIO $ keepAlive s q | ||
205 | awaitPacket $ \awaitNext (x,()) -> do | ||
206 | bump | ||
207 | case msgID x of | ||
208 | M ALIVE -> return () | ||
209 | M KillPacket -> sClose s | ||
210 | _ -> atomically $ sendPacket x | ||
211 | awaitNext | ||
212 | atomically $ setStatus Dormant | ||
213 | killThread beacon | ||
214 | |||
215 | -- | Add a new session (in 'AwaitingSessionPacket' state) to the | ||
216 | -- 'AggregateSession'. If the supplied session is not compatible because it is | ||
217 | -- between the wrong ToxIDs or because the AggregateSession is closed, | ||
218 | -- 'RejectedSession' will be returned. Otherwise, the operation is successful. | ||
219 | -- | ||
220 | -- The status-change callback may be triggered by this call as the aggregate | ||
221 | -- may transition from 'Dormant' (empty) to 'AwaitingSessionPacket' (at least | ||
222 | -- one active session). | ||
223 | addSession :: AggregateSession -> Session -> IO AddResult | ||
224 | addSession c s = do | ||
225 | (result,mcon,replaced) <- atomically $ do | ||
226 | let them = sTheirUserKey s | ||
227 | me = toPublic $ sOurKey s | ||
228 | compat <- checkCompatible me them c | ||
229 | let result = case compat of | ||
230 | Nothing -> FirstSession | ||
231 | Just True -> AddedSession | ||
232 | Just False -> RejectedSession | ||
233 | case result of | ||
234 | RejectedSession -> return (result,Nothing,Nothing) | ||
235 | _ -> do | ||
236 | statvar <- newTVar Dormant | ||
237 | imap <- readTVar (contactSession c) | ||
238 | let con = SingleCon s statvar | ||
239 | s0 = IntMap.lookup (sSessionID s) imap | ||
240 | imap' = IntMap.insert (sSessionID s) con imap | ||
241 | writeTVar (contactSession c) imap' | ||
242 | return (result,Just con,s0) | ||
243 | |||
244 | mapM_ (sClose . singleSession) replaced | ||
245 | forM_ mcon $ \con -> | ||
246 | forkSession c s $ \progress -> do | ||
247 | writeTVar (singleStatus con) progress | ||
248 | emap <- readTVar (contactEstablished c) | ||
249 | emap' <- case progress of | ||
250 | Established -> do | ||
251 | when (IntMap.null emap) $ notifyState c c s Established | ||
252 | return $ IntMap.insert (sSessionID s) () emap | ||
253 | _ -> do | ||
254 | let emap' = IntMap.delete (sSessionID s) emap | ||
255 | when (IntMap.null emap' && not (IntMap.null emap)) $ do | ||
256 | imap <- readTVar (contactSession c) | ||
257 | notifyState c c s | ||
258 | $ if IntMap.null imap then Dormant | ||
259 | else InProgress AwaitingSessionPacket | ||
260 | return emap' | ||
261 | writeTVar (contactEstablished c) emap' | ||
262 | return result | ||
263 | |||
264 | -- | Information returned from 'delSession'. | ||
265 | data DelResult = NoSession -- ^ Contact is completely disconnected. | ||
266 | | DeletedSession -- ^ Connection removed but session remains active. | ||
267 | |||
268 | -- | Close and remove the componenent session corresponding to the provided | ||
269 | -- Session ID. | ||
270 | -- | ||
271 | -- The status-change callback may be triggered as the aggregate may may | ||
272 | -- transition to 'Dormant' (empty) or 'AwaitingSessionPacket' (if the last | ||
273 | -- 'Established' session is closed). | ||
274 | delSession :: AggregateSession -> Int -> IO DelResult | ||
275 | delSession c sid = do | ||
276 | (con, r) <- atomically $ do | ||
277 | imap <- readTVar (contactSession c) | ||
278 | emap <- readTVar (contactEstablished c) | ||
279 | let emap' = IntMap.delete sid emap | ||
280 | imap' = IntMap.delete sid imap | ||
281 | case IntMap.toList emap of | ||
282 | (sid0,_):_ | IntMap.null emap' | ||
283 | , let s = singleSession $ imap IntMap.! sid0 | ||
284 | -> notifyState c c s | ||
285 | $ if IntMap.null imap' then Dormant | ||
286 | else InProgress AwaitingSessionPacket | ||
287 | _ -> return () | ||
288 | writeTVar (contactSession c) imap' | ||
289 | writeTVar (contactEstablished c) emap' | ||
290 | return ( IntMap.lookup sid imap, IntMap.null imap') | ||
291 | mapM_ (sClose . singleSession) con | ||
292 | return $ if r then NoSession | ||
293 | else DeletedSession | ||
294 | |||
295 | -- | Send a packet to one or all of the component sessions in the aggregate. | ||
296 | dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. | ||
297 | -> CryptoMessage -> IO () | ||
298 | dispatchMessage c msid msg = join $ atomically $ do | ||
299 | imap <- readTVar (contactSession c) | ||
300 | let go = case msid of Nothing -> forM_ imap | ||
301 | Just sid -> forM_ (IntMap.lookup sid imap) | ||
302 | return $ go $ \con -> sendMessage (sTransport $ singleSession con) () msg | ||
303 | |||
304 | -- | Retry until: | ||
305 | -- | ||
306 | -- * a packet arrives (with component session ID) arrives. | ||
307 | -- | ||
308 | -- * the 'AggregateSession' is closed with 'closeAll'. | ||
309 | awaitAny :: AggregateSession -> STM (Maybe (Int,CryptoMessage)) | ||
310 | awaitAny c = readTMChan (contactChannel c) | ||
311 | |||
312 | -- | Close all connections associated with the aggregate. No new sessions will | ||
313 | -- be accepted after this, and the notify callback will be informed that we've | ||
314 | -- transitioned to 'Dormant'. | ||
315 | closeAll :: AggregateSession -> IO () | ||
316 | closeAll c = join $ atomically $ do | ||
317 | imap <- readTVar (contactSession c) | ||
318 | closeTMChan (contactChannel c) | ||
319 | return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid | ||
320 | |||
321 | -- | Query the current status of the aggregate, there are three possible | ||
322 | -- values: | ||
323 | -- | ||
324 | -- [ Dormant ] - No pending or established sessions. | ||
325 | -- | ||
326 | -- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are | ||
327 | -- fully established. | ||
328 | -- | ||
329 | -- [ Established ] - At least one session is fully established and we can | ||
330 | -- send and receive packets via this aggregate. | ||
331 | -- | ||
332 | aggregateStatus :: AggregateSession -> STM (Status ToxProgress) | ||
333 | aggregateStatus c = do | ||
334 | isclosed <- isClosedTMChan (contactChannel c) | ||
335 | imap <- readTVar (contactSession c) | ||
336 | emap <- readTVar (contactEstablished c) | ||
337 | return $ case () of | ||
338 | _ | isclosed -> Dormant | ||
339 | | not (IntMap.null emap) -> Established | ||
340 | | not (IntMap.null imap) -> InProgress AwaitingSessionPacket | ||
341 | | otherwise -> Dormant | ||
342 | |||
343 | -- | Query whether the supplied ToxID keys are compatible with this aggregate. | ||
344 | -- | ||
345 | -- [ Nothing ] Any keys would be compatible because there is not yet any | ||
346 | -- sessions in progress. | ||
347 | -- | ||
348 | -- [ Just True ] The supplied keys match the session in progress. | ||
349 | -- | ||
350 | -- [ Just False ] The supplied keys are incompatible. | ||
351 | checkCompatible :: PublicKey -- ^ Local Tox key (for which we know the secret). | ||
352 | -> PublicKey -- ^ Remote Tox key. | ||
353 | -> AggregateSession -> STM (Maybe Bool) | ||
354 | checkCompatible me them c = do | ||
355 | isclosed <- isClosedTMChan (contactChannel c) | ||
356 | imap <- readTVar (contactSession c) | ||
357 | return $ case IntMap.elems imap of | ||
358 | _ | isclosed -> Just False -- All keys are incompatible (closed). | ||
359 | con:_ -> Just $ sTheirUserKey (singleSession con) == them | ||
360 | && toPublic (sOurKey $ singleSession con) == me | ||
361 | [] -> Nothing | ||
362 | |||
363 | -- | Returns the local and remote keys that are compatible with this aggregate. | ||
364 | -- If 'Nothing' Is returned, then either no key is compatible ('closeAll' was | ||
365 | -- called) or all keys are compatible because no sessions have been associated. | ||
366 | compatibleKeys :: AggregateSession -> STM (Maybe (PublicKey,PublicKey)) | ||
367 | compatibleKeys c = do | ||
368 | isclosed <- isClosedTMChan (contactChannel c) | ||
369 | imap <- readTVar (contactSession c) | ||
370 | return $ case IntMap.elems imap of | ||
371 | _ | isclosed -> Nothing -- none. | ||
372 | con:_ -> Just ( toPublic (sOurKey $ singleSession con) | ||
373 | , sTheirUserKey (singleSession con)) | ||
374 | [] -> Nothing -- any. | ||