summaryrefslogtreecommitdiff
path: root/dht/TCPProber.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/TCPProber.hs
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/TCPProber.hs')
-rw-r--r--dht/TCPProber.hs173
1 files changed, 173 insertions, 0 deletions
diff --git a/dht/TCPProber.hs b/dht/TCPProber.hs
new file mode 100644
index 00000000..5e011116
--- /dev/null
+++ b/dht/TCPProber.hs
@@ -0,0 +1,173 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE LambdaCase #-}
3module TCPProber where
4
5#ifdef THREAD_DEBUG
6import Control.Concurrent.Lifted.Instrument
7#else
8import Control.Concurrent
9import GHC.Conc
10#endif
11
12import Control.Arrow
13import Control.Concurrent.STM
14import Control.Monad
15import Data.Function
16import Data.IP
17import Data.Maybe
18import Data.Time.Clock.POSIX
19import Network.Socket
20import System.Timeout
21
22import DPut
23import DebugTag
24import Crypto.Tox
25import Data.Wrapper.PSQ as PSQ
26import Network.Kademlia.Search
27import Network.Tox.NodeId
28import qualified Network.Tox.TCP as TCP
29
30resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber)
31resolvePort tcp ni = do
32 got <- newTVarIO Nothing
33 cnt <- newTVarIO 0
34 let n port = TCP.NodeInfo ni port
35 forkPort port = do
36 atomically $ modifyTVar' cnt succ
37 t <- forkIO $ do
38 m <- TCP.tcpPing tcp $ n port
39 atomically $ do
40 mg <- readTVar got
41 when (isNothing mg)
42 $ forM_ m $ \_ -> writeTVar got $ Just port
43 modifyTVar' cnt pred
44 labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni)
45 return t
46 readResult = atomically $ do
47 m <- readTVar got
48 case m of
49 Just v -> return $ Just v
50 Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing
51 t443 <- forkPort 443
52 t80 <- forkPort 80
53 p <- timeout 1000000 readResult >>= \case
54 Just (Just p) -> do
55 killThread t443
56 killThread t80
57 return $ Just p
58 _ -> do
59 let uport = nodePort ni
60 tudp <- forM (guard $ uport `notElem` [443,80,3389,33445])
61 $ \() -> forkPort uport
62 t3389 <- forkPort 3389
63 t33445 <- forkPort 33445
64 p <- readResult
65 mapM_ killThread [t443,t80,t3389,t33445]
66 mapM_ killThread (tudp :: Maybe ThreadId)
67 return p
68 return p
69
70data TCPProber = TCPProber
71 { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-})
72 , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-})
73 , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count.
74 , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-}))
75 , probeCacheCount :: TVar Int
76 }
77
78newProber :: STM TCPProber
79newProber = do
80 q <- newTVar PSQ.empty
81 spill <- newTVar PSQ.empty
82 spillcnt <- newTVar 0
83 cache <- newTVar PSQ.empty
84 cachecnt <- newTVar 0
85 return TCPProber
86 { probeQueue = q
87 , probeSpill = spill
88 , probeSpillCount = spillcnt
89 , probeCache = cache
90 , probeCacheCount = cachecnt
91 }
92
93
94enqueueProbe :: TCPProber -> NodeInfo -> IO ()
95enqueueProbe prober ni = do
96 tm <- getPOSIXTime
97 atomically $ do
98 spill <- readTVar (probeSpill prober)
99 cache <- readTVar (probeCache prober)
100 q <- readTVar (probeQueue prober)
101 let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm
102 case PSQ.lookup (nodeId ni) cache of
103 Just (tm, x) -> bump (probeCache prober) x
104 Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni)
105 | member (nodeId ni) q -> return ()
106 | otherwise -> bump (probeQueue prober) (nodeAddr ni)
107
108maxSpill :: Int
109maxSpill = 100
110
111maxCache :: Int
112maxCache = 50
113
114runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO ()
115runProbeQueue prober client maxjobs = do
116 jcnt <- newTVarIO 0
117 fix $ \loop -> do
118 (tm, mni) <- atomically $ do
119 j <- readTVar jcnt
120 check (j < maxjobs)
121 q <- readTVar $ probeQueue prober
122 case minView q of
123 Nothing -> retry
124 Just (Binding nid saddr tm,q') -> do
125 writeTVar (probeQueue prober) q'
126 return (tm, nodeInfo nid saddr)
127 forM_ mni $ \ni -> do
128 atomically $ modifyTVar' jcnt succ
129 t <- forkIO $ do
130 m <- resolvePort client ni
131 atomically $ case m of
132 Nothing -> do
133 pcnt <- readTVar (probeSpillCount prober)
134 modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm
135 if (pcnt == maxSpill)
136 then modifyTVar' (probeSpill prober) deleteMin
137 else modifyTVar' (probeSpillCount prober) succ
138 Just p -> do
139 ccnt <- readTVar (probeCacheCount prober)
140 modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm
141 if (ccnt == maxCache)
142 then modifyTVar' (probeCache prober) deleteMin
143 else modifyTVar' (probeCacheCount prober) succ
144 atomically $ modifyTVar' jcnt pred
145 labelThread t ("probe."++show ni)
146 loop
147
148
149getNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Maybe ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()))
150getNodes prober tcp seeking dst = do
151 r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst)
152 dput XTCP $ "Got via TCP nodes: " ++ show r
153 let tcps (ns,_,mb) = (ns',ns',mb)
154 where ns' = do
155 n <- ns
156 [ TCP.NodeInfo n 0 ]
157 fmap join $ forM r $ \(ns,gw) -> do
158 let ts = tcps ns
159 if TCP.nodeId gw == TCP.nodeId dst
160 then return $ Just ts
161 else do
162 enqueueProbe prober (TCP.udpNodeInfo dst)
163 return $ Just ts
164 return $ Just ts
165
166nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo
167nodeSearch prober tcp = Search
168 { searchSpace = TCP.tcpSpace
169 , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort
170 , searchQuery = Left $ getNodes prober tcp
171 , searchAlpha = 8
172 , searchK = 16
173 }