From 55c5a979b3b25e1b7a13b1361c5f9cf1222f1653 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sat, 8 Sep 2018 01:31:56 -0400 Subject: AggregateSession combines related NetCrypto sessions into one --- src/Network/Tox/AggregateSession.hs | 323 ++++++++++++++++++++++++++++++++++++ 1 file changed, 323 insertions(+) create mode 100644 src/Network/Tox/AggregateSession.hs (limited to 'src') diff --git a/src/Network/Tox/AggregateSession.hs b/src/Network/Tox/AggregateSession.hs new file mode 100644 index 00000000..d4497c48 --- /dev/null +++ b/src/Network/Tox/AggregateSession.hs @@ -0,0 +1,323 @@ +-- | This module aggregates all sessions to the same remote Tox contact into a +-- single online/offline presence. This allows multiple lossless links to the +-- same identity at different addresses, or even to the same address. +{-# LANGUAGE CPP #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE PatternSynonyms #-} +module Network.Tox.AggregateSession + ( AggregateSession + , newAggregateSession + , aggregateStatus + , checkCompatible + , AddResult(..) + , addSession + , DelResult(..) + , delSession + , closeAll + , awaitAny + , dispatchMessage + ) where + + +import Control.Concurrent.STM +import Control.Concurrent.STM.TMChan +import Control.Concurrent.Supply +import Control.Monad +import Data.Function +import qualified Data.IntMap.Strict as IntMap + ;import Data.IntMap.Strict (IntMap) +import Data.List +import Data.Time.Clock.POSIX + +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif + +import Connection (Status (..)) +import Crypto.Tox (PublicKey, toPublic) +import Data.Wrapper.PSQInt as PSQ +import DPut +import Network.QueryResponse +import Network.Tox.Crypto.Transport (CryptoMessage (..), pattern KillPacket, + pattern ONLINE, pattern PING, + pattern PacketRequest) +import Network.Tox.DHT.Transport (key2id) +import Network.Tox.NodeId (ToxProgress (..)) +import Network.Tox.Crypto.Handlers + +type Session = NetCryptoSession + +-- | For each component session, we track the current status. +data SingleCon = SingleCon + { singleSession :: Session -- ^ A component session. + , singleStatus :: TVar (Status ToxProgress) -- ^ Either 'AwaitingSessionPacket' or 'Established'. + } + +-- | A collection of sessions between the same local and remote identities. +data AggregateSession = AggregateSession + { -- | The set of component sessions indexed by their ID. + contactSession :: TVar (IntMap SingleCon) + -- | Each inbound packets is written to this channel with the session ID + -- from which it came originally. + , contactChannel :: TMChan (Int,CryptoMessage) + -- | The set of 'Established' sessions IDs. + , contactEstablished :: TVar (IntMap ()) + -- | Callback for state-change notifications. + , notifyState :: AggregateSession -> Session -> Status ToxProgress -> STM () + } + + +-- | Create a new empty aggregate session. The argument is a callback to +-- receive notifications when the new session changes status. There are three +-- possible status values: +-- +-- [ Dormant ] - No pending or established sessions. +-- +-- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are +-- fully established. +-- +-- [ Established ] - At least one session is fully established and we can +-- send and receive packets via this aggregate. +-- +-- The 'Session' object is provided to the callback so that it can determine the +-- current remote and local identities for this AggregateSession. It may not even +-- be Established, so do not use it to send or receive packets. +newAggregateSession :: (AggregateSession -> Session -> Status ToxProgress -> STM ()) + -> STM AggregateSession +newAggregateSession notify = do + vimap <- newTVar IntMap.empty + chan <- newTMChan + vemap <- newTVar IntMap.empty + return AggregateSession + { contactSession = vimap + , contactChannel = chan + , contactEstablished = vemap + , notifyState = notify + } + +-- | Information returned from 'addSession'. Note that a value other than +-- 'RejectedSession' does not mean there is any 'Established' session in the +-- Aggregate. Sessions are in 'AwaitingSessionPacket' state until a single +-- packet is received from the remote end. +data AddResult = FirstSession -- ^ Initial connection with this contact. + | AddedSession -- ^ Added another connection to active session. + | RejectedSession -- ^ Failed to add session (wrong contact / closed session). + +-- | The 'keepAlive' thread juggles three scheduled tasks. +data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. + | DoAlive -- ^ Send a the keep-alive becon for a session. + | DoRequestMissing -- ^ Detect and request lost packets. + deriving Enum + +-- | This function forks a thread to read all packets from the provided +-- 'Session' and forward them to 'contactChannel' for a containing +-- 'AggregateSession' +forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId +forkSession c s setStatus = forkIO $ do + myThreadId >>= flip labelThread + (intercalate "." ["s" + , take 8 $ show $ key2id $ ncTheirPublicKey s + , show $ sSessionID s]) + tmchan <- atomically $ do + tmchan <- newTMChan + supply <- readTVar (listenerIDSupply $ ncAllSessions s) + let (listenerId,supply') = freshId supply + writeTVar (listenerIDSupply $ ncAllSessions s) supply' + modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan)) + return tmchan + + let sendPacket :: CryptoMessage -> STM () + sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) + + inPrint e = dput XNetCrypto $ shows (sSessionID s,ncSockAddr s) $ " --> " ++ e + + onPacket body loop Nothing = return () + onPacket body loop (Just (Left e)) = inPrint e >> loop + onPacket body loop (Just (Right x)) = body loop x + + awaitPacket body = fix $ (.) (fmap Right <$> atomically (readTMChan tmchan) >>=) + $ onPacket body + + atomically $ setStatus $ InProgress AwaitingSessionPacket + atomically $ setStatus Established + awaitPacket $ \loop x -> do + case msgID x of + KillPacket -> return () + _ -> atomically (sendPacket x) >> loop + + atomically $ setStatus Dormant + + +sSessionID :: Session -> Int +sSessionID s = fromIntegral $ ncSessionId s + +-- | Add a new session (in 'AwaitingSessionPacket' state) to the +-- 'AggregateSession'. If the supplied session is not compatible because it is +-- between the wrong ToxIDs or because the AggregateSession is closed, +-- 'RejectedSession' will be returned. Otherwise, the operation is successful. +-- +-- The status-change callback may be triggered by this call as the aggregate +-- may transition from 'Dormant' (empty) to 'AwaitingSessionPacket' (at least +-- one active session). +addSession :: AggregateSession -> Session -> IO AddResult +addSession c s = do + (result,mcon,replaced) <- atomically $ do + let them = ncTheirPublicKey s + me = ncMyPublicKey s + compat <- checkCompatible me them c + let result = case compat of + Nothing -> FirstSession + Just True -> AddedSession + Just False -> RejectedSession + case result of + RejectedSession -> return (result,Nothing,Nothing) + _ -> do + statvar <- newTVar Dormant + imap <- readTVar (contactSession c) + let con = SingleCon s statvar + s0 = IntMap.lookup (sSessionID s) imap + imap' = IntMap.insert (sSessionID s) con imap + writeTVar (contactSession c) imap' + return (result,Just con,s0) + + mapM_ (destroySession . singleSession) replaced + forM_ mcon $ \con -> + forkSession c s $ \progress -> do + writeTVar (singleStatus con) progress + emap <- readTVar (contactEstablished c) + emap' <- case progress of + Established -> do + when (IntMap.null emap) $ notifyState c c s Established + return $ IntMap.insert (sSessionID s) () emap + _ -> do + let emap' = IntMap.delete (sSessionID s) emap + when (IntMap.null emap' && not (IntMap.null emap)) $ do + imap <- readTVar (contactSession c) + notifyState c c s + $ if IntMap.null imap then Dormant + else InProgress AwaitingSessionPacket + return emap' + writeTVar (contactEstablished c) emap' + return result +-- | Information returned from 'delSession'. +data DelResult = NoSession -- ^ Contact is completely disconnected. + | DeletedSession -- ^ Connection removed but session remains active. + +-- | Close and remove the componenent session corresponding to the provided +-- Session ID. +-- +-- The status-change callback may be triggered as the aggregate may may +-- transition to 'Dormant' (empty) or 'AwaitingSessionPacket' (if the last +-- 'Established' session is closed). +delSession :: AggregateSession -> Int -> IO DelResult +delSession c sid = do + (con, r) <- atomically $ do + imap <- readTVar (contactSession c) + emap <- readTVar (contactEstablished c) + let emap' = IntMap.delete sid emap + imap' = IntMap.delete sid imap + case IntMap.toList emap of + (sid0,_):_ | IntMap.null emap' + , let s = singleSession $ imap IntMap.! sid0 + -> notifyState c c s + $ if IntMap.null imap' then Dormant + else InProgress AwaitingSessionPacket + _ -> return () + writeTVar (contactSession c) imap' + writeTVar (contactEstablished c) emap' + return ( IntMap.lookup sid imap, IntMap.null imap') + mapM_ (destroySession . singleSession) con + return $ if r then NoSession + else DeletedSession + + +-- | Send a packet to one or all of the component sessions in the aggregate. +dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. + -> CryptoMessage -> IO () +dispatchMessage c msid msg = join $ atomically $ do + imap <- readTVar (contactSession c) + let go = case msid of Nothing -> forM_ imap + Just sid -> forM_ (IntMap.lookup sid imap) + return $ go $ \con -> do + outq <- atomically $ do + mbOutq <- readTVar (ncOutgoingQueue $ singleSession con) + case mbOutq of + HaveHandshake outq -> return outq + NeedHandshake -> retry + extra <- nqToWireIO outq + r <- atomically $ do + rTry <- tryAppendQueueOutgoing extra outq msg + case rTry of + OGFull -> retry + OGSuccess x -> return (OGSuccess x) + OGEncodeFail -> return OGEncodeFail + case r of + OGSuccess x -> case ncSockAddr (singleSession con) of + HaveDHTKey saddr -> sendSessionPacket (ncAllSessions $ singleSession con) saddr x + _ -> return () + OGEncodeFail -> dput XMisc ("FAILURE to Encode Outgoing: " ++ show msg) + _ -> return () + + +-- | Retry until: +-- +-- * a packet arrives (with component session ID) arrives. +-- +-- * the 'AggregateSession' is closed with 'closeAll'. +awaitAny :: AggregateSession -> STM (Maybe (Int,CryptoMessage)) +awaitAny c = readTMChan (contactChannel c) + +-- | Close all connections associated with the aggregate. No new sessions will +-- be accempted after this, and the notify callback will be informed that we've +-- transitioned to 'Dormant'. +closeAll :: AggregateSession -> IO () +closeAll c = join $ atomically $ do + imap <- readTVar (contactSession c) + closeTMChan (contactChannel c) + return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid + +-- | Query the current status of the aggregate, there are three possible +-- values: +-- +-- [ Dormant ] - No pending or established sessions. +-- +-- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are +-- fully established. +-- +-- [ Established ] - At least one session is fully established and we can +-- send and receive packets via this aggregate. +-- +aggregateStatus :: AggregateSession -> STM (Status ToxProgress) +aggregateStatus c = do + isclosed <- isClosedTMChan (contactChannel c) + imap <- readTVar (contactSession c) + emap <- readTVar (contactEstablished c) + return $ case () of + _ | isclosed -> Dormant + | not (IntMap.null emap) -> Established + | not (IntMap.null imap) -> InProgress AwaitingSessionPacket + | otherwise -> Dormant + + +-- | Query whether the supplied ToxID keys are compatible with this aggregate. +-- +-- [ Nothing ] Any keys would be compatible because there is not yet any +-- sessions in progress. +-- +-- [ Just True ] The supplied keys match the session in progress. +-- +-- [ Just False ] The supplied keys are incompatible. +checkCompatible :: PublicKey -- ^ Local Tox key (for which we know the secret). + -> PublicKey -- ^ Remote Tox key. + -> AggregateSession -> STM (Maybe Bool) +checkCompatible me them c = do + isclosed <- isClosedTMChan (contactChannel c) + imap <- readTVar (contactSession c) + return $ case IntMap.elems imap of + _ | isclosed -> Just False -- All keys are incompatible (closed). + con:_ -> Just $ ncTheirPublicKey (singleSession con) == them + && (ncMyPublicKey $ singleSession con) == me + [] -> Nothing -- cgit v1.2.3