Diffstat (limited to 'src/Network/Kademlia/Bootstrap.hs')
 src/Network/Kademlia/Bootstrap.hs | 255 +++++++++++++++++++++++++++++++++
 1 file changed, 255 insertions(+), 0 deletions(-)
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
new file mode 100644
index 00000000..283e054a
--- /dev/null
+++ b/src/Network/Kademlia/Bootstrap.hs
@@ -0,0 +1,255 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Kademlia.Bootstrap where

import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.Kademlia.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
import Data.IP
import Data.Monoid
import Data.Serialize (Serialize)
import Data.Time.Clock.POSIX (POSIXTime)
import System.Entropy
import System.Timeout
import Text.PrettyPrint as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO

import qualified Data.Wrapper.PSQInt as Int
;import Data.Wrapper.PSQInt (pattern (:->))
import Network.Address (bucketRange,genBucketSample)
import Network.Kademlia.Search
import Control.Concurrent.Tasks
import Network.Kademlia

-- | > forkPollForRefresh interval queue refresh
--
-- Fork a refresh loop. Kill the returned thread to terminate it. The
-- arguments are: a staleness threshold (if a bucket goes this long without
-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
-- schedule for each bucket, and a refresh action to be forked when a bucket
-- exceeds the staleness threshold.
--
-- To "touch" a bucket and prevent it from being refreshed, reschedule its
-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
-- TVar.
forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
forkPollForRefresh interval psq refresh = do
    fork $ do
        myThreadId >>= flip labelThread "pollForRefresh"
        fix $ \again -> do
            join $ atomically $ do
                nextup <- Int.findMin <$> readTVar psq
                maybe retry (return . go again) nextup
 where
    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        case fromEnum (refresh_time - now) of
          x | x <= 0 -> do -- Refresh time!
                -- Move it to the back of the refresh queue.
                atomically $ modifyTVar' psq
                           $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
                          _ <- refresh bktnum
                          return ()
                return ()
          seconds -> threadDelay ( seconds * 1000000 )
        again

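-- A minimal sketch of such a "touch," using only the operations already in
-- scope here and assuming a queue 'psq' that was handed to
-- 'forkPollForRefresh': rescheduling bucket 3 to fifteen minutes from now
-- prevents it from being refreshed until then.
--
-- >>> now <- getPOSIXTime
-- >>> atomically $ modifyTVar' psq $ Int.insert 3 (now + 15*60)
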
refreshBucket :: forall nid tok ni addr.
                 ( Show nid
                 , Serialize nid
                 , Ord nid
                 , Ord ni
                 , Ord addr
                 , Hashable nid
                 , Hashable ni ) =>
                 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
refreshBucket sch var n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count -- Is this the last bucket?
                 then return nid -- Yes? Search our own id.
                 else kademliaSample (searchSpace sch) -- No? Generate a random id.
                                     getEntropy
                                     nid
                                     (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty
    let fullcount = R.defaultBucketSize
        saveit True = writeTVar fin True >> return True
        saveit _    = return False
        checkBucketFull :: ni -> STM Bool
        checkBucketFull found_node = do
            tbl <- readTVar var
            let counts = R.shape tbl
            when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
                 $ modifyTVar' resultCounter (Set.insert found_node)
            resultCount <- readTVar resultCounter
            saveit $ case drop (n - 1) counts of
                       (cnt:_) | cnt < fullcount             -> True
                       _ | Set.size resultCount < fullcount  -> True
                       _                                     -> False

    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)

    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                    then const $ return True -- Never short-circuit the last bucket.
                                    else checkBucketFull
    _ <- timeout (15*60*1000000) $ do
        atomically $ searchIsFinished s >>= check
    atomically $ searchCancel s
    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
    rcount <- atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
    return rcount

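-- A sketch of wiring the two pieces above together, assuming 'sch' and 'var'
-- are already in scope and that 'Int.empty' is the usual empty-queue
-- constructor of the PSQ wrapper; the fifteen-minute interval mirrors the
-- staleness threshold suggested by BEP 05.
--
-- >>> psq  <- atomically $ newTVar Int.empty
-- >>> _tid <- forkPollForRefresh (15*60) psq (refreshBucket sch var)
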
bootstrap ::
    ( Show nid
    , Serialize nid
    -- , FiniteBits nid
    , Hashable ni
    , Hashable nid
    , Ord ni
    , Ord addr
    , Ord nid
    , Traversable t1
    , Traversable t
    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
bootstrap sch var ping ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g -> do
        forM_ ns $ \n -> do
            let lbl = show $ kademliaLocation (searchSpace sch) n
            forkTask g lbl $ do
                b <- ping n
                when b $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses. This is to lessen the burden on well-known bootstrap
    -- nodes.
    fallback <- atomically (readTVar gotPing) >>= return . when . not
    fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
        forM_ ns0 $ \n -> do
            forkTask g (show $ kademliaLocation (searchSpace sch) n)
                       (void $ ping n)
    hPutStrLn stderr "Finished bootstrap pings."

    -- Now run searches until all the buckets are full. On a small network,
    -- this may never quit.
    --
    -- TODO: For small networks, we should give up on filling a nearby bucket
    -- at some point and move on to one farther away.
    flip fix 1 $ \again cnt -> do
        when (cnt==0) $ do
            -- Force a delay in case the search returns too quickly
            hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
            threadDelay (60 * 1000000)
        tbl <- atomically $ readTVar var
        let shp    = zip (R.shape tbl) [0 ..]
            unfull = filter ( (< R.defaultBucketSize) . fst ) shp
        case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
            [] -> do
                when (length shp < R.defaultBucketCount) $ do
                    -- Not enough buckets, keep trying.
                    hPutStrLn stderr
                        $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
                    cnt <- refreshBucket sch var
                                         (R.defaultBucketCount - 1)
                    again cnt
            (size,num):_ -> do
                hPutStrLn stderr $ "Bucket too small, refresh " ++ show (size,num,shp)
                cnt <- refreshBucket sch var num
                again cnt

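-- A hedged usage sketch: 'savedNodes' are nodes remembered from a previous
-- session, 'fallbackNodes' are the well-known hardcoded nodes, and 'pingNode'
-- reports whether a pong was received (all three names are hypothetical
-- stand-ins, not part of this module).
--
-- >>> bootstrap sch var pingNode savedNodes fallbackNodes
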
-- XXX: This will be redundantly triggered twice upon every node replacement
-- because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
touchBucket :: KademliaSpace nid ni
            -> POSIXTime
            -> TVar (BucketList ni)
            -> TVar (Int.PSQ POSIXTime)
            -> RoutingTransition ni
            -> STM (IO ())
touchBucket space interval bkts psq tr
    | (transitionedTo tr == Applicant)
                = return $ return ()
    | otherwise = return $ do
        now <- getPOSIXTime
        atomically $ do
            let nid = kademliaLocation space (transitioningNode tr)
            num <- R.bucketNumber space nid <$> readTVar bkts
            modifyTVar' psq $ Int.insert num (now + interval)

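-- 'touchBucket' is meant to be consulted for each routing table transition:
-- the STM part computes what to do, and the returned IO action performs the
-- reschedule (it runs its own 'atomically', which is why it cannot execute
-- inside the original transaction). A sketch, assuming 'tr' is a just-applied
-- 'RoutingTransition' and 'psq' is the refresh queue:
--
-- >>> action <- atomically $ touchBucket (searchSpace sch) (15*60) var psq tr
-- >>> action
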
-- TODO: Bootstrap/Refresh
--
-- From BEP 05:
--
-- Each bucket should maintain a "last changed" property to indicate how
-- "fresh" the contents are.
--
-- Note: We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- When...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>     [ Applicant :--> Stranger  -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>     , Stranger  :--> Accepted  -- or a node is added to a bucket,
-- >>>
-- >>>     , Accepted  :--> Stranger  -- or a node in a bucket is replaced
-- >>>     , Applicant :--> Accepted  -- with another node,
-- >>>     ]
--
-- the bucket's last changed property should be updated. Buckets that have not
-- been changed in 15 minutes should be "refreshed." This is done by picking a
-- random ID in the range of the bucket and performing a find_nodes search on
-- it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>     [ Stranger :--> Applicant  -- A ping is pending; its result is covered:
-- >>>                                --   (Applicant :--> Stranger)
-- >>>                                --   (Applicant :--> Accepted)
-- >>>     , Accepted :--> Applicant  -- Never happens
-- >>>     ]
--