{-# LANGUAGE CPP #-} {-# LANGUAGE LambdaCase #-} module TCPProber where #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent import GHC.Conc #endif import Control.Arrow import Control.Concurrent.STM import Control.Monad import Data.Function import Data.IP import Data.Maybe import Data.Time.Clock.POSIX import Network.Socket import System.Timeout import DPut import DebugTag import Crypto.Tox import Data.Wrapper.PSQ as PSQ import Network.Kademlia.Search import Network.Tox.NodeId import qualified Network.Tox.TCP as TCP import Network.QueryResponse as QR -- Probe TCP ports in a staggered fashion to up the odds of discovering -- a higher priority port like 443. resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber) resolvePort tcp ni = do got <- newTVarIO Nothing cnt <- newTVarIO 0 let n port = TCP.NodeInfo ni port forkPort port = do atomically $ modifyTVar' cnt succ t <- forkIO $ do dput XTCP $ "TCP-probe pinging " ++ show (n port) m <- TCP.tcpPing tcp $ n port atomically $ do mg <- readTVar got when (isNothing mg) $ forM_ m $ \_ -> writeTVar got $ Just port modifyTVar' cnt pred labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni) return t readResult = atomically $ do m <- readTVar got case m of Just v -> return $ Just v Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing t443 <- forkPort 443 t80 <- forkPort 80 p <- timeout 500000 readResult >>= \case Just (Just p) -> do killThread t443 killThread t80 return $ Just p _ -> do t3389 <- forkPort 3389 timeout 500000 readResult >>= \case Just (Just p) -> do killThread t3389 killThread t443 killThread t80 return $ Just p _ -> do let uport = nodePort ni tudp <- forM (guard $ uport `notElem` [443,80,3389,33445]) $ \() -> forkPort uport t33445 <- forkPort 33445 p <- readResult mapM_ killThread [t443,t80,t3389,t33445] mapM_ killThread (tudp :: Maybe ThreadId) return p return p data TCPProber = TCPProber { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count. , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-})) , probeCacheCount :: TVar Int } newProber :: STM TCPProber newProber = do q <- newTVar PSQ.empty spill <- newTVar PSQ.empty spillcnt <- newTVar 0 cache <- newTVar PSQ.empty cachecnt <- newTVar 0 return TCPProber { probeQueue = q , probeSpill = spill , probeSpillCount = spillcnt , probeCache = cache , probeCacheCount = cachecnt } enqueueProbe :: TCPProber -> NodeInfo -> IO () enqueueProbe prober ni = do tm <- getPOSIXTime atomically $ do spill <- readTVar (probeSpill prober) cache <- readTVar (probeCache prober) q <- readTVar (probeQueue prober) let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm case PSQ.lookup (nodeId ni) cache of Just (tm, x) -> bump (probeCache prober) x Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni) | member (nodeId ni) q -> return () | otherwise -> bump (probeQueue prober) (nodeAddr ni) maxSpill :: Int maxSpill = 100 maxCache :: Int maxCache = 50 runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO () runProbeQueue prober client maxjobs = do jcnt <- newTVarIO 0 fix $ \loop -> do (tm, mni) <- atomically $ do j <- readTVar jcnt check (j < maxjobs) q <- readTVar $ probeQueue prober case minView q of Nothing -> retry Just (Binding nid saddr tm,q') -> do writeTVar (probeQueue prober) q' return (tm, nodeInfo nid saddr) forM_ mni $ \ni -> do atomically $ modifyTVar' jcnt succ t <- forkIO $ do m <- resolvePort client ni atomically $ case m of Nothing -> do pcnt <- readTVar (probeSpillCount prober) modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm if (pcnt == maxSpill) then modifyTVar' (probeSpill prober) deleteMin else modifyTVar' (probeSpillCount prober) succ Just p -> do ccnt <- readTVar (probeCacheCount prober) modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm if (ccnt == maxCache) then modifyTVar' (probeCache prober) deleteMin else modifyTVar' (probeCacheCount prober) succ atomically $ modifyTVar' jcnt pred labelThread t ("probe."++show ni) loop getNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ())) getNodes prober tcp seeking dst = do r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst) dput XTCP $ "Got via TCP nodes: " ++ show r let tcps (ns,_,mb) = (ns',ns',mb) where ns' = do n <- ns [ TCP.NodeInfo n 0 ] case r of Success (ns,gw) -> do let ts = tcps ns if TCP.nodeId gw == TCP.nodeId dst then return $ Success ts else do enqueueProbe prober (TCP.udpNodeInfo dst) return $ Success ts return $ Success ts _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r asyncGetNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> (Nonce8 -> Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()) -> IO ()) -> IO Nonce8 asyncGetNodes prober tcp seeking dst withResponse = do TCP.asyncUDPNodes tcp seeking (TCP.udpNodeInfo dst) $ \qid r -> do dput XTCP $ "Got via TCP nodes: " ++ show r let tcps (ns,_,mb) = (ns',ns',mb) where ns' = do n <- ns [ TCP.NodeInfo n 0 ] r' <- case r of Success (ns,gw) -> do let ts = tcps ns if TCP.nodeId gw == TCP.nodeId dst then return $ Success ts else do enqueueProbe prober (TCP.udpNodeInfo dst) return $ Success ts return $ Success ts _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r withResponse qid r' nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo Nonce8 nodeSearch prober tcp = Search { searchSpace = TCP.tcpSpace , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort , searchQuery = asyncGetNodes prober tcp , searchQueryCancel = cancelQuery (TCP.tcpClient tcp) , searchAlpha = 8 , searchK = 16 }