summaryrefslogtreecommitdiff
path: root/dht/src/Network/Tox/AggregateSession.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/src/Network/Tox/AggregateSession.hs
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/src/Network/Tox/AggregateSession.hs')
-rw-r--r--dht/src/Network/Tox/AggregateSession.hs374
1 files changed, 374 insertions, 0 deletions
diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs
new file mode 100644
index 00000000..8c728660
--- /dev/null
+++ b/dht/src/Network/Tox/AggregateSession.hs
@@ -0,0 +1,374 @@
1-- | This module aggregates all sessions to the same remote Tox contact into a
2-- single online/offline presence. This allows multiple lossless links to the
3-- same identity at different addresses, or even to the same address.
4{-# LANGUAGE CPP #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE PatternSynonyms #-}
8module Network.Tox.AggregateSession
9 ( AggregateSession
10 , newAggregateSession
11 , aggregateStatus
12 , checkCompatible
13 , compatibleKeys
14 , AddResult(..)
15 , addSession
16 , DelResult(..)
17 , delSession
18 , closeAll
19 , awaitAny
20 , dispatchMessage
21 ) where
22
23
24import Control.Concurrent.STM
25import Control.Concurrent.STM.TMChan
26import Control.Monad
27import Data.Dependent.Sum
28import Data.Function
29import qualified Data.IntMap.Strict as IntMap
30 ;import Data.IntMap.Strict (IntMap)
31import Data.List
32import Data.Time.Clock.POSIX
33import System.IO.Error
34
35#ifdef THREAD_DEBUG
36import Control.Concurrent.Lifted.Instrument
37#else
38import Control.Concurrent.Lifted
39import GHC.Conc (labelThread)
40#endif
41
42import Connection (Status (..))
43import Crypto.Tox (PublicKey, toPublic)
44import Data.Tox.Msg
45import Data.Wrapper.PSQInt as PSQ
46import DPut
47import DebugTag
48import Network.QueryResponse
49import Network.Tox.Crypto.Transport
50import Network.Tox.DHT.Transport (key2id)
51import Network.Tox.NodeId (ToxProgress (..))
52import Network.Tox.Session
53
54-- | For each component session, we track the current status.
55data SingleCon = SingleCon
56 { singleSession :: Session -- ^ A component session.
57 , singleStatus :: TVar (Status ToxProgress) -- ^ Either 'AwaitingSessionPacket' or 'Established'.
58 }
59
60-- | A collection of sessions between the same local and remote identities.
61data AggregateSession = AggregateSession
62 { -- | The set of component sessions indexed by their ID.
63 contactSession :: TVar (IntMap SingleCon)
64 -- | Each inbound packets is written to this channel with the session ID
65 -- from which it came originally.
66 , contactChannel :: TMChan (Int,CryptoMessage)
67 -- | The set of 'Established' sessions IDs.
68 , contactEstablished :: TVar (IntMap ())
69 -- | Callback for state-change notifications.
70 , notifyState :: AggregateSession -> Session -> Status ToxProgress -> STM ()
71 }
72
73
74-- | Create a new empty aggregate session. The argument is a callback to
75-- receive notifications when the new session changes status. There are three
76-- possible status values:
77--
78-- [ Dormant ] - No pending or established sessions.
79--
80-- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are
81-- fully established.
82--
83-- [ Established ] - At least one session is fully established and we can
84-- send and receive packets via this aggregate.
85--
86-- The 'Session' object is provided to the callback so that it can determine the
87-- current remote and local identities for this AggregateSession. It may not even
88-- be Established, so do not use it to send or receive packets.
89newAggregateSession :: (AggregateSession -> Session -> Status ToxProgress -> STM ())
90 -> STM AggregateSession
91newAggregateSession notify = do
92 vimap <- newTVar IntMap.empty
93 chan <- newTMChan
94 vemap <- newTVar IntMap.empty
95 return AggregateSession
96 { contactSession = vimap
97 , contactChannel = chan
98 , contactEstablished = vemap
99 , notifyState = notify
100 }
101
102-- | Information returned from 'addSession'. Note that a value other than
103-- 'RejectedSession' does not mean there is any 'Established' session in the
104-- Aggregate. Sessions are in 'AwaitingSessionPacket' state until a single
105-- packet is received from the remote end.
106data AddResult = FirstSession -- ^ Initial connection with this contact.
107 | AddedSession -- ^ Added another connection to active session.
108 | RejectedSession -- ^ Failed to add session (wrong contact / closed session).
109
110-- | The 'keepAlive' thread juggles three scheduled tasks.
111data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it.
112 | DoAlive -- ^ Send a the keep-alive becon for a session.
113 | DoRequestMissing -- ^ Detect and request lost packets.
114 deriving Enum
115
116-- | This call loops until the provided sesison is closed or times out. It
117-- monitors the provided (non-empty) priority queue for scheduled tasks (see
118-- 'KeepAliveEvents') to perform for the connection.
119keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO ()
120keepAlive s q = do
121 myThreadId >>= flip labelThread
122 (intercalate "." ["beacon"
123 , take 8 $ show $ key2id $ sTheirUserKey s
124 , show $ sSessionID s])
125
126 let -- outPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e
127 unexpected e = dput XUnexpected $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e
128
129 doAlive = do
130 -- outPrint $ "Beacon"
131 sendMessage (sTransport s) () (Pkt ALIVE ==> ())
132
133 doRequestMissing = do
134 (ns,nmin) <- sMissingInbound s
135 -- outPrint $ "PacketRequest " ++ show (nmin,ns)
136 sendMessage (sTransport s) () (Pkt PacketRequest ==> MissingPackets ns)
137 `catchIOError` \e -> do
138 unexpected $ "PacketRequest " ++ take 200 (show (nmin,length ns,ns))
139 unexpected $ "PacketRequest: " ++ show e
140 -- Quit thread by scheduling a timeout event.
141 now <- getPOSIXTime
142 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now
143
144 re tm again e io = do
145 io
146 atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm
147 again
148
149 doEvent again now e = case e of
150 DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s)
151 sClose s
152 DoAlive -> re (now + 10) again e doAlive
153 DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals
154
155 fix $ \again -> do
156
157 now <- getPOSIXTime
158 join $ atomically $ do
159 PSQ.findMin <$> readTVar q >>= \case
160 Nothing -> error "keepAlive: unexpected empty PSQ."
161 Just ( k :-> tm ) ->
162 return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again
163 else doEvent again now (toEnum k)
164
165
166-- | This function forks two threads: the 'keepAlive' beacon-sending thread and
167-- a thread to read all packets from the provided 'Session' and forward them to
168-- 'contactChannel' for a containing 'AggregateSession'
169forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId
170forkSession c s setStatus = forkIO $ do
171 myThreadId >>= flip labelThread
172 (intercalate "." ["s"
173 , take 8 $ show $ key2id $ sTheirUserKey s
174 , show $ sSessionID s])
175
176 q <- atomically $ newTVar $ fromList
177 [ fromEnum DoAlive :-> 0
178 , fromEnum DoRequestMissing :-> 0
179 ]
180
181 let sendPacket :: CryptoMessage -> STM ()
182 sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg)
183
184 inPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " --> " ++ e
185
186 bump = do
187 -- inPrint $ "BUMP: " ++ show (sSessionID s)
188 now <- getPOSIXTime
189 atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15)
190
191 onPacket body loop Nothing = return ()
192 onPacket body loop (Just (Left e)) = inPrint e >> loop
193 onPacket body loop (Just (Right x)) = body loop x
194
195 awaitPacket body = fix $ awaitMessage (sTransport s) . onPacket body
196
197 atomically $ setStatus $ InProgress AwaitingSessionPacket
198 awaitPacket $ \_ (online,()) -> do
199 when (msgID online /= M ONLINE) $ do
200 inPrint $ "Unexpected initial packet: " ++ show (msgID online)
201 atomically $ do setStatus Established
202 sendPacket online
203 bump
204 beacon <- forkIO $ keepAlive s q
205 awaitPacket $ \awaitNext (x,()) -> do
206 bump
207 case msgID x of
208 M ALIVE -> return ()
209 M KillPacket -> sClose s
210 _ -> atomically $ sendPacket x
211 awaitNext
212 atomically $ setStatus Dormant
213 killThread beacon
214
215-- | Add a new session (in 'AwaitingSessionPacket' state) to the
216-- 'AggregateSession'. If the supplied session is not compatible because it is
217-- between the wrong ToxIDs or because the AggregateSession is closed,
218-- 'RejectedSession' will be returned. Otherwise, the operation is successful.
219--
220-- The status-change callback may be triggered by this call as the aggregate
221-- may transition from 'Dormant' (empty) to 'AwaitingSessionPacket' (at least
222-- one active session).
223addSession :: AggregateSession -> Session -> IO AddResult
224addSession c s = do
225 (result,mcon,replaced) <- atomically $ do
226 let them = sTheirUserKey s
227 me = toPublic $ sOurKey s
228 compat <- checkCompatible me them c
229 let result = case compat of
230 Nothing -> FirstSession
231 Just True -> AddedSession
232 Just False -> RejectedSession
233 case result of
234 RejectedSession -> return (result,Nothing,Nothing)
235 _ -> do
236 statvar <- newTVar Dormant
237 imap <- readTVar (contactSession c)
238 let con = SingleCon s statvar
239 s0 = IntMap.lookup (sSessionID s) imap
240 imap' = IntMap.insert (sSessionID s) con imap
241 writeTVar (contactSession c) imap'
242 return (result,Just con,s0)
243
244 mapM_ (sClose . singleSession) replaced
245 forM_ mcon $ \con ->
246 forkSession c s $ \progress -> do
247 writeTVar (singleStatus con) progress
248 emap <- readTVar (contactEstablished c)
249 emap' <- case progress of
250 Established -> do
251 when (IntMap.null emap) $ notifyState c c s Established
252 return $ IntMap.insert (sSessionID s) () emap
253 _ -> do
254 let emap' = IntMap.delete (sSessionID s) emap
255 when (IntMap.null emap' && not (IntMap.null emap)) $ do
256 imap <- readTVar (contactSession c)
257 notifyState c c s
258 $ if IntMap.null imap then Dormant
259 else InProgress AwaitingSessionPacket
260 return emap'
261 writeTVar (contactEstablished c) emap'
262 return result
263
264-- | Information returned from 'delSession'.
265data DelResult = NoSession -- ^ Contact is completely disconnected.
266 | DeletedSession -- ^ Connection removed but session remains active.
267
268-- | Close and remove the componenent session corresponding to the provided
269-- Session ID.
270--
271-- The status-change callback may be triggered as the aggregate may may
272-- transition to 'Dormant' (empty) or 'AwaitingSessionPacket' (if the last
273-- 'Established' session is closed).
274delSession :: AggregateSession -> Int -> IO DelResult
275delSession c sid = do
276 (con, r) <- atomically $ do
277 imap <- readTVar (contactSession c)
278 emap <- readTVar (contactEstablished c)
279 let emap' = IntMap.delete sid emap
280 imap' = IntMap.delete sid imap
281 case IntMap.toList emap of
282 (sid0,_):_ | IntMap.null emap'
283 , let s = singleSession $ imap IntMap.! sid0
284 -> notifyState c c s
285 $ if IntMap.null imap' then Dormant
286 else InProgress AwaitingSessionPacket
287 _ -> return ()
288 writeTVar (contactSession c) imap'
289 writeTVar (contactEstablished c) emap'
290 return ( IntMap.lookup sid imap, IntMap.null imap')
291 mapM_ (sClose . singleSession) con
292 return $ if r then NoSession
293 else DeletedSession
294
295-- | Send a packet to one or all of the component sessions in the aggregate.
296dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID.
297 -> CryptoMessage -> IO ()
298dispatchMessage c msid msg = join $ atomically $ do
299 imap <- readTVar (contactSession c)
300 let go = case msid of Nothing -> forM_ imap
301 Just sid -> forM_ (IntMap.lookup sid imap)
302 return $ go $ \con -> sendMessage (sTransport $ singleSession con) () msg
303
304-- | Retry until:
305--
306-- * a packet arrives (with component session ID) arrives.
307--
308-- * the 'AggregateSession' is closed with 'closeAll'.
309awaitAny :: AggregateSession -> STM (Maybe (Int,CryptoMessage))
310awaitAny c = readTMChan (contactChannel c)
311
312-- | Close all connections associated with the aggregate. No new sessions will
313-- be accepted after this, and the notify callback will be informed that we've
314-- transitioned to 'Dormant'.
315closeAll :: AggregateSession -> IO ()
316closeAll c = join $ atomically $ do
317 imap <- readTVar (contactSession c)
318 closeTMChan (contactChannel c)
319 return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid
320
321-- | Query the current status of the aggregate, there are three possible
322-- values:
323--
324-- [ Dormant ] - No pending or established sessions.
325--
326-- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are
327-- fully established.
328--
329-- [ Established ] - At least one session is fully established and we can
330-- send and receive packets via this aggregate.
331--
332aggregateStatus :: AggregateSession -> STM (Status ToxProgress)
333aggregateStatus c = do
334 isclosed <- isClosedTMChan (contactChannel c)
335 imap <- readTVar (contactSession c)
336 emap <- readTVar (contactEstablished c)
337 return $ case () of
338 _ | isclosed -> Dormant
339 | not (IntMap.null emap) -> Established
340 | not (IntMap.null imap) -> InProgress AwaitingSessionPacket
341 | otherwise -> Dormant
342
343-- | Query whether the supplied ToxID keys are compatible with this aggregate.
344--
345-- [ Nothing ] Any keys would be compatible because there is not yet any
346-- sessions in progress.
347--
348-- [ Just True ] The supplied keys match the session in progress.
349--
350-- [ Just False ] The supplied keys are incompatible.
351checkCompatible :: PublicKey -- ^ Local Tox key (for which we know the secret).
352 -> PublicKey -- ^ Remote Tox key.
353 -> AggregateSession -> STM (Maybe Bool)
354checkCompatible me them c = do
355 isclosed <- isClosedTMChan (contactChannel c)
356 imap <- readTVar (contactSession c)
357 return $ case IntMap.elems imap of
358 _ | isclosed -> Just False -- All keys are incompatible (closed).
359 con:_ -> Just $ sTheirUserKey (singleSession con) == them
360 && toPublic (sOurKey $ singleSession con) == me
361 [] -> Nothing
362
363-- | Returns the local and remote keys that are compatible with this aggregate.
364-- If 'Nothing' Is returned, then either no key is compatible ('closeAll' was
365-- called) or all keys are compatible because no sessions have been associated.
366compatibleKeys :: AggregateSession -> STM (Maybe (PublicKey,PublicKey))
367compatibleKeys c = do
368 isclosed <- isClosedTMChan (contactChannel c)
369 imap <- readTVar (contactSession c)
370 return $ case IntMap.elems imap of
371 _ | isclosed -> Nothing -- none.
372 con:_ -> Just ( toPublic (sOurKey $ singleSession con)
373 , sTheirUserKey (singleSession con))
374 [] -> Nothing -- any.