author    joe <joe@jerkface.net>  2017-11-06 00:16:36 -0500
committer joe <joe@jerkface.net>  2017-11-06 00:16:36 -0500
commit    159f60689ab70413d963d8103b561dd587c448d6 (patch)
tree      d7d2a36839f7d30424731b0b7c4d46bd129a553a /src/Network
parent    e3efc5455185bc984b2e49f0d54c9bded1b4f269 (diff)
Factored out Network.Kademlia.Bootstrap.
Diffstat (limited to 'src/Network')
-rw-r--r--  src/Network/BitTorrent/MainlineDHT.hs |   1 +
-rw-r--r--  src/Network/Kademlia.hs               | 210 ----
-rw-r--r--  src/Network/Kademlia/Bootstrap.hs     | 255 ++++
-rw-r--r--  src/Network/Tox/DHT/Handlers.hs       |   1 +
4 files changed, 257 insertions(+), 210 deletions(-)
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs
index eb06564a..c7ef560c 100644
--- a/src/Network/BitTorrent/MainlineDHT.hs
+++ b/src/Network/BitTorrent/MainlineDHT.hs
@@ -54,6 +54,7 @@ import qualified Data.Wrapper.PSQInt as Int
 import Debug.Trace
 import Network.BitTorrent.MainlineDHT.Symbols
 import Network.Kademlia
+import Network.Kademlia.Bootstrap
 import Network.Address (Address, fromAddr, fromSockAddr,
                         setPort, sockAddrPort, testIdBit,
                         toSockAddr, genBucketSample', WantIP(..),
diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs
index f1ffda19..8956df2c 100644
--- a/src/Network/Kademlia.hs
+++ b/src/Network/Kademlia.hs
@@ -178,213 +178,3 @@ insertNode (Kademlia reporter space io) node = do
 
     return ()
 
-
--- TODO: Bootstrap/Refresh
---
--- From BEP 05:
---
--- Each bucket should maintain a "last changed" property to indicate how
--- "fresh" the contents are.
---
--- Note: We will use a "time to next refresh" property instead and store it in
--- a priority search queue.
---
--- When...
---
--- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
--- >>> bucketEvents =
--- >>>     [ Applicant :--> Stranger  -- a node in a bucket is pinged and it responds,
--- >>>
--- >>>     , Stranger  :--> Accepted  -- or a node is added to a bucket,
--- >>>
--- >>>     , Accepted  :--> Stranger  -- or a node in a bucket is replaced
--- >>>     , Applicant :--> Accepted  -- with another node,
--- >>>     ]
---
--- the bucket's last changed property should be updated.  Buckets
--- that have not been changed in 15 minutes should be "refreshed."  This is done
--- by picking a random ID in the range of the bucket and performing a
--- find_nodes search on it.
---
--- The only other possible BucketTouchEvents are as follows:
---
--- >>> not_handled =
--- >>>     [ Stranger  :--> Applicant  -- A ping is pending; its result is covered by:
--- >>>                                 --   (Applicant :--> Stranger)
--- >>>                                 --   (Applicant :--> Accepted)
--- >>>     , Accepted  :--> Applicant  -- Never happens
--- >>>     ]
---
-
--- XXX: This will be redundantly triggered twice upon every node replacement
--- because we do not currently distinguish between standalone
--- insertion/deletion events and an insertion/deletion pair constituting
--- replacement.
---
--- It might also be better to pass the timestamp of the transition here and
--- keep the refresh queue in better sync with the routing table by updating it
--- within the STM monad.
-touchBucket :: KademliaSpace nid ni
-            -> POSIXTime
-            -> TVar (BucketList ni)
-            -> TVar (Int.PSQ POSIXTime)
-            -> RoutingTransition ni
-            -> STM (IO ())
-touchBucket space interval bkts psq tr
-    | (transitionedTo tr == Applicant)
-        = return $ return ()
-    | otherwise = return $ do
-        now <- getPOSIXTime
-        atomically $ do
-            let nid = kademliaLocation space (transitioningNode tr)
-            num <- R.bucketNumber space nid <$> readTVar bkts
-            modifyTVar' psq $ Int.insert num (now + interval)
-
--- | > pollForRefresh interval queue refresh
---
--- Fork a refresh loop.  Kill the returned thread to terminate it.  The
--- arguments are: a staleness threshold (if a bucket goes this long without
--- being touched, a refresh will be triggered), a TVar with the time-to-refresh
--- schedule for each bucket, and a refresh action to be forked when a bucket
--- exceeds the staleness threshold.
---
--- To "touch" a bucket and prevent it from being refreshed, reschedule its
--- refresh time to some time into the future by modifying the 'Int.PSQ' in the
--- TVar.
-forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
-forkPollForRefresh interval psq refresh = do
-    fork $ do
-        myThreadId >>= flip labelThread "pollForRefresh"
-        fix $ \again -> do
-            join $ atomically $ do
-                nextup <- Int.findMin <$> readTVar psq
-                maybe retry (return . go again) nextup
- where
-    go again ( bktnum :-> refresh_time ) = do
-        now <- getPOSIXTime
-        case fromEnum (refresh_time - now) of
-            x | x <= 0 -> do -- Refresh time!
-                -- Move it to the back of the refresh queue.
-                atomically $ modifyTVar' psq
-                           $ Int.insert bktnum (now + interval)
-                -- Now fork the refresh operation.
-                -- TODO: We should probably propagate the kill signal to this thread.
-                fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
-                          _ <- refresh bktnum
-                          return ()
-                return ()
-            seconds -> threadDelay ( seconds * 1000000 )
-        again
-
-refreshBucket :: forall nid tok ni addr.
-                 ( Show nid
-                 , Serialize nid
-                 , Ord nid
-                 , Ord ni
-                 , Ord addr
-                 , Hashable nid
-                 , Hashable ni ) =>
-                 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
-refreshBucket sch var n = do
-    tbl <- atomically (readTVar var)
-    let count = bktCount tbl
-        nid   = kademliaLocation (searchSpace sch) (thisNode tbl)
-    sample <- if n+1 >= count  -- Is this the last bucket?
-                then return nid -- Yes? Search our own id.
-                else kademliaSample (searchSpace sch) -- No? Generate a random id.
-                                    getEntropy
-                                    nid
-                                    (bucketRange n (n + 1 < count))
-    fin <- atomically $ newTVar False
-    resultCounter <- atomically $ newTVar Set.empty
-    let fullcount = R.defaultBucketSize
-        saveit True = writeTVar fin True >> return True
-        saveit _    = return False
-        checkBucketFull :: ni -> STM Bool
-        checkBucketFull found_node = do
-            tbl <- readTVar var
-            let counts = R.shape tbl
-            when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
-                $ modifyTVar' resultCounter (Set.insert found_node)
-            resultCount <- readTVar resultCounter
-            saveit $ case drop (n - 1) counts of
-                        (cnt:_) | cnt < fullcount             -> True
-                        _ | Set.size resultCount < fullcount  -> True
-                        _                                     -> False
-
-    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
-
-    -- Set a 15 minute timeout in order to avoid overlapping refreshes.
-    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
-                                    then const $ return True -- Never short-circuit the last bucket.
-                                    else checkBucketFull
-    _ <- timeout (15*60*1000000) $ do
-            atomically $ searchIsFinished s >>= check
-    atomically $ searchCancel s
-    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
-    rcount <- atomically $ do
-        c <- Set.size <$> readTVar resultCounter
-        b <- readTVar fin
-        return $ if b then 1 else c
-    return rcount
-
-bootstrap ::
-    ( Show nid
-    , Serialize nid
-    -- , FiniteBits nid
-    , Hashable ni
-    , Hashable nid
-    , Ord ni
-    , Ord addr
-    , Ord nid
-    , Traversable t1
-    , Traversable t
-    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
-bootstrap sch var ping ns ns0 = do
-    gotPing <- atomically $ newTVar False
-
-    -- First, ping the given nodes so that they are added to
-    -- our routing table.
-    withTaskGroup "bootstrap.resume" 20 $ \g -> do
-        forM_ ns $ \n -> do
-            let lbl = show $ kademliaLocation (searchSpace sch) n
-            forkTask g lbl $ do
-                b <- ping n
-                when b $ atomically $ writeTVar gotPing True
-
-    -- We resort to the hardcoded fallback nodes only when we got no
-    -- responses.  This is to lessen the burden on well-known bootstrap
-    -- nodes.
-    fallback <- atomically (readTVar gotPing) >>= return . when . not
-    fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
-        forM_ ns0 $ \n -> do
-            forkTask g (show $ kademliaLocation (searchSpace sch) n)
-                       (void $ ping n)
-    hPutStrLn stderr "Finished bootstrap pings."
-
-    -- Now run searches until all the buckets are full.  On a small network,
-    -- this may never quit.
-    --
-    -- TODO: For small networks, we should give up on filling a nearby bucket
-    -- at some point and move on to one farther away.
-    flip fix 1 $ \again cnt -> do
-        when (cnt==0) $ do
-            -- Force a delay in case the search returns too quickly.
-            hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
-            threadDelay (60 * 1000000)
-        tbl <- atomically $ readTVar var
-        let shp    = zip (R.shape tbl) [0 ..]
-            unfull = filter ( (< R.defaultBucketSize) . fst ) shp
-        case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
-            [] -> do
-                when (length shp < R.defaultBucketCount) $ do
-                    -- Not enough buckets, keep trying.
-                    hPutStrLn stderr
-                        $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
-                    cnt <- refreshBucket sch var
-                                         (R.defaultBucketCount - 1)
-                    again cnt
-            (size,num):_ -> do
-                hPutStrLn stderr $ "Bucket too small, refresh " ++ show (size,num,shp)
-                cnt <- refreshBucket sch var num
-                again cnt
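
The four definitions deleted above reappear in the new Network.Kademlia.Bootstrap module below. For orientation, here is a hedged wiring sketch (not part of this patch) of how the refresh machinery fits together: `sch` and `var` stand for a previously constructed Search and routing-table TVar, and Int.empty is assumed to be the PSQInt wrapper's empty-queue constructor.

    import Control.Concurrent.STM
    import Data.Time.Clock.POSIX (POSIXTime)
    import qualified Data.Wrapper.PSQInt as Int
    import Network.Kademlia.Bootstrap (forkPollForRefresh, refreshBucket)

    -- Assumed staleness threshold, matching the 15-minute policy in the
    -- BEP 05 comments.
    refreshInterval :: POSIXTime
    refreshInterval = 15 * 60

    startRefreshLoop sch var = do
        -- Begin with an empty schedule; touchBucket enqueues a bucket each
        -- time a routing-table transition touches it.
        psq <- atomically $ newTVar Int.empty
        tid <- forkPollForRefresh refreshInterval psq (refreshBucket sch var)
        return (psq, tid)
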
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
new file mode 100644
index 00000000..283e054a
--- /dev/null
+++ b/src/Network/Kademlia/Bootstrap.hs
@@ -0,0 +1,255 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE DeriveFunctor #-}
+{-# LANGUAGE DeriveTraversable #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE KindSignatures #-}
+{-# LANGUAGE PartialTypeSignatures #-}
+{-# LANGUAGE PatternSynonyms #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+module Network.Kademlia.Bootstrap where
+
+import Data.Function
+import Data.Maybe
+import qualified Data.Set as Set
+import Data.Time.Clock (getCurrentTime)
+import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
+import Network.Kademlia.Routing as R
+#ifdef THREAD_DEBUG
+import Control.Concurrent.Lifted.Instrument
+#else
+import Control.Concurrent.Lifted
+import GHC.Conc (labelThread)
+#endif
+import Control.Concurrent.STM
+import Control.Monad
+import Data.Bits
+import Data.Hashable
+import Data.IP
+import Data.Monoid
+import Data.Serialize (Serialize)
+import Data.Time.Clock.POSIX (POSIXTime)
+import System.Entropy
+import System.Timeout
+import Text.PrettyPrint as PP hiding (($$), (<>))
+import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
+import System.IO
+
+import qualified Data.Wrapper.PSQInt as Int
+         ;import Data.Wrapper.PSQInt (pattern (:->))
+import Network.Address (bucketRange,genBucketSample)
+import Network.Kademlia.Search
+import Control.Concurrent.Tasks
+import Network.Kademlia
+
+
+-- | > pollForRefresh interval queue refresh
+--
+-- Fork a refresh loop.  Kill the returned thread to terminate it.  The
+-- arguments are: a staleness threshold (if a bucket goes this long without
+-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
+-- schedule for each bucket, and a refresh action to be forked when a bucket
+-- exceeds the staleness threshold.
+--
+-- To "touch" a bucket and prevent it from being refreshed, reschedule its
+-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
+-- TVar.
+forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
+forkPollForRefresh interval psq refresh = do
+    fork $ do
+        myThreadId >>= flip labelThread "pollForRefresh"
+        fix $ \again -> do
+            join $ atomically $ do
+                nextup <- Int.findMin <$> readTVar psq
+                maybe retry (return . go again) nextup
+ where
+    go again ( bktnum :-> refresh_time ) = do
+        now <- getPOSIXTime
+        case fromEnum (refresh_time - now) of
+            x | x <= 0 -> do -- Refresh time!
+                -- Move it to the back of the refresh queue.
+                atomically $ modifyTVar' psq
+                           $ Int.insert bktnum (now + interval)
+                -- Now fork the refresh operation.
+                -- TODO: We should probably propagate the kill signal to this thread.
+                fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
+                          _ <- refresh bktnum
+                          return ()
+                return ()
+            seconds -> threadDelay ( seconds * 1000000 )
+        again
+
+refreshBucket :: forall nid tok ni addr.
+                 ( Show nid
+                 , Serialize nid
+                 , Ord nid
+                 , Ord ni
+                 , Ord addr
+                 , Hashable nid
+                 , Hashable ni ) =>
+                 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
+refreshBucket sch var n = do
+    tbl <- atomically (readTVar var)
+    let count = bktCount tbl
+        nid   = kademliaLocation (searchSpace sch) (thisNode tbl)
+    sample <- if n+1 >= count  -- Is this the last bucket?
+                then return nid -- Yes? Search our own id.
+                else kademliaSample (searchSpace sch) -- No? Generate a random id.
+                                    getEntropy
+                                    nid
+                                    (bucketRange n (n + 1 < count))
+    fin <- atomically $ newTVar False
+    resultCounter <- atomically $ newTVar Set.empty
+    let fullcount = R.defaultBucketSize
+        saveit True = writeTVar fin True >> return True
+        saveit _    = return False
+        checkBucketFull :: ni -> STM Bool
+        checkBucketFull found_node = do
+            tbl <- readTVar var
+            let counts = R.shape tbl
+            when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
+                $ modifyTVar' resultCounter (Set.insert found_node)
+            resultCount <- readTVar resultCounter
+            saveit $ case drop (n - 1) counts of
+                        (cnt:_) | cnt < fullcount             -> True
+                        _ | Set.size resultCount < fullcount  -> True
+                        _                                     -> False
+
+    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
+
+    -- Set a 15 minute timeout in order to avoid overlapping refreshes.
+    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
+                                    then const $ return True -- Never short-circuit the last bucket.
+                                    else checkBucketFull
+    _ <- timeout (15*60*1000000) $ do
+            atomically $ searchIsFinished s >>= check
+    atomically $ searchCancel s
+    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
+    rcount <- atomically $ do
+        c <- Set.size <$> readTVar resultCounter
+        b <- readTVar fin
+        return $ if b then 1 else c
+    return rcount
+
+
+bootstrap ::
+    ( Show nid
+    , Serialize nid
+    -- , FiniteBits nid
+    , Hashable ni
+    , Hashable nid
+    , Ord ni
+    , Ord addr
+    , Ord nid
+    , Traversable t1
+    , Traversable t
+    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
+bootstrap sch var ping ns ns0 = do
+    gotPing <- atomically $ newTVar False
+
+    -- First, ping the given nodes so that they are added to
+    -- our routing table.
+    withTaskGroup "bootstrap.resume" 20 $ \g -> do
+        forM_ ns $ \n -> do
+            let lbl = show $ kademliaLocation (searchSpace sch) n
+            forkTask g lbl $ do
+                b <- ping n
+                when b $ atomically $ writeTVar gotPing True
+
+    -- We resort to the hardcoded fallback nodes only when we got no
+    -- responses.  This is to lessen the burden on well-known bootstrap
+    -- nodes.
+    fallback <- atomically (readTVar gotPing) >>= return . when . not
+    fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
+        forM_ ns0 $ \n -> do
+            forkTask g (show $ kademliaLocation (searchSpace sch) n)
+                       (void $ ping n)
+    hPutStrLn stderr "Finished bootstrap pings."
+
+    -- Now run searches until all the buckets are full.  On a small network,
+    -- this may never quit.
+    --
+    -- TODO: For small networks, we should give up on filling a nearby bucket
+    -- at some point and move on to one farther away.
+    flip fix 1 $ \again cnt -> do
+        when (cnt==0) $ do
+            -- Force a delay in case the search returns too quickly.
+            hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
+            threadDelay (60 * 1000000)
+        tbl <- atomically $ readTVar var
+        let shp    = zip (R.shape tbl) [0 ..]
+            unfull = filter ( (< R.defaultBucketSize) . fst ) shp
+        case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
+            [] -> do
+                when (length shp < R.defaultBucketCount) $ do
+                    -- Not enough buckets, keep trying.
+                    hPutStrLn stderr
+                        $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
+                    cnt <- refreshBucket sch var
+                                         (R.defaultBucketCount - 1)
+                    again cnt
+            (size,num):_ -> do
+                hPutStrLn stderr $ "Bucket too small, refresh " ++ show (size,num,shp)
+                cnt <- refreshBucket sch var num
+                again cnt
+
+-- XXX: This will be redundantly triggered twice upon every node replacement
+-- because we do not currently distinguish between standalone
+-- insertion/deletion events and an insertion/deletion pair constituting
+-- replacement.
+--
+-- It might also be better to pass the timestamp of the transition here and
+-- keep the refresh queue in better sync with the routing table by updating it
+-- within the STM monad.
+touchBucket :: KademliaSpace nid ni
+            -> POSIXTime
+            -> TVar (BucketList ni)
+            -> TVar (Int.PSQ POSIXTime)
+            -> RoutingTransition ni
+            -> STM (IO ())
+touchBucket space interval bkts psq tr
+    | (transitionedTo tr == Applicant)
+        = return $ return ()
+    | otherwise = return $ do
+        now <- getPOSIXTime
+        atomically $ do
+            let nid = kademliaLocation space (transitioningNode tr)
+            num <- R.bucketNumber space nid <$> readTVar bkts
+            modifyTVar' psq $ Int.insert num (now + interval)
+
+-- TODO: Bootstrap/Refresh
+--
+-- From BEP 05:
+--
+-- Each bucket should maintain a "last changed" property to indicate how
+-- "fresh" the contents are.
+--
+-- Note: We will use a "time to next refresh" property instead and store it in
+-- a priority search queue.
+--
+-- When...
+--
+-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
+-- >>> bucketEvents =
+-- >>>     [ Applicant :--> Stranger  -- a node in a bucket is pinged and it responds,
+-- >>>
+-- >>>     , Stranger  :--> Accepted  -- or a node is added to a bucket,
+-- >>>
+-- >>>     , Accepted  :--> Stranger  -- or a node in a bucket is replaced
+-- >>>     , Applicant :--> Accepted  -- with another node,
+-- >>>     ]
+--
+-- the bucket's last changed property should be updated.  Buckets
+-- that have not been changed in 15 minutes should be "refreshed."  This is done
+-- by picking a random ID in the range of the bucket and performing a
+-- find_nodes search on it.
+--
+-- The only other possible BucketTouchEvents are as follows:
+--
+-- >>> not_handled =
+-- >>>     [ Stranger  :--> Applicant  -- A ping is pending; its result is covered by:
+-- >>>                                 --   (Applicant :--> Stranger)
+-- >>>                                 --   (Applicant :--> Accepted)
+-- >>>     , Accepted  :--> Applicant  -- Never happens
+-- >>>     ]
+--
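
For context, a hedged end-to-end sketch of how a caller such as MainlineDHT might drive this module; startDHT, pingNode, savedNodes, and fallbackNodes are illustrative names, not from this patch, and Int.empty is assumed to be the PSQInt wrapper's empty-queue constructor.

    import Control.Concurrent.STM
    import qualified Data.Wrapper.PSQInt as Int
    import Network.Kademlia.Bootstrap

    startDHT sch var pingNode savedNodes fallbackNodes = do
        -- Ping saved peers first; bootstrap contacts the well-known
        -- fallback nodes only when none of the saved ones answer, then
        -- searches until the routing table's buckets fill up.
        bootstrap sch var pingNode savedNodes fallbackNodes
        -- Afterwards, keep the table fresh with the 15-minute policy.
        psq <- atomically $ newTVar Int.empty
        forkPollForRefresh (15 * 60) psq (refreshBucket sch var)
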
diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs
index fc28e2d2..494e319b 100644
--- a/src/Network/Tox/DHT/Handlers.hs
+++ b/src/Network/Tox/DHT/Handlers.hs
@@ -11,6 +11,7 @@ import Crypto.Tox
 import Network.Kademlia.Search
 import qualified Data.Wrapper.PSQInt as Int
 import Network.Kademlia
+import Network.Kademlia.Bootstrap (touchBucket)
 import Network.Address (WantIP (..), ipFamily, testIdBit,fromSockAddr, sockAddrPort)
 import qualified Network.Kademlia.Routing as R
 import Control.TriadCommittee
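
The touchBucket import above gives Handlers.hs a hook to run on each routing-table transition. Its result type, STM (IO ()), splits the work: the STM part decides whether the bucket needs rescheduling, and the returned IO action reads the clock and stamps the bucket with a fresh refresh time. A minimal sketch, assuming space, bkts, and psq are the same values wired into forkPollForRefresh elsewhere (onTransition is an illustrative name, not from this patch):

    import Control.Monad (join)
    import Control.Concurrent.STM (atomically)
    import Network.Kademlia.Bootstrap (touchBucket)

    -- Run the STM decision, then the IO follow-up it returns, which
    -- reschedules the affected bucket's next refresh 15 minutes out.
    onTransition space bkts psq tr =
        join $ atomically $ touchBucket space (15 * 60) bkts psq tr
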