From e8446341d0dbe9b466571fa10875141ed22fbb47 Mon Sep 17 00:00:00 2001 From: "jim@bo" Date: Thu, 21 Jun 2018 23:12:29 -0400 Subject: NetCrypto IdleEvents,TimeOuts --- PingMachine.hs | 48 ++++++++++++++++ examples/dhtd.hs | 29 ++++++++++ g | 2 +- src/Network/Tox/Crypto/Handlers.hs | 115 ++++++++++++++++++++++++------------- 4 files changed, 153 insertions(+), 41 deletions(-) diff --git a/PingMachine.hs b/PingMachine.hs index 5cd70f95..4a1cb008 100644 --- a/PingMachine.hs +++ b/PingMachine.hs @@ -1,6 +1,8 @@ {-# LANGUAGE CPP #-} +{-# LANGUAGE TupleSections #-} module PingMachine where +import Control.Applicative import Control.Monad import Data.Function #ifdef THREAD_DEBUG @@ -89,6 +91,52 @@ forkPingMachine label idle timeout = do , pingStarted = started } +-- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically +-- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread +-- regardless of idle value. +forkPingMachineDynamic + :: String + -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. + -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. + -> IO PingMachine +forkPingMachineDynamic label idleV timeoutV = do + d <- interruptibleDelay + flag <- atomically $ newTVar False + canceled <- atomically $ newTVar False + event <- atomically newEmptyTMVar + started <- atomically $ newEmptyTMVar + void . forkIO $ do + myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") + (>>=) (atomically (readTMVar started)) $ flip when $ do + fix $ \loop -> do + atomically $ writeTVar flag False + (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV + fin <- startDelay d (1000*idle) + (>>=) (atomically (readTMVar started)) $ flip when $ do + if (not fin) then loop + else do + -- Idle event + atomically $ do + tryTakeTMVar event + putTMVar event PingIdle + writeTVar flag True + fin <- startDelay d (1000*timeout) + (>>=) (atomically (readTMVar started)) $ flip when $ do + me <- myThreadId + if (not fin) then loop + else do + -- Timeout event + atomically $ do + tryTakeTMVar event + writeTVar flag False + putTMVar event PingTimeOut + return PingMachine + { pingFlag = flag + , pingInterruptible = d + , pingEvent = event + , pingStarted = started + } + -- | Terminate the watchdog thread. Call this upon connection close. -- -- You should ensure no threads are waiting on 'pingWait' because there is no diff --git a/examples/dhtd.hs b/examples/dhtd.hs index ce6cc8f7..6ef4539f 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -702,6 +702,35 @@ clientSession s@Session{..} sock cnum h = do else do rows <- sessionsReport hPutClient h (showColumns (headers:rows)) + -- session set key val + ("session",s) | (idStr,"set",unstripped) <- twoWords s + , (key,val,unstripped2) <- twoWords unstripped + , let setmap = [("ncRequestInterval", \s x -> writeTVar (Tox.ncRequestInterval s) x) + ,("ncAliveInterval", \s x -> writeTVar (Tox.ncAliveInterval s) x) + ,("ncIdleEvent", \s x -> writeTVar (Tox.ncIdleEvent s) x) + ,("ncTimeOut", \s x -> writeTVar (Tox.ncTimeOut s) x) + ] + , Just stmFunc <- Data.List.lookup key setmap + -> cmd0 $ do + lrSession <- strToSession idStr + case lrSession of + Left s -> hPutClient h s + Right session -> do + case readMaybe val of + Just (x::Int) -> do + atomically (stmFunc session x) + hPutClient h $ "Session " ++ idStr ++ ": " ++ key ++ " = " ++ val + _ -> + hPutClient h $ "Invalid " ++ key ++ " value: " ++ val + + -- report error when setting invalid keys + ("session",s) | (idStr,"set",unstripped) <- twoWords s + , (key,val,unstripped2) <- twoWords unstripped + -> cmd0 $ do + lrSession <- strToSession idStr + case lrSession of + Left s -> hPutClient h s + Right session -> hPutClient h $ "What is " ++ key ++ "?" -- session tail -- show context (latest lossless messages) ("session", s) | (idStr,tailcmd,unstripped) <- twoWords s diff --git a/g b/g index e0d5c2b4..ca17ec88 100755 --- a/g +++ b/g @@ -2,7 +2,7 @@ rootname=$(cat /etc/debian_chroot 2>/dev/null) echo $PATH | grep '\.stack' >/dev/null && rootname="stack" -BUILDB=build/b +BUILDB=.stack-work/dist/x86_64-linux/Cabal-2.0.1.0/build warn="-freverse-errors -fwarn-unused-imports -Wmissing-signatures -fdefer-typed-holes" exts="-XOverloadedStrings -XRecordWildCards" diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index c43542b1..2211e0f2 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -311,6 +311,7 @@ data NetCryptoSession = NCrypto -- where as the prior fields will be used in any implementation -- , ncHooks :: TVar (Map.Map MessageType [NetCryptoHook]) , ncUnrecognizedHook :: TVar (MessageType -> NetCryptoHook) + , ncIdleEventHooks :: TVar [(Int,NetCryptoSession -> IO ())] , ncIncomingTypeArray :: TVar MsgTypeArray -- ^ This array maps 255 Id bytes to MessageType -- It should contain all messages this session understands. @@ -342,11 +343,19 @@ data NetCryptoSession = NCrypto -- ^ a buffer in which incoming packets may be stored out of order -- but from which they may be extracted in sequence, -- helps ensure lossless packets are processed in order - , ncRequestInterval :: TVar Int - -- ^ How long (in microseconds) to wait between packet requests - -- , ncStoredRequests :: CyclicBuffer CryptoData + , ncStoredRequests :: CyclicBuffer CryptoData -- ^ Store the last 5 packet requests, try handling in any order -- if the connection seems like it is locked (TODO) + , ncRequestInterval :: TVar Int + -- ^ How long (in miliseconds) to wait between packet requests + , ncAliveInterval :: TVar Int + -- ^ How long before the next ALIVE packet ("PING") + -- is to be sent regardless of activity + , ncTimeOut :: TVar Int + -- ^ How many miliseconds of inactivity before this session is abandoned + , ncIdleEvent :: TVar Int + -- ^ How many miliseconds of inactivity before emergency measures are taken + -- Emergency measures = (rehandle the packet requests stored in ncStoredRequests) , ncRequestThread :: TVar (Maybe ThreadId) -- ^ thread which sends packet requests , ncDequeueThread :: TVar (Maybe ThreadId) @@ -358,6 +367,7 @@ data NetCryptoSession = NCrypto -- ^ thread which triggers ping events , ncPingThread :: TVar (Maybe ThreadId) -- ^ thread which actually queues outgoing pings + , ncIdleEventThread :: TVar (Maybe ThreadId) , ncOutgoingQueue :: TVar (UponHandshake (PQ.PacketOutQueue @@ -387,6 +397,7 @@ data NetCryptoSessions = NCSessions , transportCrypto :: TransportCrypto , defaultHooks :: Map.Map MessageType [NetCryptoHook] , defaultUnrecognizedHook :: MessageType -> NetCryptoHook + , defaultIdleEventHooks :: [(Int,NetCryptoSession -> IO ())] , sessionView :: SessionView , msgTypeArray :: MsgTypeArray , inboundQueueCapacity :: Word32 @@ -453,6 +464,7 @@ newSessionsState crypto unrechook hooks = do , transportCrypto = crypto , defaultHooks = hooks , defaultUnrecognizedHook = unrechook + , defaultIdleEventHooks = [(0,handleRequestsOutOfOrder)] , sessionView = SessionView { svNick = nick , svStatus = status @@ -507,11 +519,11 @@ ncToWire getState seqno bufend pktno msg = do GrpMsg KnownLossless _ -> Lossless (state,n24,msgOutMapVar) <- getState -- msgOutMap <- readTVar msgOutMapVar - result1 <- trace ("lookupInRangeMap typ64=" ++ show typ64) + result1 <- dtrace XNetCrypto ("lookupInRangeMap typ64=" ++ show typ64) $ lookupInRangeMap typ64 msgOutMapVar case result1 of -- msgOutMapLookup typ64 msgOutMap of - Nothing -> trace "lookupInRangeMap gave Nothing!" $ return Nothing - Just outid -> trace ("encrypting packet with Nonce: " ++ show n24) $ do + Nothing -> dtrace XNetCrypto "lookupInRangeMap gave Nothing!" $ return Nothing + Just outid -> dtrace XNetCrypto ("encrypting packet with Nonce: " ++ show n24) $ do let setMessageId (OneByte _) mid = OneByte (toEnum8 mid) setMessageId (TwoByte _ x) mid = TwoByte (toEnum8 mid) x setMessageId (UpToN _ x) mid = UpToN (toEnum8 mid) x @@ -527,7 +539,7 @@ ncToWire getState seqno bufend pktno msg = do plain = encodePlain cd encrypted = encrypt state plain pkt = CryptoPacket { pktNonce = let r = nonce24ToWord16 n24 - in trace (printf "converting n24 to word16: 0x%x" r) r + in dtrace XNetCrypto (printf "converting n24 to word16: 0x%x" r) r , pktData = encrypted } in return (Just (pkt, pktno)) Lossless -> let cd = @@ -562,7 +574,7 @@ freshCryptoSession sessions let crypto = transportCrypto sessions allsessions = netCryptoSessions sessions allsessionsByKey = netCryptoSessionsByKey sessions - dmsg msg = trace msg (return ()) + dmsg msg = dtrace XNetCrypto msg (return ()) sessionId <- do x <- readTVar (nextSessionId sessions) modifyTVar (nextSessionId sessions) (+1) @@ -582,6 +594,7 @@ freshCryptoSession sessions cookie0 <- newTVar (HaveCookie otherCookie) ncHooks0 <- newTVar (defaultHooks sessions) ncUnrecognizedHook0 <- newTVar (defaultUnrecognizedHook sessions) + ncIdleEventHooks0 <- newTVar (defaultIdleEventHooks sessions) ncIncomingTypeArray0 <- newTVar (msgTypeArray sessions) let idMap = foldl (\mp (x,y) -> W64.insert x y mp) W64.empty (zip [0..255] [0..255]) (ncOutgoingIdMap0,lossyEscapeIdMap,losslessEscapeIdMap) <- do @@ -605,7 +618,7 @@ freshCryptoSession sessions Just theirSessionKey -> createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 mbpktoqVar <- newTVar mbpktoq lastNQ <- CB.new 10 0 :: STM (CyclicBuffer (Bool,(ViewSnapshot,InOrOut CryptoMessage))) - -- ncStoredRequests0 <- CB.new 5 0 :: STM (CyclicBuffer CryptoData) + ncStoredRequests0 <- CB.new 5 0 :: STM (CyclicBuffer CryptoData) listeners <- newTVar IntMap.empty msgNum <- newTVar 0 dropNum <- newTVar 0 @@ -613,12 +626,19 @@ freshCryptoSession sessions dmsg $ "freshCryptoSession: Session ncTheirBaseNonce=" ++ show theirbasenonce dmsg $ "freshCryptoSession: My Session Public =" ++ show (key2id $ toPublic newsession) ncTheirSessionPublic0 <- newTVar (frmMaybe mbtheirSessionKey) - ncRequestInterval0 <- newTVar 2000000 -- (TODO: shrink this) long interval while debugging slows trace flood ncRequestThread0 <- newTVar Nothing ncDequeueThread0 <- newTVar Nothing ncDequeueOutGoingThread0 <- newTVar Nothing ncPingMachine0 <- newTVar Nothing ncPingThread0 <- newTVar Nothing + ncIdleEventThread0 <- newTVar Nothing + ncRequestInterval0 <- newTVar 2000 -- (TODO: shrink this) long interval while debugging slows trace flood + ncAliveInterval0 <- newTVar 8000 -- 8 seconds + -- ping Machine parameters + fuzz <- return 0 -- randomRIO (0,2000) -- Fuzz to prevent simultaneous ping/pong exchanges. + -- Disabled because tox has no pong event. + ncTimeOut0 <- newTVar 32000 -- 32 seconds + ncIdleEvent0 <- newTVar (5000 + fuzz) -- 5 seconds let netCryptoSession0 = NCrypto { ncState = ncState0 , ncMyPublicKey = toPublic key @@ -634,6 +654,7 @@ freshCryptoSession sessions , ncSockAddr = HaveDHTKey addr , ncHooks = ncHooks0 , ncUnrecognizedHook = ncUnrecognizedHook0 + , ncIdleEventHooks = ncIdleEventHooks0 , ncAllSessions = sessions , ncIncomingTypeArray = ncIncomingTypeArray0 , ncOutgoingIdMap = ncOutgoingIdMap0 @@ -641,13 +662,17 @@ freshCryptoSession sessions , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap , ncView = ncView0 , ncPacketQueue = pktq - -- , ncStoredRequests = ncStoredRequests0 (TODO) + , ncStoredRequests = ncStoredRequests0 , ncRequestInterval = ncRequestInterval0 + , ncAliveInterval = ncAliveInterval0 + , ncTimeOut = ncTimeOut0 + , ncIdleEvent = ncIdleEvent0 , ncRequestThread = ncRequestThread0 , ncDequeueThread = ncDequeueThread0 , ncDequeueOutGoingThread = ncDequeueOutGoingThread0 , ncPingMachine = ncPingMachine0 , ncPingThread = ncPingThread0 + , ncIdleEventThread = ncIdleEventThread0 , ncOutgoingQueue = mbpktoqVar , ncLastNMsgs = lastNQ , ncListeners = listeners @@ -674,7 +699,7 @@ createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce atomically $ do n24 <- readTVar ncMyPacketNonce0 let n24plus1 = incrementNonce24 n24 - trace ("ncMyPacketNonce+1=" ++ show n24plus1 + dtrace XNetCrypto ("ncMyPacketNonce+1=" ++ show n24plus1 ++ "\n toWireIO: theirSessionKey = " ++ show (key2id theirSessionKey) ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) ) $ writeTVar ncMyPacketNonce0 n24plus1 @@ -762,7 +787,7 @@ runUponHandshake netCryptoSession0 addr pktoq = do atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) fix $ \loop -> do - atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay + atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) nums <- atomically $ PQ.getMissing pktq dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums getOutGoingParam <- PQ.readyOutGoing pktoq @@ -786,30 +811,35 @@ runUponHandshake netCryptoSession0 addr pktoq = do loop dput XNetCrypto $ "runUponHandshake: " ++ show threadidOutgoing ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr -- launch ping Machine thread - fuzz <- return 0 -- randomRIO (0,2000) -- Fuzz to prevent simultaneous ping/pong exchanges. - -- Disabled because tox has no pong event. - pingMachine <- forkPingMachine ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (8000 + fuzz) 4000 + pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) atomically $ writeTVar (ncPingMachine netCryptoSession0) (Just pingMachine) -- launch ping thread pingThreadId <- forkIO $ do tid <- myThreadId atomically $ writeTVar (ncPingThread netCryptoSession0) (Just tid) labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) + fix $ \loop -> do + atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) + dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" + lr <- sendPing crypto netCryptoSession0 + case lr of + Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s + Right _ -> return () + loop + atomically $ writeTVar (ncPingThread netCryptoSession0) (Just pingThreadId) + -- launch IdleEvent thread + idleThreadId <- forkIO $ do + tid <- myThreadId + atomically $ writeTVar (ncPingThread netCryptoSession0) (Just tid) + labelThread tid ("NetCryptoIdleEvent." ++ show (key2id remotePublicKey) ++ sidStr) event <- atomically $ pingWait pingMachine case event of - PingIdle -> do - dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingIdle" - -- Normally, we would not bump the PingMachine until we receive - -- an inbound packet. We are doing this here because tox has - -- no pong response packet and so we need to mark the - -- connection non-idle here. Doing this prevents a PingTimeOut - -- from ever occurring. (TODO: handle timed-out sessions somehow.) - pingBump pingMachine - lr <- sendPing crypto netCryptoSession0 - case lr of - Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s - Right _ -> return () - PingTimeOut -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") PingTimeOut TODO" + PingIdle -> do + hooks <- atomically (readTVar (ncIdleEventHooks netCryptoSession0)) + mapM_ (($ netCryptoSession0) . snd) hooks + PingTimeOut -> destroySession netCryptoSession0 + atomically $ writeTVar (ncIdleEventThread netCryptoSession0) (Just idleThreadId) + -- update session with thread ids let netCryptoSession = netCryptoSession0 -- add this session to the lookup maps @@ -852,6 +882,8 @@ destroySession session = do stopMachine (ncPingMachine session) stopThread (ncDequeueThread session) stopThread (ncDequeueOutGoingThread session) + stopThread (ncRequestThread session) + stopThread (ncIdleEventThread session) -- | Called when we get a handshake, but there's already a session entry. -- @@ -948,7 +980,7 @@ handshakeH sessions addrRaw hshake@(Handshake (Cookie n24 ecookie) nonce24 encry -- IO action to get a new session key in case we need it in transaction to come newsession <- generateSecretKey -- Do a lookup, so we can handle the update case differently - let dmsg msg = trace msg (return ()) + let dmsg msg = dtrace XNetCrypto msg (return ()) timestamp <- getPOSIXTime (myhandshake,launchThreads) <- atomically $ do @@ -987,7 +1019,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do return Nothing -- drop packet, we have no session Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketQueue, ncHooks, ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, - ncPingMachine, ncSessionId}) -> do + ncPingMachine, ncSessionId, ncStoredRequests}) -> do -- Unrecognized packets, try them thrice so as to give -- handshakes some time to come in -- TODO: Remove this loop, as it is probably unnecessary. @@ -1048,7 +1080,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do atomically $ do HaveHandshake y <- readTVar ncTheirBaseNonce let x = addtoNonce24 y (fromIntegral dATA_NUM_THRESHOLD) - trace ("nonce y(" ++ show y ++ ") + " ++ show (fromIntegral dATA_NUM_THRESHOLD) + dtrace XNetCrypto ("nonce y(" ++ show y ++ ") + " ++ show (fromIntegral dATA_NUM_THRESHOLD) ++ " = " ++ show x) (return ()) writeTVar ncTheirBaseNonce (HaveHandshake y) -- then set session confirmed, @@ -1056,13 +1088,8 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do -- bump ping machine ncPingMachine0 <- atomically $ readTVar ncPingMachine case ncPingMachine0 of - Just pingMachine -> -- Normally, we would bump the PingMachine to mark the connection - -- as non-idle so that we don't need to send a ping message. - -- Because tox has no pong message, we need to send a ping every - -- eight seconds regardless, so we will let the PingIdle event be - -- signaled even when we receive packets. - -- pingBump pingMachine - return () + -- the ping machine is used to detect inactivity and respond accordingly + Just pingMachine -> pingBump pingMachine Nothing -> return () msgTypes <- atomically $ readTVar ncIncomingTypeArray let msgTyp = cd ^. messageType @@ -1078,12 +1105,15 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm when (msgID cm == PING) $ dput XNetCrypto $ "NetCrypto Recieved PING (session " ++ show ncSessionId ++")" + when (msgID cm == PacketRequest) . atomically $ do + num <- CB.getNextSequenceNum ncStoredRequests + CB.enqueue ncStoredRequests num cd atomically $ PQ.enqueue ncPacketQueue bufferEnd cd return Nothing where last2Bytes :: Nonce24 -> Word16 last2Bytes (Nonce24 bs) = case S.decode (B.drop 22 bs) of - Right n -> n -- trace ("byteSwap16 " ++ printf "0x%x" n ++ " = " ++ printf "0x%x" (byteSwap16 n)) $ byteSwap16 n + Right n -> n -- dtrace XNetCrypto ("byteSwap16 " ++ printf "0x%x" n ++ " = " ++ printf "0x%x" (byteSwap16 n)) $ byteSwap16 n _ -> error "unreachable-last2Bytes" dATA_NUM_THRESHOLD = 21845 -- = 65535 / 3 @@ -1337,6 +1367,11 @@ defaultCryptoDataHooks , (Msg KillPacket, [defaultKillHook]) ] +handleRequestsOutOfOrder :: NetCryptoSession -> IO () +handleRequestsOutOfOrder session = do + cds <- atomically $ CB.cyclicBufferViewList (ncStoredRequests session) + mapM_ (handlePacketRequest session) (map snd cds) + handlePacketRequest :: NetCryptoSession -> CryptoData -> IO () handlePacketRequest session (CryptoData { bufferStart=num , bufferData=cm@(msgID -> PacketRequest) -- cgit v1.2.3