From d96234b9954fb2e41521eb437edf2fee7317f7d6 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Wed, 12 Dec 2018 02:01:39 -0500 Subject: TCP Prober. --- TCPProber.hs | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 TCPProber.hs (limited to 'TCPProber.hs') diff --git a/TCPProber.hs b/TCPProber.hs new file mode 100644 index 00000000..8d468e53 --- /dev/null +++ b/TCPProber.hs @@ -0,0 +1,162 @@ +{-# LANGUAGE LambdaCase #-} +module TCPProber where + +import Control.Concurrent +import GHC.Conc + +import Control.Arrow +import Control.Concurrent.STM +import Control.Monad +import Data.Function +import Data.IP +import Data.Maybe +import Data.Time.Clock.POSIX +import Network.Socket +import System.Timeout + +import Crypto.Tox +import Data.Wrapper.PSQ as PSQ +import Network.Kademlia.Search +import Network.Tox.NodeId +import qualified Network.Tox.TCP as TCP + +resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber) +resolvePort tcp ni = do + got <- newTVarIO Nothing + cnt <- newTVarIO 0 + let n port = TCP.NodeInfo ni port + forkPort port = do + atomically $ modifyTVar' cnt succ + t <- forkIO $ do + m <- TCP.tcpPing tcp $ n port + atomically $ do + mg <- readTVar got + when (isNothing mg) + $ forM_ m $ \_ -> writeTVar got $ Just port + modifyTVar' cnt pred + labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni) + return t + readResult = atomically $ do + m <- readTVar got + case m of + Just v -> return $ Just v + Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing + t443 <- forkPort 443 + t80 <- forkPort 80 + p <- timeout 1000000 readResult >>= \case + Just (Just p) -> do + killThread t443 + killThread t80 + return $ Just p + _ -> do + let uport = nodePort ni + tudp <- forM (guard $ uport `notElem` [443,80,3389,33445]) + $ \() -> forkPort uport + t3389 <- forkPort 3389 + t33445 <- forkPort 33445 + p <- readResult + mapM_ killThread [t443,t80,t3389,t33445] + mapM_ killThread (tudp :: Maybe ThreadId) + return p + return p + +data TCPProber = TCPProber + { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) + , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) + , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count. + , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-})) + , probeCacheCount :: TVar Int + } + +newProber :: STM TCPProber +newProber = do + q <- newTVar PSQ.empty + spill <- newTVar PSQ.empty + spillcnt <- newTVar 0 + cache <- newTVar PSQ.empty + cachecnt <- newTVar 0 + return TCPProber + { probeQueue = q + , probeSpill = spill + , probeSpillCount = spillcnt + , probeCache = cache + , probeCacheCount = cachecnt + } + + +enqueueProbe :: TCPProber -> NodeInfo -> IO () +enqueueProbe prober ni = do + tm <- getPOSIXTime + atomically $ do + spill <- readTVar (probeSpill prober) + cache <- readTVar (probeCache prober) + q <- readTVar (probeQueue prober) + let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm + case PSQ.lookup (nodeId ni) cache of + Just (tm, x) -> bump (probeCache prober) x + Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni) + | member (nodeId ni) q -> return () + | otherwise -> bump (probeQueue prober) (nodeAddr ni) + +maxSpill :: Int +maxSpill = 100 + +maxCache :: Int +maxCache = 50 + +runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO () +runProbeQueue prober client maxjobs = do + jcnt <- newTVarIO 0 + fix $ \loop -> do + (tm, mni) <- atomically $ do + j <- readTVar jcnt + check (j < maxjobs) + q <- readTVar $ probeQueue prober + case minView q of + Nothing -> retry + Just (Binding nid saddr tm,q') -> do + writeTVar (probeQueue prober) q' + return (tm, nodeInfo nid saddr) + forM_ mni $ \ni -> do + atomically $ modifyTVar' jcnt succ + t <- forkIO $ do + m <- resolvePort client ni + atomically $ case m of + Nothing -> do + pcnt <- readTVar (probeSpillCount prober) + modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm + if (pcnt == maxSpill) + then modifyTVar' (probeSpill prober) deleteMin + else modifyTVar' (probeSpillCount prober) succ + Just p -> do + ccnt <- readTVar (probeCacheCount prober) + modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm + if (ccnt == maxCache) + then modifyTVar' (probeCache prober) deleteMin + else modifyTVar' (probeCacheCount prober) succ + atomically $ modifyTVar' jcnt pred + labelThread t ("probe."++show ni) + loop + +getNodes :: TCPProber -> TCP.TCPClient err () Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Maybe ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ())) +getNodes prober tcp seeking dst = do + r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst) + let tcps (ns,_,mb) = (ns',ns',mb) + where ns' = do + n <- ns + [ TCP.NodeInfo n 0 ] + fmap join $ forM r $ \(ns,gw) -> do + let ts = tcps ns + if TCP.nodeId gw == TCP.nodeId dst + then return $ Just ts + else do + enqueueProbe prober (TCP.udpNodeInfo dst) + return $ Just ts + return $ Just ts + +nodeSearch :: TCPProber -> TCP.TCPClient err () Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo +nodeSearch prober tcp = Search + { searchSpace = TCP.tcpSpace + , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort + , searchQuery = getNodes prober tcp + } -- cgit v1.2.3