{-# LANGUAGE CPP #-} {-# LANGUAGE LambdaCase #-} module TCPProber where #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent import GHC.Conc #endif import Control.Arrow import Control.Concurrent.STM import Control.Monad import Data.Function import Data.IP import Data.Maybe import Data.Time.Clock.POSIX import Network.Socket import System.Timeout import DPut import DebugTag import Crypto.Tox import Data.Wrapper.PSQ as PSQ import Network.Kademlia.Search import Network.Tox.NodeId import qualified Network.Tox.TCP as TCP resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber) resolvePort tcp ni = do got <- newTVarIO Nothing cnt <- newTVarIO 0 let n port = TCP.NodeInfo ni port forkPort port = do atomically $ modifyTVar' cnt succ t <- forkIO $ do m <- TCP.tcpPing tcp $ n port atomically $ do mg <- readTVar got when (isNothing mg) $ forM_ m $ \_ -> writeTVar got $ Just port modifyTVar' cnt pred labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni) return t readResult = atomically $ do m <- readTVar got case m of Just v -> return $ Just v Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing t443 <- forkPort 443 t80 <- forkPort 80 p <- timeout 1000000 readResult >>= \case Just (Just p) -> do killThread t443 killThread t80 return $ Just p _ -> do let uport = nodePort ni tudp <- forM (guard $ uport `notElem` [443,80,3389,33445]) $ \() -> forkPort uport t3389 <- forkPort 3389 t33445 <- forkPort 33445 p <- readResult mapM_ killThread [t443,t80,t3389,t33445] mapM_ killThread (tudp :: Maybe ThreadId) return p return p data TCPProber = TCPProber { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count. , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-})) , probeCacheCount :: TVar Int } newProber :: STM TCPProber newProber = do q <- newTVar PSQ.empty spill <- newTVar PSQ.empty spillcnt <- newTVar 0 cache <- newTVar PSQ.empty cachecnt <- newTVar 0 return TCPProber { probeQueue = q , probeSpill = spill , probeSpillCount = spillcnt , probeCache = cache , probeCacheCount = cachecnt } enqueueProbe :: TCPProber -> NodeInfo -> IO () enqueueProbe prober ni = do tm <- getPOSIXTime atomically $ do spill <- readTVar (probeSpill prober) cache <- readTVar (probeCache prober) q <- readTVar (probeQueue prober) let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm case PSQ.lookup (nodeId ni) cache of Just (tm, x) -> bump (probeCache prober) x Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni) | member (nodeId ni) q -> return () | otherwise -> bump (probeQueue prober) (nodeAddr ni) maxSpill :: Int maxSpill = 100 maxCache :: Int maxCache = 50 runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO () runProbeQueue prober client maxjobs = do jcnt <- newTVarIO 0 fix $ \loop -> do (tm, mni) <- atomically $ do j <- readTVar jcnt check (j < maxjobs) q <- readTVar $ probeQueue prober case minView q of Nothing -> retry Just (Binding nid saddr tm,q') -> do writeTVar (probeQueue prober) q' return (tm, nodeInfo nid saddr) forM_ mni $ \ni -> do atomically $ modifyTVar' jcnt succ t <- forkIO $ do m <- resolvePort client ni atomically $ case m of Nothing -> do pcnt <- readTVar (probeSpillCount prober) modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm if (pcnt == maxSpill) then modifyTVar' (probeSpill prober) deleteMin else modifyTVar' (probeSpillCount prober) succ Just p -> do ccnt <- readTVar (probeCacheCount prober) modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm if (ccnt == maxCache) then modifyTVar' (probeCache prober) deleteMin else modifyTVar' (probeCacheCount prober) succ atomically $ modifyTVar' jcnt pred labelThread t ("probe."++show ni) loop getNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Maybe ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ())) getNodes prober tcp seeking dst = do r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst) dput XTCP $ "Got via TCP nodes: " ++ show r let tcps (ns,_,mb) = (ns',ns',mb) where ns' = do n <- ns [ TCP.NodeInfo n 0 ] fmap join $ forM r $ \(ns,gw) -> do let ts = tcps ns if TCP.nodeId gw == TCP.nodeId dst then return $ Just ts else do enqueueProbe prober (TCP.udpNodeInfo dst) return $ Just ts return $ Just ts nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo nodeSearch prober tcp = Search { searchSpace = TCP.tcpSpace , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort , searchQuery = Left $ getNodes prober tcp , searchAlpha = 8 , searchK = 16 }