-- | This module aggregates all sessions to the same remote Tox contact into a
-- single online/offline presence.  This allows multiple lossless links to the
-- same identity at different addresses, or even to the same address.
{-# LANGUAGE CPP             #-}
{-# LANGUAGE LambdaCase      #-}
{-# LANGUAGE PatternSynonyms #-}
module Network.Tox.AggregateSession
    ( AggregateSession
    , newAggregateSession
    , aggregateStatus
    , checkCompatible
    , compatibleKeys
    , AddResult(..)
    , addSession
    , DelResult(..)
    , delSession
    , closeAll
    , awaitAny
    , dispatchMessage
    ) where

import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Supply
import Control.Monad
import Data.Function
import qualified Data.IntMap.Strict as IntMap
import Data.IntMap.Strict (IntMap)
import Data.List
import Data.Time.Clock.POSIX
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc                  (labelThread)
#endif
import Connection                      (Status (..))
import Crypto.Tox                      (PublicKey, toPublic)
import Data.Wrapper.PSQInt             as PSQ
import DPut
import Network.QueryResponse
import Network.Tox.Crypto.Transport    (CryptoMessage (..), pattern KillPacket,
                                        pattern ONLINE, pattern PING,
                                        pattern PacketRequest)
import Network.Tox.DHT.Transport       (key2id)
import Network.Tox.NodeId              (ToxProgress (..))
import Network.Tox.Crypto.Handlers

type Session = NetCryptoSession

-- | For each component session, we track the current status.
data SingleCon = SingleCon
    { singleSession :: Session
      -- ^ A component session.
    , singleStatus  :: TVar (Status ToxProgress)
      -- ^ Either 'AwaitingSessionPacket' or 'Established'.
    }

-- | A collection of sessions between the same local and remote identities.
data AggregateSession = AggregateSession
    { -- | The set of component sessions indexed by their ID.
      contactSession     :: TVar (IntMap SingleCon)
      -- | Each inbound packet is written to this channel with the session ID
      -- from which it came originally.
    , contactChannel     :: TMChan (Int,CryptoMessage)
      -- | The set of 'Established' session IDs.
    , contactEstablished :: TVar (IntMap ())
      -- | Callback for state-change notifications.
    , notifyState        :: AggregateSession -> Session -> Status ToxProgress -> STM ()
    }


-- | Create a new empty aggregate session.  The argument is a callback to
-- receive notifications when the new session changes status.  There are three
-- possible status values:
--
--  [ Dormant ] - No pending or established sessions.
--
--  [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are
--  fully established.
--
--  [ Established ] - At least one session is fully established and we can
--  send and receive packets via this aggregate.
--
-- The 'Session' object is provided to the callback so that it can determine the
-- current remote and local identities for this AggregateSession.  It may not even
-- be Established, so do not use it to send or receive packets.
newAggregateSession :: (AggregateSession -> Session -> Status ToxProgress -> STM ())
                    -> STM AggregateSession
newAggregateSession notify = do
    vimap <- newTVar IntMap.empty
    chan  <- newTMChan
    vemap <- newTVar IntMap.empty
    return AggregateSession
        { contactSession     = vimap
        , contactChannel     = chan
        , contactEstablished = vemap
        , notifyState        = notify
        }

-- | Information returned from 'addSession'.  Note that a value other than
-- 'RejectedSession' does not mean there is any 'Established' session in the
-- Aggregate.  Sessions are in 'AwaitingSessionPacket' state until a single
-- packet is received from the remote end.
data AddResult = FirstSession    -- ^ Initial connection with this contact.
               | AddedSession    -- ^ Added another connection to active session.
               | RejectedSession -- ^ Failed to add session (wrong contact / closed session).

-- | The 'keepAlive' thread juggles three scheduled tasks.
data KeepAliveEvents = DoTimeout        -- ^ A session timed-out, close it.
                     | DoAlive          -- ^ Send the keep-alive beacon for a session.
                     | DoRequestMissing -- ^ Detect and request lost packets.
 deriving Enum

-- | This function forks a thread to read all packets from the provided
-- 'Session' and forward them to 'contactChannel' for a containing
-- 'AggregateSession'.
--
-- The thread:
--
--  1. labels itself with the remote key prefix and the session ID,
--
--  2. registers a fresh listener channel in the session's 'ncListeners',
--
--  3. reports @InProgress AwaitingSessionPacket@ and then 'Established'
--     through the supplied status callback,
--
--  4. forwards every message read from the listener channel, tagged with the
--     session ID, until the channel is closed or a 'KillPacket' is read,
--
--  5. removes its listener registration and reports 'Dormant'.
--
-- The returned 'ThreadId' identifies the forwarding thread.
forkSession :: AggregateSession -> Session -> (Status ToxProgress -> STM ()) -> IO ThreadId
forkSession c s setStatus = forkIO $ do
    myThreadId >>= flip labelThread
                        (intercalate "." [ "s"
                                         , take 8 $ show $ key2id $ ncTheirPublicKey s
                                         , show $ sSessionID s ])

    -- Allocate a listener ID and register a private channel for this session.
    -- The ID is kept so the registration can be undone when the thread ends.
    (listenerId, tmchan) <- atomically $ do
        tmchan <- newTMChan
        supply <- readTVar (listenerIDSupply $ ncAllSessions s)
        let (listenerId,supply') = freshId supply
        writeTVar (listenerIDSupply $ ncAllSessions s) supply'
        -- NOTE(review): the meaning of the listener tag 0 is defined by
        -- Network.Tox.Crypto.Handlers and is not visible here.
        modifyTVar' (ncListeners s) (IntMap.insert listenerId (0,tmchan))
        return (listenerId, tmchan)

    let sendPacket :: CryptoMessage -> STM ()
        sendPacket msg = writeTMChan (contactChannel c) (sSessionID s, msg)

    atomically $ setStatus $ InProgress AwaitingSessionPacket
    -- NOTE(review): 'Established' is reported immediately, before any packet
    -- has been read from the listener channel -- confirm this is intended.
    atomically $ setStatus Established

    -- Forward messages until the listener channel closes or a kill packet
    -- arrives.  The kill packet itself is not forwarded.
    fix $ \loop -> atomically (readTMChan tmchan) >>= \case
        Nothing -> return ()
        Just x  -> case msgID x of
            KillPacket -> return ()
            _          -> atomically (sendPacket x) >> loop

    -- Drop the listener registration so that nothing keeps feeding a channel
    -- that no longer has a reader, then report that this component is gone.
    atomically $ do
        modifyTVar' (ncListeners s) (IntMap.delete listenerId)
        setStatus Dormant

-- | The session's ID converted to an 'Int' (used as the 'IntMap' key).
sSessionID :: Session -> Int
sSessionID s = fromIntegral $ ncSessionId s

-- | Add a new session (in 'AwaitingSessionPacket' state) to the
-- 'AggregateSession'.  If the supplied session is not compatible because it is
-- between the wrong ToxIDs or because the AggregateSession is closed,
-- 'RejectedSession' will be returned.  Otherwise, the operation is successful.
--
-- The status-change callback may be triggered by this call as the aggregate
-- may transition from 'Dormant' (empty) to 'AwaitingSessionPacket' (at least
-- one active session).
addSession :: AggregateSession -> Session -> IO AddResult
addSession c s = do
    (outcome, newCon, displaced) <- atomically register
    -- A session that previously occupied the same ID is torn down first.
    forM_ displaced (destroySession . singleSession)
    -- Only an accepted session gets a forwarding thread.
    forM_ newCon $ \con -> forkSession c s (trackStatus con)
    return outcome
  where
    sid :: Int
    sid = sSessionID s

    -- Decide whether the session is acceptable and, if so, record it.
    register :: STM (AddResult, Maybe SingleCon, Maybe SingleCon)
    register = do
        compat <- checkCompatible (ncMyPublicKey s) (ncTheirPublicKey s) c
        case compat of
            Just False -> return (RejectedSession, Nothing, Nothing)
            Just True  -> admit AddedSession
            Nothing    -> admit FirstSession

    -- Insert the session under its ID, returning whatever entry it replaced.
    admit :: AddResult -> STM (AddResult, Maybe SingleCon, Maybe SingleCon)
    admit outcome = do
        statvar  <- newTVar Dormant
        sessions <- readTVar (contactSession c)
        let con = SingleCon s statvar
        writeTVar (contactSession c) (IntMap.insert sid con sessions)
        return (outcome, Just con, IntMap.lookup sid sessions)

    -- Status callback handed to the forwarding thread: records the component
    -- status, maintains the established set, and notifies when the aggregate
    -- gains its first or loses its last established session.
    trackStatus :: SingleCon -> Status ToxProgress -> STM ()
    trackStatus con progress = do
        writeTVar (singleStatus con) progress
        established  <- readTVar (contactEstablished c)
        established' <- case progress of
            Established -> do
                when (IntMap.null established) $
                    notifyState c c s Established
                return $ IntMap.insert sid () established
            _ -> do
                let remaining = IntMap.delete sid established
                when (IntMap.null remaining && not (IntMap.null established)) $ do
                    sessions <- readTVar (contactSession c)
                    notifyState c c s $
                        if IntMap.null sessions
                            then Dormant
                            else InProgress AwaitingSessionPacket
                return remaining
        writeTVar (contactEstablished c) established'

-- | Information returned from 'delSession'.
data DelResult = NoSession      -- ^ Contact is completely disconnected.
               | DeletedSession -- ^ Connection removed but session remains active.

-- | Close and remove the component session corresponding to the provided
-- Session ID.
--
-- The status-change callback may be triggered as the aggregate may
-- transition to 'Dormant' (empty) or 'AwaitingSessionPacket' (if the last
-- 'Established' session is closed).
delSession :: AggregateSession -> Int -> IO DelResult
delSession c sid = do
    (con, r) <- atomically $ do
        imap <- readTVar (contactSession c)
        emap <- readTVar (contactEstablished c)
        let emap' = IntMap.delete sid emap
            imap' = IntMap.delete sid imap
        -- Notify only when this removal empties a previously non-empty
        -- established set.
        case IntMap.toList emap of
            (sid0,_):_
                | IntMap.null emap'
                , Just con0 <- witnessFor sid0 imap
                -> notifyState c c (singleSession con0) $
                       if IntMap.null imap'
                           then Dormant
                           else InProgress AwaitingSessionPacket
            _ -> return ()
        writeTVar (contactSession c) imap'
        writeTVar (contactEstablished c) emap'
        return (IntMap.lookup sid imap, IntMap.null imap')
    -- Destroy the removed component (if there was one) outside the transaction.
    mapM_ (destroySession . singleSession) con
    return $ if r then NoSession else DeletedSession
  where
    -- Pick the session handed to the callback.  The established ID may have
    -- no entry in the session map, so the lookup must be total: fall back to
    -- any remaining session, and give up (skipping the notification) only
    -- when the map is empty.
    witnessFor :: Int -> IntMap.IntMap SingleCon -> Maybe SingleCon
    witnessFor k m = case IntMap.lookup k m of
        Nothing -> fst <$> IntMap.minView m
        found   -> found

-- | Send a packet to one or all of the component sessions in the aggregate.
--
-- The set of target sessions is read in a single transaction; the sends are
-- then performed in 'IO', and the outcome of each send is logged with 'dput'.
dispatchMessage :: AggregateSession
                -> Maybe Int -- ^ 'Nothing' to broadcast, otherwise SessionID.
                -> CryptoMessage
                -> IO ()
dispatchMessage c msid msg = join $ atomically $ do
    imap <- readTVar (contactSession c)
    let go = case msid of
            Nothing  -> forM_ imap
            Just sid -> forM_ (IntMap.lookup sid imap)
    return $ go $ \con -> do
        let session = singleSession con
        eResult <- sendLossless (transportCrypto (ncAllSessions session)) session msg
        case eResult of
            Left  err -> dput XJabber err
            Right pkt -> dput XJabber ("sendLossLess SUCCESS: " ++ show pkt)

-- | Retry until:
--
--  * a packet (with component session ID) arrives.
--
--  * the 'AggregateSession' is closed with 'closeAll'.
awaitAny :: AggregateSession -> STM (Maybe (Int,CryptoMessage))
awaitAny c = readTMChan (contactChannel c)

-- | Close all connections associated with the aggregate.  No new sessions will
-- be accepted after this, and the notify callback will be informed that we've
-- transitioned to 'Dormant'.
closeAll :: AggregateSession -> IO ()
closeAll c = do
    -- Snapshot the session IDs and close the channel in one transaction,
    -- then remove each component session in turn.
    sids <- atomically $ do
        sessions <- readTVar (contactSession c)
        closeTMChan (contactChannel c)
        return (IntMap.keys sessions)
    mapM_ (delSession c) sids

-- | Query the current status of the aggregate, there are three possible
-- values:
--
--  [ Dormant ] - No pending or established sessions.
--
--  [ InProgress AwaitingSessionPacket ] - Sessions are pending, but none are
--  fully established.
--
--  [ Established ] - At least one session is fully established and we can
--  send and receive packets via this aggregate.
--
-- A closed aggregate always reports 'Dormant'.
aggregateStatus :: AggregateSession -> STM (Status ToxProgress)
aggregateStatus c = do
    closed      <- isClosedTMChan (contactChannel c)
    sessions    <- readTVar (contactSession c)
    established <- readTVar (contactEstablished c)
    return (summarize closed sessions established)
  where
    summarize closed sessions established
        | closed                        = Dormant
        | not (IntMap.null established) = Established
        | not (IntMap.null sessions)    = InProgress AwaitingSessionPacket
        | otherwise                     = Dormant

-- | Query whether the supplied ToxID keys are compatible with this aggregate.
--
--  [ Nothing ] Any keys would be compatible because there are not yet any
--              sessions in progress.
--
--  [ Just True ] The supplied keys match the session in progress.
--
--  [ Just False ] The supplied keys are incompatible.
checkCompatible :: PublicKey -- ^ Local Tox key (for which we know the secret).
                -> PublicKey -- ^ Remote Tox key.
                -> AggregateSession
                -> STM (Maybe Bool)
checkCompatible me them c = do
    closed   <- isClosedTMChan (contactChannel c)
    sessions <- readTVar (contactSession c)
    return $
        if closed
            then Just False -- All keys are incompatible (closed).
            else case IntMap.elems sessions of
                []      -> Nothing
                con : _ -> Just (sameKeys (singleSession con))
  where
    -- Compare against the keys of an existing component session.
    sameKeys s = ncTheirPublicKey s == them && ncMyPublicKey s == me

-- | Returns the local and remote keys that are compatible with this aggregate.
-- If 'Nothing' is returned, then either no key is compatible ('closeAll' was
-- called) or all keys are compatible because no sessions have been associated.
compatibleKeys :: AggregateSession -> STM (Maybe (PublicKey,PublicKey))
compatibleKeys c = do
    closed   <- isClosedTMChan (contactChannel c)
    sessions <- readTVar (contactSession c)
    return $ case IntMap.elems sessions of
        -- An open aggregate with at least one session: report its key pair.
        con : _ | not closed -> Just (keyPair (singleSession con))
        -- Closed (none compatible) or empty (any compatible).
        _                    -> Nothing
  where
    -- (local key, remote key) of a component session.
    keyPair s = (ncMyPublicKey s, ncTheirPublicKey s)