From 7a3ced91da125eebbbee399fc36162c2c3b9716d Mon Sep 17 00:00:00 2001 From: James Crayne Date: Tue, 29 May 2018 05:32:43 +0000 Subject: dhtd & Network.Tox.Crypto.Handlers changes: * merge PerSession into NetCryptoSession * define defaultUnRecHook function * netcrypto command wip for testing * update to sessions command --- dht-client.cabal | 5 ++- examples/dhtd.hs | 84 +++++++++++++++++--------------------- src/Network/Tox/Crypto/Handlers.hs | 61 ++++++++++++++++++++++++++- 3 files changed, 100 insertions(+), 50 deletions(-) diff --git a/dht-client.cabal b/dht-client.cabal index 60a42f00..02b5cdbe 100644 --- a/dht-client.cabal +++ b/dht-client.cabal @@ -4,7 +4,7 @@ license: BSD3 license-file: LICENSE author: Joe Crayne maintainer: Joe Crayne -copyright: (c) 2017 Joe Crayne, (c) 2013, Sam Truzjan +copyright: (c) 2017 Joe Crayne, (c) 2017 James Crayne, (c) 2013 Sam Truzjan category: Network build-type: Custom cabal-version: >= 1.10 @@ -144,6 +144,8 @@ library , hashable , iproute , stm >= 2.4.0 + , stm-chans + , concurrent-supply , base16-bytestring , base32-bytestring , base64-bytestring @@ -261,6 +263,7 @@ executable dhtd , containers , stm , stm-chans + , concurrent-supply , cereal , bencoding , unordered-containers diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 47a4cd46..df8cf1c4 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -107,6 +107,7 @@ import XMPPServer import Connection import ToxToXMPP import qualified Connection.Tcp as Tcp (ConnectionEvent(..)) +import Control.Concurrent.Supply showReport :: [(String,String)] -> String @@ -396,7 +397,6 @@ data Session = Session , userkeys :: TVar [(SecretKey,PublicKey)] , roster :: Tox.ContactInfo , announceToLan :: IO () - , sessions :: TVar [PerSession] , connectionManager :: Maybe ConnectionManager , onionRouter :: OnionRouter , announcer :: Announcer @@ -433,7 +433,7 @@ clientSession0 s sock cnum h = do else throwIO e readKeys :: TVar [(SecretKey, PublicKey)] - -> TVar (HashMap.HashMap Tox.NodeId Account) + -> TVar (HashMap.HashMap Tox.NodeId Account) -- ContactInfo { accounts } -> STM [(SecretKey, PublicKey)] readKeys userkeys roster = do uks <- readTVar userkeys @@ -475,6 +475,7 @@ clientSession s@Session{..} sock cnum h = do , ["k"] , ["roster"] , ["sessions"] + , ["netcrypto"] , ["onion"] , ["g"] , ["p"] @@ -679,17 +680,19 @@ clientSession s@Session{..} sock cnum h = do hPutClientChunk h $ unlines [ dns, "", "Friend Requests" ] hPutClient h $ showReport frs - ("sessions", s) | "" <- strp s + ("sessions", s') | "" <- strp s' -> cmd0 $ do - sessions' <- atomically $ readTVar sessions :: IO [PerSession] - let sessionsReport = mapM showPerSession sessions' - headers = ["Key", "NextMsg", "Dropped","Handled","Unhandled"] - showPerSession (PerSession - { perSessionMsgs = msgQ - , perSessionPublicKey = pubKey - , perSessionAddr = sockAddr - , perSessionNumVar = msgNumVar - , perSessionDropCount = dropCntVar + sessions <- concat . Map.elems <$> (atomically $ readTVar (Tox.netCryptoSessionsByKey cryptosessions)) + let sessionsReport = mapM showPerSession sessions + headers = ["SessionID", "YourKey", "TheirKey", "NextMsg", "Dropped","Handled","Unhandled"] + showPerSession (Tox.NCrypto + { ncSessionId = id + , ncMyPublicKey = yourkey + , ncTheirPublicKey = theirkey + , ncLastNMsgs = msgQ + , ncSockAddr = sockAddr + , ncMsgNumVar = msgNumVar + , ncDropCntVar = dropCntVar }) = do num <- atomically (readTVar msgNumVar) dropped <- atomically (readTVar dropCntVar) @@ -697,13 +700,15 @@ clientSession s@Session{..} sock cnum h = do let (h,u) = partition (fst . snd) as countHandled = length h countUnhandled = length u - return [ show (Tox.key2id pubKey) -- "Key" + return [ printf "%x" id -- "SessionID" + , show (Tox.key2id yourkey) -- "YourKey" + , show (Tox.key2id theirkey)-- "TheirKey" , show num -- "NextMsg" , show dropped -- "Dropped" , show countHandled -- "Handled" , show countUnhandled -- "Unhandled" ] - if null sessions' + if null sessions then hPutClient h "No sessions." else do rows <- sessionsReport @@ -724,6 +729,19 @@ clientSession s@Session{..} sock cnum h = do hPutClientChunk h $ "trampolines: " ++ show (IntMap.size ts) ++ "\n" hPutClient h $ showColumns $ ["","responses","timeouts"]:r + ("netcrypto", s) + | Just DHT{..} <- Map.lookup netname dhts + -> cmd0 $ do + case selectedKey of + Nothing -> hPutClient h "No key is selected, see k command." + Just mypubkey -> do + let nidstr = strp s + goParse = either (hPutClient h . ("Bad netcrypto target: "++)) + goTarget + $ dhtParseId nidstr + goTarget nid = do + hPutClient h "TODO: convert selected public key to private, call netCrypto.." + goParse ("g", s) | Just DHT{..} <- Map.lookup netname dhts -> cmd0 $ do -- arguments: method @@ -1258,12 +1276,6 @@ announceToxJabberPeer echan laddr saddr pingflag tsrc tsnk #endif -data PerSession = PerSession { perSessionMsgs :: PacketQueue (Bool{-Handled?-},Tox.CryptoMessage) - , perSessionPublicKey :: PublicKey - , perSessionAddr :: SockAddr - , perSessionNumVar :: TVar Word32 - , perSessionDropCount :: TVar Word32 - } main :: IO () main = runResourceT $ liftBaseWith $ \resT -> do @@ -1372,7 +1384,6 @@ main = runResourceT $ liftBaseWith $ \resT -> do crypto <- Tox.newCrypto netCryptoSessionsState <- Tox.newSessionsState crypto Tox.defaultUnRecHook Tox.defaultCryptoDataHooks - sessions <- atomically (newTVar []) :: IO (TVar [PerSession]) (mbtox,quitTox,toxdhts,toxips,(taddrs::[SockAddr])) <- case porttox opts of "" -> return (Nothing,return (), Map.empty, return [],[]) toxport -> do @@ -1602,16 +1613,6 @@ main = runResourceT $ liftBaseWith $ \resT -> do -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState) let sockAddr = Tox.ncSockAddr netcrypto pubKey = Tox.ncTheirPublicKey netcrypto - msgQ <- atomically (Data.PacketQueue.newOverwrite 10 0 :: STM (PacketQueue (Bool,Tox.CryptoMessage))) - msgNumVar <- atomically (newTVar 0) - dropCntVar <- atomically (newTVar 0) - let perSession = PerSession { perSessionMsgs = msgQ - , perSessionPublicKey = pubKey - , perSessionAddr = sockAddr - , perSessionNumVar = msgNumVar - , perSessionDropCount = dropCntVar - } - atomically $ modifyTVar' sessions (perSession:) tmchan <- atomically newTMChan let Just pingMachine = Tox.ncPingMachine netcrypto pingflag = readTVar (pingFlag pingMachine) @@ -1624,21 +1625,11 @@ main = runResourceT $ liftBaseWith $ \resT -> do announceToxJabberPeer (xmppEventChannel sv) addrTox (Tox.ncSockAddr netcrypto) pingflag xmppSrc xmppSink -- TODO: Update toxContactInfo, connected. #endif - let handleIncoming typ session cm | any ($ typ) [Tox.isKillPacket, Tox.isOFFLINE] = atomically $ do - closeTMChan tmchan - Tox.forgetCrypto crypto netCryptoSessionsState netcrypto - return Nothing - handleIncoming mTyp session cm = do - atomically $ do - num <- readTVar msgNumVar - (wraps,offset) <- enqueue msgQ num (False,cm) - capacity <- getCapacity msgQ - let dropped = wraps * capacity + offset - modifyTVar' msgNumVar (+1) - writeTVar dropCntVar dropped - atomically $ writeTMChan tmchan cm -- (Tox.bufferData cd) - return Nothing - atomically $ writeTVar (Tox.ncUnrecognizedHook netcrypto) handleIncoming + atomically $ do + supply <- readTVar (Tox.listenerIDSupply netCryptoSessionsState) + let (listenerId,supply') = freshId supply + writeTVar (Tox.listenerIDSupply netCryptoSessionsState) supply' + modifyTVar' (Tox.ncListeners netcrypto) (IntMap.insert listenerId (0,tmchan)) return Nothing let dhts = Map.union btdhts toxdhts @@ -1667,7 +1658,6 @@ main = runResourceT $ liftBaseWith $ \resT -> do , userkeys = toxids , roster = rstr , announceToLan = fromMaybe (return ()) $ Tox.toxAnnounceToLan <$> mbtox - , sessions = sessions , connectionManager = ConnectionManager <$> mconns , onionRouter = orouter , externalAddresses = liftM2 (++) btips toxips diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 0e349196..602b14cc 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -10,6 +10,7 @@ import Network.Tox.DHT.Transport (Cookie(..),CookieData(..), CookieRequest(..), import Network.Tox.DHT.Handlers (Client, cookieRequest, createCookie ) import Crypto.Tox import Control.Concurrent.STM +import Control.Concurrent.STM.TMChan import Network.Address import qualified Data.Map.Strict as Map import Crypto.Hash @@ -37,6 +38,8 @@ import System.Random -- for ping fuzz import Control.Concurrent import GHC.Conc (labelThread) import PingMachine +import qualified Data.IntMap.Strict as IntMap +import Control.Concurrent.Supply -- util, todo: move to another module maybeToEither :: Maybe b -> Either String b @@ -83,6 +86,20 @@ data SessionView = SessionView type SessionID = Word64 +-- | Application specific listener type (Word64) +-- +-- This is some kind of information associated with a listening TChan. +-- It may be used to indicate what kind of packets it is interested in. +-- +-- 0 means listen to all messages and is done automatically in 'defaultUnRecHook' +-- any other values are left open to application specific convention. +-- +-- This module does not know what the different values here +-- mean, but code that sets hooks may adhere to a convention +-- defined elsewhere. +-- +type ListenerType = Word64 + data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatus , ncMyPublicKey :: PublicKey @@ -116,6 +133,18 @@ data NetCryptoSession = NCrypto CryptoMessage (CryptoPacket Encrypted) CryptoData + , ncLastNMsgs :: PacketQueue (Bool{-Handled?-},CryptoMessage) + -- ^ cyclic buffer, holds the last N non-handshake crypto messages + -- even if there is no attached user interface. + , ncListeners :: TVar (IntMap.IntMap (ListenerType,TMChan CryptoMessage)) + -- ^ user interfaces may "listen" by inserting themselves into this map + -- with a unique id and a new TChan, and then reading from the TChan + , ncMsgNumVar :: TVar Word32 + -- ^ The number of non-handshake crypto messages recieved in this session + -- TODO: there is already a packet num etc, do we need two? + , ncDropCntVar :: TVar Word32 + -- ^ The number of crypto messages that were overwritten in the ncLastNMsgs + -- before anybody got to see them. } data NetCryptoSessions = NCSessions @@ -131,6 +160,7 @@ data NetCryptoSessions = NCSessions , nextSessionId :: TVar SessionID , announceNewSessionHooks :: TVar [IOHook (Maybe NoSpam) NetCryptoSession] , sessionTransport :: Transport String SockAddr NetCrypto + , listenerIDSupply :: TVar Supply } type NewSessionHook = IOHook (Maybe NoSpam) NetCryptoSession @@ -173,6 +203,8 @@ newSessionsState crypto unrechook hooks = do svDownloadDir0 <- atomically $ newTVar (homedir "Downloads") nextSessionId0 <- atomically $ newTVar 0 announceNewSessionHooks0 <- atomically $ newTVar [] + lsupply <- newSupply + lsupplyVar <- atomically (newTVar lsupply) return NCSessions { netCryptoSessions = x , netCryptoSessionsByKey = x2 , transportCrypto = crypto @@ -195,6 +227,7 @@ newSessionsState crypto unrechook hooks = do , nextSessionId = nextSessionId0 , announceNewSessionHooks = announceNewSessionHooks0 , sessionTransport = error "Need to set sessionTransport field of NetCryptoSessions!" + , listenerIDSupply = lsupplyVar } data HandshakeParams @@ -338,6 +371,10 @@ freshCryptoSession sessions writeTVar ncMyPacketNonce0 n24plus1 return (return (f n24, n24, ncOutgoingIdMap0)) pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 + msgQ <- atomically (PQ.newOverwrite 10 0 :: STM (PacketQueue (Bool,CryptoMessage))) + listeners <- atomically $ newTVar IntMap.empty + msgNum <- atomically $ newTVar 0 + dropNum <- atomically $ newTVar 0 let netCryptoSession0 = NCrypto { ncState = ncState0 , ncMyPublicKey = toPublic key @@ -362,6 +399,10 @@ freshCryptoSession sessions , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" , ncOutgoingQueue = pktoq + , ncLastNMsgs = msgQ + , ncListeners = listeners + , ncMsgNumVar = msgNum + , ncDropCntVar = dropNum } -- launch dequeue thread threadid <- forkIO $ do @@ -615,9 +656,25 @@ allMsgTypes fDefault = A.listArray (minBound,maxBound) (0:knownMsgs) defaultCryptoDataHooks :: Map.Map MessageType [NetCryptoHook] defaultCryptoDataHooks = Map.empty --- | discards all unrecognized packets +-- | updates ncLastNMsgs, and sends message to type-0 listeners defaultUnRecHook :: MessageType -> NetCryptoHook -defaultUnRecHook _ _ _ = return Nothing +defaultUnRecHook typ session cm | any ($ typ) [isKillPacket, isOFFLINE] = atomically $ do + tmchans <- map snd . IntMap.elems <$> readTVar (ncListeners session) + forM_ tmchans $ \chan -> closeTMChan chan + return Nothing + +defaultUnRecHook typ session cm = do + let msgQ = ncLastNMsgs session + msgNumVar = ncMsgNumVar session + dropCntVar = ncDropCntVar session + atomically $ do + num <- readTVar msgNumVar + (wraps,offset) <- PQ.enqueue msgQ num (False,cm) + capacity <- PQ.getCapacity msgQ + let dropped = wraps * capacity + offset + modifyTVar' msgNumVar (+1) + writeTVar dropCntVar dropped + return Nothing -- | use to add a single hook to a specific session. addCryptoDataHook1 :: Map.Map MessageType [NetCryptoHook] -> MessageType -> NetCryptoHook -> Map.Map MessageType [NetCryptoHook] -- cgit v1.2.3