diff options
author | Joe Crayne <joe@jerkface.net> | 2018-12-12 02:01:39 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-12-16 14:08:26 -0500 |
commit | d96234b9954fb2e41521eb437edf2fee7317f7d6 (patch) | |
tree | 7a97ad1a7b4d01f1c9d74dcb93fae29d075bb76c /TCPProber.hs | |
parent | 1819d80705986d36c3264f60d05a5383c73bc33f (diff) |
TCP Prober.
Diffstat (limited to 'TCPProber.hs')
-rw-r--r-- | TCPProber.hs | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/TCPProber.hs b/TCPProber.hs new file mode 100644 index 00000000..8d468e53 --- /dev/null +++ b/TCPProber.hs | |||
@@ -0,0 +1,162 @@ | |||
1 | {-# LANGUAGE LambdaCase #-} | ||
2 | module TCPProber where | ||
3 | |||
4 | import Control.Concurrent | ||
5 | import GHC.Conc | ||
6 | |||
7 | import Control.Arrow | ||
8 | import Control.Concurrent.STM | ||
9 | import Control.Monad | ||
10 | import Data.Function | ||
11 | import Data.IP | ||
12 | import Data.Maybe | ||
13 | import Data.Time.Clock.POSIX | ||
14 | import Network.Socket | ||
15 | import System.Timeout | ||
16 | |||
17 | import Crypto.Tox | ||
18 | import Data.Wrapper.PSQ as PSQ | ||
19 | import Network.Kademlia.Search | ||
20 | import Network.Tox.NodeId | ||
21 | import qualified Network.Tox.TCP as TCP | ||
22 | |||
23 | resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber) | ||
24 | resolvePort tcp ni = do | ||
25 | got <- newTVarIO Nothing | ||
26 | cnt <- newTVarIO 0 | ||
27 | let n port = TCP.NodeInfo ni port | ||
28 | forkPort port = do | ||
29 | atomically $ modifyTVar' cnt succ | ||
30 | t <- forkIO $ do | ||
31 | m <- TCP.tcpPing tcp $ n port | ||
32 | atomically $ do | ||
33 | mg <- readTVar got | ||
34 | when (isNothing mg) | ||
35 | $ forM_ m $ \_ -> writeTVar got $ Just port | ||
36 | modifyTVar' cnt pred | ||
37 | labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni) | ||
38 | return t | ||
39 | readResult = atomically $ do | ||
40 | m <- readTVar got | ||
41 | case m of | ||
42 | Just v -> return $ Just v | ||
43 | Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing | ||
44 | t443 <- forkPort 443 | ||
45 | t80 <- forkPort 80 | ||
46 | p <- timeout 1000000 readResult >>= \case | ||
47 | Just (Just p) -> do | ||
48 | killThread t443 | ||
49 | killThread t80 | ||
50 | return $ Just p | ||
51 | _ -> do | ||
52 | let uport = nodePort ni | ||
53 | tudp <- forM (guard $ uport `notElem` [443,80,3389,33445]) | ||
54 | $ \() -> forkPort uport | ||
55 | t3389 <- forkPort 3389 | ||
56 | t33445 <- forkPort 33445 | ||
57 | p <- readResult | ||
58 | mapM_ killThread [t443,t80,t3389,t33445] | ||
59 | mapM_ killThread (tudp :: Maybe ThreadId) | ||
60 | return p | ||
61 | return p | ||
62 | |||
63 | data TCPProber = TCPProber | ||
64 | { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) | ||
65 | , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) | ||
66 | , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count. | ||
67 | , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-})) | ||
68 | , probeCacheCount :: TVar Int | ||
69 | } | ||
70 | |||
71 | newProber :: STM TCPProber | ||
72 | newProber = do | ||
73 | q <- newTVar PSQ.empty | ||
74 | spill <- newTVar PSQ.empty | ||
75 | spillcnt <- newTVar 0 | ||
76 | cache <- newTVar PSQ.empty | ||
77 | cachecnt <- newTVar 0 | ||
78 | return TCPProber | ||
79 | { probeQueue = q | ||
80 | , probeSpill = spill | ||
81 | , probeSpillCount = spillcnt | ||
82 | , probeCache = cache | ||
83 | , probeCacheCount = cachecnt | ||
84 | } | ||
85 | |||
86 | |||
87 | enqueueProbe :: TCPProber -> NodeInfo -> IO () | ||
88 | enqueueProbe prober ni = do | ||
89 | tm <- getPOSIXTime | ||
90 | atomically $ do | ||
91 | spill <- readTVar (probeSpill prober) | ||
92 | cache <- readTVar (probeCache prober) | ||
93 | q <- readTVar (probeQueue prober) | ||
94 | let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm | ||
95 | case PSQ.lookup (nodeId ni) cache of | ||
96 | Just (tm, x) -> bump (probeCache prober) x | ||
97 | Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni) | ||
98 | | member (nodeId ni) q -> return () | ||
99 | | otherwise -> bump (probeQueue prober) (nodeAddr ni) | ||
100 | |||
101 | maxSpill :: Int | ||
102 | maxSpill = 100 | ||
103 | |||
104 | maxCache :: Int | ||
105 | maxCache = 50 | ||
106 | |||
107 | runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO () | ||
108 | runProbeQueue prober client maxjobs = do | ||
109 | jcnt <- newTVarIO 0 | ||
110 | fix $ \loop -> do | ||
111 | (tm, mni) <- atomically $ do | ||
112 | j <- readTVar jcnt | ||
113 | check (j < maxjobs) | ||
114 | q <- readTVar $ probeQueue prober | ||
115 | case minView q of | ||
116 | Nothing -> retry | ||
117 | Just (Binding nid saddr tm,q') -> do | ||
118 | writeTVar (probeQueue prober) q' | ||
119 | return (tm, nodeInfo nid saddr) | ||
120 | forM_ mni $ \ni -> do | ||
121 | atomically $ modifyTVar' jcnt succ | ||
122 | t <- forkIO $ do | ||
123 | m <- resolvePort client ni | ||
124 | atomically $ case m of | ||
125 | Nothing -> do | ||
126 | pcnt <- readTVar (probeSpillCount prober) | ||
127 | modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm | ||
128 | if (pcnt == maxSpill) | ||
129 | then modifyTVar' (probeSpill prober) deleteMin | ||
130 | else modifyTVar' (probeSpillCount prober) succ | ||
131 | Just p -> do | ||
132 | ccnt <- readTVar (probeCacheCount prober) | ||
133 | modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm | ||
134 | if (ccnt == maxCache) | ||
135 | then modifyTVar' (probeCache prober) deleteMin | ||
136 | else modifyTVar' (probeCacheCount prober) succ | ||
137 | atomically $ modifyTVar' jcnt pred | ||
138 | labelThread t ("probe."++show ni) | ||
139 | loop | ||
140 | |||
141 | getNodes :: TCPProber -> TCP.TCPClient err () Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Maybe ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ())) | ||
142 | getNodes prober tcp seeking dst = do | ||
143 | r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst) | ||
144 | let tcps (ns,_,mb) = (ns',ns',mb) | ||
145 | where ns' = do | ||
146 | n <- ns | ||
147 | [ TCP.NodeInfo n 0 ] | ||
148 | fmap join $ forM r $ \(ns,gw) -> do | ||
149 | let ts = tcps ns | ||
150 | if TCP.nodeId gw == TCP.nodeId dst | ||
151 | then return $ Just ts | ||
152 | else do | ||
153 | enqueueProbe prober (TCP.udpNodeInfo dst) | ||
154 | return $ Just ts | ||
155 | return $ Just ts | ||
156 | |||
157 | nodeSearch :: TCPProber -> TCP.TCPClient err () Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo | ||
158 | nodeSearch prober tcp = Search | ||
159 | { searchSpace = TCP.tcpSpace | ||
160 | , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort | ||
161 | , searchQuery = getNodes prober tcp | ||
162 | } | ||