-- | This module aggregates all sessions to the same remote Tox contact into a -- single online/offline presence. This allows multiple lossless links to the -- same identity at different addresses, or even to the same address. {-# LANGUAGE CPP #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PatternSynonyms #-} module Network.Tox.AggregateSession ( AggregateSession(contactSession) , SingleCon(singleSession) , newAggregateSession , aggregateStatus , checkCompatible , compatibleKeys , AddResult(..) , addSession , DelResult(..) , delSession , closeAll , awaitAny , dispatchMessage ) where import Control.Concurrent.STM import Control.Concurrent.STM.TMChan import Control.Exception import Control.Monad import Data.Dependent.Sum import Data.Function import Data.Functor import qualified Data.IntMap.Strict as IntMap ;import Data.IntMap.Strict (IntMap) import Data.List import Data.Maybe import Data.Time.Clock.POSIX import System.IO.Error #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import Connection (Status (..)) import Crypto.Tox (PublicKey, toPublic) import Data.Tox.Msg import Data.Wrapper.PSQInt as PSQ import DPut import DebugTag import Network.QueryResponse import Network.Tox.Crypto.Transport import Network.Tox.DHT.Transport (key2id) import Network.Tox.NodeId (ToxProgress (..)) import Network.Tox.Session -- | For each component session, we track the current status. data SingleCon = SingleCon { singleSession :: Session -- ^ A component session. , singleStatus :: TVar (Status ToxProgress) -- ^ Either 'AwaitingSessionPacket' or 'Established'. } -- | A collection of sessions between the same local and remote identities. data AggregateSession = AggregateSession { -- | The set of component sessions indexed by their ID. contactSession :: TVar (IntMap SingleCon) -- | Each inbound packets is written to this channel with the session ID -- from which it came originally. , contactChannel :: TMChan (Int,CryptoMessage) -- | The set of 'Established' sessions IDs. , contactEstablished :: TVar (IntMap ()) -- | Callback for state-change notifications. , notifyState :: AggregateSession -> Session -> Status ToxProgress -> STM () } -- | Create a new empty aggregate session. The argument is a callback to -- receive notifications when the new session changes status. There are three -- possible status values: -- -- [ Dormant ] - No pending or established sessions. -- -- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are -- fully established. -- -- [ Established ] - At least one session is fully established and we can -- send and receive packets via this aggregate. -- -- The 'Session' object is provided to the callback so that it can determine the -- current remote and local identities for this AggregateSession. It may not even -- be Established, so do not use it to send or receive packets. newAggregateSession :: (AggregateSession -> Session -> Status ToxProgress -> STM ()) -> STM AggregateSession newAggregateSession notify = do vimap <- newTVar IntMap.empty chan <- newTMChan vemap <- newTVar IntMap.empty return AggregateSession { contactSession = vimap , contactChannel = chan , contactEstablished = vemap , notifyState = notify } -- | Information returned from 'addSession'. Note that a value other than -- 'RejectedSession' does not mean there is any 'Established' session in the -- Aggregate. Sessions are in 'AwaitingSessionPacket' state until a single -- packet is received from the remote end. data AddResult = FirstSession -- ^ Initial connection with this contact. | AddedSession -- ^ Added another connection to active session. | RejectedSession -- ^ Failed to add session (wrong contact / closed session). deriving (Eq,Show) -- | The 'keepAlive' thread juggles three scheduled tasks. data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | DoAlive -- ^ Send a the keep-alive becon for a session. | DoRequestMissing -- ^ Detect and request lost packets. deriving Enum -- | This call loops until the provided session is closed or times out. It -- monitors the provided (non-empty) priority queue for scheduled tasks (see -- 'KeepAliveEvents') to perform for the connection. keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () keepAlive s q = do myThreadId >>= flip labelThread (intercalate "." ["beacon" , take 8 $ show $ key2id $ sTheirUserKey s , show $ sSessionID s]) let -- outPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e unexpected e = dput XUnexpected $ shows (sSessionID s,sTheirAddr s) $ " <-- " ++ e doAlive = do -- outPrint $ "Beacon" sendMessage (sTransport s) () (Pkt ALIVE ==> ()) doRequestMissing = do (ns,nmin) <- sMissingInbound s -- outPrint $ "PacketRequest " ++ show (nmin,ns) sendMessage (sTransport s) () (Pkt PacketRequest ==> MissingPackets ns) `catchIOError` \e -> do unexpected $ "PacketRequest " ++ take 200 (show (nmin,length ns,ns)) unexpected $ "PacketRequest: " ++ show e -- Quit thread by scheduling a timeout event. now <- getPOSIXTime atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now re tm e io = do io atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm doEvent again now e = case e of DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) sClose s DoAlive -> re (now + 10) e doAlive >> again DoRequestMissing -> re (now + 5{- toxcore uses 1sec -}) e doRequestMissing >> again fix $ \again -> do now <- getPOSIXTime join $ atomically $ do PSQ.findMin <$> readTVar q >>= \case Nothing -> return $ do dput XUnexpected "keepAlive: unexpected empty PSQ." sClose s Just ( k :-> tm ) -> return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again else doEvent again now (toEnum k) -- | This function forks two threads: the 'keepAlive' beacon-sending thread and -- a thread to read all packets from the provided 'Session' and forward them to -- 'contactChannel' for a containing 'AggregateSession' forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId forkSession c s setStatus = forkIO $ do myThreadId >>= flip labelThread (intercalate "." ["s" , take 8 $ show $ key2id $ sTheirUserKey s , show $ sSessionID s]) q <- atomically $ newTVar $ fromList [ fromEnum DoAlive :-> 0 , fromEnum DoRequestMissing :-> 0 ] let sendPacket :: CryptoMessage -> STM () sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg) inPrint e = dput XNetCrypto $ shows (sSessionID s,sTheirAddr s) $ " --> " ++ e bump = do -- inPrint $ "BUMP: " ++ show (sSessionID s) now <- getPOSIXTime atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) (now + 15) onPacket body loop Terminated = return () onPacket body loop (ParseError e) = inPrint e >> loop onPacket body loop (Arrival _ x) = body loop x awaitPacket body = fix $ \loop -> do (m,io) <- atomically $ awaitMessage (sTransport s) io onPacket body loop m atomically $ setStatus $ InProgress AwaitingSessionPacket awaitPacket $ \_ online -> do when (msgID online /= M ONLINE) $ do inPrint $ "Unexpected initial packet: " ++ show (msgID online) atomically $ do setStatus Established sendPacket online bump beacon <- forkIO $ keepAlive s q `finally` sClose s awaitPacket $ \awaitNext x -> do bump case msgID x of M ALIVE -> return () M KillPacket -> sClose s _ -> atomically $ sendPacket x awaitNext atomically $ setStatus Dormant killThread beacon sessionIsPreferredTo :: Session -> Session -> Bool sessionIsPreferredTo s t = True -- TODO: Check address types. -- | Add a new session (in 'AwaitingSessionPacket' state) to the -- 'AggregateSession'. If the supplied session is not compatible because it is -- between the wrong ToxIDs or because the AggregateSession is closed, -- 'RejectedSession' will be returned. Otherwise, the operation is successful. -- -- The status-change callback may be triggered by this call as the aggregate -- may transition from 'Dormant' (empty) to 'AwaitingSessionPacket' (at least -- one active session). addSession :: AggregateSession -> Session -> IO AddResult addSession c s = do (result,mcon,rejected,closed) <- atomically $ do let them = sTheirUserKey s me = toPublic $ sOurKey s result <- checkCompatible me them c <&> \case Nothing -> FirstSession Just True -> AddedSession Just False -> RejectedSession case result of RejectedSession -> return (result,Nothing,Just s,Nothing) _ -> do statvar <- newTVar Dormant imap <- readTVar (contactSession c) let nodekey = sTheirDHTKey s ts = [ t | SingleCon t _ <- IntMap.elems imap, nodekey == sTheirDHTKey t ] case ts of [] -> do let con = SingleCon s statvar s0 = IntMap.lookup (sSessionID s) imap imap' = IntMap.insert (sSessionID s) con imap writeTVar (contactSession c) imap' return (result,Just con,singleSession <$> s0, Nothing) (t:_) | s `sessionIsPreferredTo` t -> do let con = SingleCon s statvar s0 = IntMap.lookup (sSessionID s) imap imap' = IntMap.insert (sSessionID s) con imap writeTVar (contactSession c) imap' return (result,Just con,singleSession <$> s0, Just (sSessionID t)) (t:_) -> do error "TODO: Prefer older session (udp over tcp for example)" mapM_ sClose rejected when (isNothing mcon) $ dput XMan "addSession: Rejected session!" forM_ (mcon :: Maybe SingleCon) $ \con -> do dput XMan $ "addSession: forkSession! " ++ show result forkSession c s $ \progress -> do status0 <- aggregateStatus c writeTVar (singleStatus con) progress let sid = sSessionID s modifyTVar' (contactEstablished c) $ case progress of Established -> IntMap.insert sid () _ -> IntMap.delete sid when (progress == Dormant) $ modifyTVar' (contactSession c) (IntMap.delete sid) status <- aggregateStatus c when (status /= status0) $ notifyState c c s status mapM_ (delSession c) closed return result -- | Information returned from 'delSession'. data DelResult = NoSession -- ^ Contact is completely disconnected. | DeletedSession -- ^ Connection removed but session remains active. -- | Close and remove the componenent session corresponding to the provided -- Session ID. -- -- The status-change callback may be triggered as the aggregate may may -- transition to 'Dormant' (empty) or 'AwaitingSessionPacket' (if the last -- 'Established' session is closed). delSession :: AggregateSession -> Int -> IO DelResult delSession c sid = do (con, r) <- atomically $ do imap <- readTVar (contactSession c) emap <- readTVar (contactEstablished c) let emap' = IntMap.delete sid emap imap' = IntMap.delete sid imap case IntMap.toList emap of (sid0,_):_ | IntMap.null emap' , let s = singleSession $ imap IntMap.! sid0 -> notifyState c c s $ if IntMap.null imap' then Dormant else InProgress AwaitingSessionPacket _ -> return () writeTVar (contactSession c) imap' writeTVar (contactEstablished c) emap' return ( IntMap.lookup sid imap, IntMap.null imap') mapM_ (sClose . singleSession) con return $ if r then NoSession else DeletedSession -- | Send a packet to one or all of the component sessions in the aggregate. dispatchMessage :: AggregateSession -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID. -> CryptoMessage -> IO () dispatchMessage c msid msg = join $ atomically $ do imap <- readTVar (contactSession c) let go = case msid of Nothing -> forM_ imap Just sid -> forM_ (IntMap.lookup sid imap) return $ go $ \con -> sendMessage (sTransport $ singleSession con) () msg -- | Retry until: -- -- * a packet arrives (with component session ID) arrives. -- -- * the 'AggregateSession' is closed with 'closeAll'. awaitAny :: AggregateSession -> STM (Maybe (Int,CryptoMessage)) awaitAny c = readTMChan (contactChannel c) -- | Close all connections associated with the aggregate. No new sessions will -- be accepted after this, and the notify callback will be informed that we've -- transitioned to 'Dormant'. closeAll :: AggregateSession -> IO () closeAll c = join $ atomically $ do imap <- readTVar (contactSession c) closeTMChan (contactChannel c) forM_ (listToMaybe $ IntMap.elems imap) $ \(SingleCon s _) -> do notifyState c c s Dormant return $ forM_ (IntMap.toList imap) $ \(sid,SingleCon s _) -> do sClose s delSession c sid -- | Query the current status of the aggregate, there are three possible -- values: -- -- [ Dormant ] - No pending or established sessions. -- -- [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are -- fully established. -- -- [ Established ] - At least one session is fully established and we can -- send and receive packets via this aggregate. -- aggregateStatus :: AggregateSession -> STM (Status ToxProgress) aggregateStatus c = do isclosed <- isClosedTMChan (contactChannel c) imap <- readTVar (contactSession c) emap <- readTVar (contactEstablished c) return $ case () of _ | isclosed -> Dormant | not (IntMap.null emap) -> Established | not (IntMap.null imap) -> InProgress AwaitingSessionPacket | otherwise -> Dormant -- | Query whether the supplied ToxID keys are compatible with this aggregate. -- -- [ Nothing ] Any keys would be compatible because there is not yet any -- sessions in progress. -- -- [ Just True ] The supplied keys match the session in progress. -- -- [ Just False ] The supplied keys are incompatible. checkCompatible :: PublicKey -- ^ Local Tox key (for which we know the secret). -> PublicKey -- ^ Remote Tox key. -> AggregateSession -> STM (Maybe Bool) checkCompatible me them c = do isclosed <- isClosedTMChan (contactChannel c) imap <- readTVar (contactSession c) return $ case IntMap.elems imap of _ | isclosed -> Just False -- All keys are incompatible (closed). con:_ -> Just $ sTheirUserKey (singleSession con) == them && toPublic (sOurKey $ singleSession con) == me [] -> Nothing -- | Returns the local and remote keys that are compatible with this aggregate. -- If 'Nothing' Is returned, then either no key is compatible ('closeAll' was -- called) or all keys are compatible because no sessions have been associated. compatibleKeys :: AggregateSession -> STM (Maybe (PublicKey,PublicKey)) compatibleKeys c = do isclosed <- isClosedTMChan (contactChannel c) imap <- readTVar (contactSession c) return $ case IntMap.elems imap of _ | isclosed -> Nothing -- none. con:_ -> Just ( toPublic (sOurKey $ singleSession con) , sTheirUserKey (singleSession con)) [] -> Nothing -- any.