summaryrefslogtreecommitdiff
path: root/TCPProber.hs
diff options
context:
space:
mode:
Diffstat (limited to 'TCPProber.hs')
-rw-r--r--TCPProber.hs162
1 files changed, 162 insertions, 0 deletions
diff --git a/TCPProber.hs b/TCPProber.hs
new file mode 100644
index 00000000..8d468e53
--- /dev/null
+++ b/TCPProber.hs
@@ -0,0 +1,162 @@
1{-# LANGUAGE LambdaCase #-}
2module TCPProber where
3
4import Control.Concurrent
5import GHC.Conc
6
7import Control.Arrow
8import Control.Concurrent.STM
9import Control.Monad
10import Data.Function
11import Data.IP
12import Data.Maybe
13import Data.Time.Clock.POSIX
14import Network.Socket
15import System.Timeout
16
17import Crypto.Tox
18import Data.Wrapper.PSQ as PSQ
19import Network.Kademlia.Search
20import Network.Tox.NodeId
21import qualified Network.Tox.TCP as TCP
22
23resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber)
24resolvePort tcp ni = do
25 got <- newTVarIO Nothing
26 cnt <- newTVarIO 0
27 let n port = TCP.NodeInfo ni port
28 forkPort port = do
29 atomically $ modifyTVar' cnt succ
30 t <- forkIO $ do
31 m <- TCP.tcpPing tcp $ n port
32 atomically $ do
33 mg <- readTVar got
34 when (isNothing mg)
35 $ forM_ m $ \_ -> writeTVar got $ Just port
36 modifyTVar' cnt pred
37 labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni)
38 return t
39 readResult = atomically $ do
40 m <- readTVar got
41 case m of
42 Just v -> return $ Just v
43 Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing
44 t443 <- forkPort 443
45 t80 <- forkPort 80
46 p <- timeout 1000000 readResult >>= \case
47 Just (Just p) -> do
48 killThread t443
49 killThread t80
50 return $ Just p
51 _ -> do
52 let uport = nodePort ni
53 tudp <- forM (guard $ uport `notElem` [443,80,3389,33445])
54 $ \() -> forkPort uport
55 t3389 <- forkPort 3389
56 t33445 <- forkPort 33445
57 p <- readResult
58 mapM_ killThread [t443,t80,t3389,t33445]
59 mapM_ killThread (tudp :: Maybe ThreadId)
60 return p
61 return p
62
63data TCPProber = TCPProber
64 { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-})
65 , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-})
66 , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count.
67 , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-}))
68 , probeCacheCount :: TVar Int
69 }
70
71newProber :: STM TCPProber
72newProber = do
73 q <- newTVar PSQ.empty
74 spill <- newTVar PSQ.empty
75 spillcnt <- newTVar 0
76 cache <- newTVar PSQ.empty
77 cachecnt <- newTVar 0
78 return TCPProber
79 { probeQueue = q
80 , probeSpill = spill
81 , probeSpillCount = spillcnt
82 , probeCache = cache
83 , probeCacheCount = cachecnt
84 }
85
86
87enqueueProbe :: TCPProber -> NodeInfo -> IO ()
88enqueueProbe prober ni = do
89 tm <- getPOSIXTime
90 atomically $ do
91 spill <- readTVar (probeSpill prober)
92 cache <- readTVar (probeCache prober)
93 q <- readTVar (probeQueue prober)
94 let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm
95 case PSQ.lookup (nodeId ni) cache of
96 Just (tm, x) -> bump (probeCache prober) x
97 Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni)
98 | member (nodeId ni) q -> return ()
99 | otherwise -> bump (probeQueue prober) (nodeAddr ni)
100
101maxSpill :: Int
102maxSpill = 100
103
104maxCache :: Int
105maxCache = 50
106
107runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO ()
108runProbeQueue prober client maxjobs = do
109 jcnt <- newTVarIO 0
110 fix $ \loop -> do
111 (tm, mni) <- atomically $ do
112 j <- readTVar jcnt
113 check (j < maxjobs)
114 q <- readTVar $ probeQueue prober
115 case minView q of
116 Nothing -> retry
117 Just (Binding nid saddr tm,q') -> do
118 writeTVar (probeQueue prober) q'
119 return (tm, nodeInfo nid saddr)
120 forM_ mni $ \ni -> do
121 atomically $ modifyTVar' jcnt succ
122 t <- forkIO $ do
123 m <- resolvePort client ni
124 atomically $ case m of
125 Nothing -> do
126 pcnt <- readTVar (probeSpillCount prober)
127 modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm
128 if (pcnt == maxSpill)
129 then modifyTVar' (probeSpill prober) deleteMin
130 else modifyTVar' (probeSpillCount prober) succ
131 Just p -> do
132 ccnt <- readTVar (probeCacheCount prober)
133 modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm
134 if (ccnt == maxCache)
135 then modifyTVar' (probeCache prober) deleteMin
136 else modifyTVar' (probeCacheCount prober) succ
137 atomically $ modifyTVar' jcnt pred
138 labelThread t ("probe."++show ni)
139 loop
140
141getNodes :: TCPProber -> TCP.TCPClient err () Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Maybe ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()))
142getNodes prober tcp seeking dst = do
143 r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst)
144 let tcps (ns,_,mb) = (ns',ns',mb)
145 where ns' = do
146 n <- ns
147 [ TCP.NodeInfo n 0 ]
148 fmap join $ forM r $ \(ns,gw) -> do
149 let ts = tcps ns
150 if TCP.nodeId gw == TCP.nodeId dst
151 then return $ Just ts
152 else do
153 enqueueProbe prober (TCP.udpNodeInfo dst)
154 return $ Just ts
155 return $ Just ts
156
157nodeSearch :: TCPProber -> TCP.TCPClient err () Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo
158nodeSearch prober tcp = Search
159 { searchSpace = TCP.tcpSpace
160 , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort
161 , searchQuery = getNodes prober tcp
162 }