diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /dht/TCPProber.hs | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'dht/TCPProber.hs')
-rw-r--r-- | dht/TCPProber.hs | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/dht/TCPProber.hs b/dht/TCPProber.hs new file mode 100644 index 00000000..5e011116 --- /dev/null +++ b/dht/TCPProber.hs | |||
@@ -0,0 +1,173 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE LambdaCase #-} | ||
3 | module TCPProber where | ||
4 | |||
5 | #ifdef THREAD_DEBUG | ||
6 | import Control.Concurrent.Lifted.Instrument | ||
7 | #else | ||
8 | import Control.Concurrent | ||
9 | import GHC.Conc | ||
10 | #endif | ||
11 | |||
12 | import Control.Arrow | ||
13 | import Control.Concurrent.STM | ||
14 | import Control.Monad | ||
15 | import Data.Function | ||
16 | import Data.IP | ||
17 | import Data.Maybe | ||
18 | import Data.Time.Clock.POSIX | ||
19 | import Network.Socket | ||
20 | import System.Timeout | ||
21 | |||
22 | import DPut | ||
23 | import DebugTag | ||
24 | import Crypto.Tox | ||
25 | import Data.Wrapper.PSQ as PSQ | ||
26 | import Network.Kademlia.Search | ||
27 | import Network.Tox.NodeId | ||
28 | import qualified Network.Tox.TCP as TCP | ||
29 | |||
30 | resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber) | ||
31 | resolvePort tcp ni = do | ||
32 | got <- newTVarIO Nothing | ||
33 | cnt <- newTVarIO 0 | ||
34 | let n port = TCP.NodeInfo ni port | ||
35 | forkPort port = do | ||
36 | atomically $ modifyTVar' cnt succ | ||
37 | t <- forkIO $ do | ||
38 | m <- TCP.tcpPing tcp $ n port | ||
39 | atomically $ do | ||
40 | mg <- readTVar got | ||
41 | when (isNothing mg) | ||
42 | $ forM_ m $ \_ -> writeTVar got $ Just port | ||
43 | modifyTVar' cnt pred | ||
44 | labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni) | ||
45 | return t | ||
46 | readResult = atomically $ do | ||
47 | m <- readTVar got | ||
48 | case m of | ||
49 | Just v -> return $ Just v | ||
50 | Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing | ||
51 | t443 <- forkPort 443 | ||
52 | t80 <- forkPort 80 | ||
53 | p <- timeout 1000000 readResult >>= \case | ||
54 | Just (Just p) -> do | ||
55 | killThread t443 | ||
56 | killThread t80 | ||
57 | return $ Just p | ||
58 | _ -> do | ||
59 | let uport = nodePort ni | ||
60 | tudp <- forM (guard $ uport `notElem` [443,80,3389,33445]) | ||
61 | $ \() -> forkPort uport | ||
62 | t3389 <- forkPort 3389 | ||
63 | t33445 <- forkPort 33445 | ||
64 | p <- readResult | ||
65 | mapM_ killThread [t443,t80,t3389,t33445] | ||
66 | mapM_ killThread (tudp :: Maybe ThreadId) | ||
67 | return p | ||
68 | return p | ||
69 | |||
70 | data TCPProber = TCPProber | ||
71 | { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) | ||
72 | , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-}) | ||
73 | , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count. | ||
74 | , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-})) | ||
75 | , probeCacheCount :: TVar Int | ||
76 | } | ||
77 | |||
78 | newProber :: STM TCPProber | ||
79 | newProber = do | ||
80 | q <- newTVar PSQ.empty | ||
81 | spill <- newTVar PSQ.empty | ||
82 | spillcnt <- newTVar 0 | ||
83 | cache <- newTVar PSQ.empty | ||
84 | cachecnt <- newTVar 0 | ||
85 | return TCPProber | ||
86 | { probeQueue = q | ||
87 | , probeSpill = spill | ||
88 | , probeSpillCount = spillcnt | ||
89 | , probeCache = cache | ||
90 | , probeCacheCount = cachecnt | ||
91 | } | ||
92 | |||
93 | |||
94 | enqueueProbe :: TCPProber -> NodeInfo -> IO () | ||
95 | enqueueProbe prober ni = do | ||
96 | tm <- getPOSIXTime | ||
97 | atomically $ do | ||
98 | spill <- readTVar (probeSpill prober) | ||
99 | cache <- readTVar (probeCache prober) | ||
100 | q <- readTVar (probeQueue prober) | ||
101 | let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm | ||
102 | case PSQ.lookup (nodeId ni) cache of | ||
103 | Just (tm, x) -> bump (probeCache prober) x | ||
104 | Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni) | ||
105 | | member (nodeId ni) q -> return () | ||
106 | | otherwise -> bump (probeQueue prober) (nodeAddr ni) | ||
107 | |||
108 | maxSpill :: Int | ||
109 | maxSpill = 100 | ||
110 | |||
111 | maxCache :: Int | ||
112 | maxCache = 50 | ||
113 | |||
114 | runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO () | ||
115 | runProbeQueue prober client maxjobs = do | ||
116 | jcnt <- newTVarIO 0 | ||
117 | fix $ \loop -> do | ||
118 | (tm, mni) <- atomically $ do | ||
119 | j <- readTVar jcnt | ||
120 | check (j < maxjobs) | ||
121 | q <- readTVar $ probeQueue prober | ||
122 | case minView q of | ||
123 | Nothing -> retry | ||
124 | Just (Binding nid saddr tm,q') -> do | ||
125 | writeTVar (probeQueue prober) q' | ||
126 | return (tm, nodeInfo nid saddr) | ||
127 | forM_ mni $ \ni -> do | ||
128 | atomically $ modifyTVar' jcnt succ | ||
129 | t <- forkIO $ do | ||
130 | m <- resolvePort client ni | ||
131 | atomically $ case m of | ||
132 | Nothing -> do | ||
133 | pcnt <- readTVar (probeSpillCount prober) | ||
134 | modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm | ||
135 | if (pcnt == maxSpill) | ||
136 | then modifyTVar' (probeSpill prober) deleteMin | ||
137 | else modifyTVar' (probeSpillCount prober) succ | ||
138 | Just p -> do | ||
139 | ccnt <- readTVar (probeCacheCount prober) | ||
140 | modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm | ||
141 | if (ccnt == maxCache) | ||
142 | then modifyTVar' (probeCache prober) deleteMin | ||
143 | else modifyTVar' (probeCacheCount prober) succ | ||
144 | atomically $ modifyTVar' jcnt pred | ||
145 | labelThread t ("probe."++show ni) | ||
146 | loop | ||
147 | |||
148 | |||
149 | getNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Maybe ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ())) | ||
150 | getNodes prober tcp seeking dst = do | ||
151 | r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst) | ||
152 | dput XTCP $ "Got via TCP nodes: " ++ show r | ||
153 | let tcps (ns,_,mb) = (ns',ns',mb) | ||
154 | where ns' = do | ||
155 | n <- ns | ||
156 | [ TCP.NodeInfo n 0 ] | ||
157 | fmap join $ forM r $ \(ns,gw) -> do | ||
158 | let ts = tcps ns | ||
159 | if TCP.nodeId gw == TCP.nodeId dst | ||
160 | then return $ Just ts | ||
161 | else do | ||
162 | enqueueProbe prober (TCP.udpNodeInfo dst) | ||
163 | return $ Just ts | ||
164 | return $ Just ts | ||
165 | |||
166 | nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo | ||
167 | nodeSearch prober tcp = Search | ||
168 | { searchSpace = TCP.tcpSpace | ||
169 | , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort | ||
170 | , searchQuery = Left $ getNodes prober tcp | ||
171 | , searchAlpha = 8 | ||
172 | , searchK = 16 | ||
173 | } | ||