author     joe <joe@jerkface.net>  2017-09-15 03:31:29 -0400
committer  joe <joe@jerkface.net>  2017-09-15 03:31:29 -0400
commit     16631d46cd6c64c81cbc7dd3fa33afdeb6ea2366 (patch)
tree       9f679fb097f3e33aa3fbba5992057c64d9fd3735 /src/Network/Kademlia.hs
parent     718f00853fa3b42940d6544c054bb23fb38ba0c2 (diff)

Moved Kademlia to hierarchical location.
Diffstat (limited to 'src/Network/Kademlia.hs')
-rw-r--r--  src/Network/Kademlia.hs  380
1 file changed, 380 insertions, 0 deletions
diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs
new file mode 100644
index 00000000..c6c59ae6
--- /dev/null
+++ b/src/Network/Kademlia.hs
@@ -0,0 +1,380 @@
{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
-- {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PatternSynonyms #-}
module Network.Kademlia where

import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.DHT.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
import Data.IP
import Data.Monoid
import Data.Serialize (Serialize)
import Data.Time.Clock.POSIX (POSIXTime)
import qualified Data.Wrapper.PSQInt as Int
 ;import Data.Wrapper.PSQInt (pattern (:->))
import Network.Address (bucketRange,genBucketSample)
import Network.BitTorrent.DHT.Search
import System.Entropy
import System.Timeout
import Text.PrettyPrint as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO
import Tasks

-- | The status of a given node with respect to a given routing table.
data RoutingStatus
    = Stranger  -- ^ The node is unknown to the Kademlia routing table.
    | Applicant -- ^ The node may be inserted pending a ping timeout.
    | Accepted  -- ^ The node has a slot in one of the Kademlia buckets.
    deriving (Eq,Ord,Enum,Show,Read)

-- | A change occurred in the Kademlia routing table.
data RoutingTransition ni = RoutingTransition
    { transitioningNode :: ni
    , transitionedTo    :: !RoutingStatus
    }
    deriving (Eq,Ord,Show,Read)

data InsertionReporter ni = InsertionReporter
    { -- | Called on every inbound packet.
      reportArrival :: POSIXTime
                    -> ni    -- ^ Origin of packet.
                    -> [ni]  -- ^ These will be pinged as a result.
                    -> IO ()
      -- | Called on every ping probe.
    , reportPingResult :: POSIXTime
                       -> ni    -- ^ Who was pinged.
                       -> Bool  -- ^ True if they ponged.
                       -> IO ()
    }

quietInsertions :: InsertionReporter ni
quietInsertions = InsertionReporter
    { reportArrival = \_ _ _ -> return ()
    , reportPingResult = \_ _ _ -> return ()
    }

contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
contramapIR f ir = InsertionReporter
    { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
    , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
    }

-- | All the IO operations necessary to maintain a Kademlia routing table.
data TableStateIO ni = TableStateIO
    { -- | Write the routing table.  Typically 'writeTVar'.
      tblWrite :: R.BucketList ni -> STM ()

      -- | Read the routing table.  Typically 'readTVar'.
    , tblRead :: STM (R.BucketList ni)

      -- | Issue a ping to a remote node and report 'True' if the node
      -- responded within an acceptable time and 'False' otherwise.
    , tblPing :: ni -> IO Bool

      -- | Convenience method provided to assist in maintaining state
      -- consistent with the routing table.  It will be invoked in the same
      -- transaction that 'tblRead'\/'tblWrite' occurred but only when there
      -- was an interesting change.  The returned IO action will be triggered
      -- soon afterward.
      --
      -- It is not necessary to do anything interesting here.  The following
      -- trivial implementation is fine:
      --
      -- > tblTransition = const $ return $ return ()
    , tblTransition :: RoutingTransition ni -> STM (IO ())
    }

vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
vanillaIO var ping = TableStateIO
    { tblRead  = readTVar var
    , tblWrite = writeTVar var
    , tblPing  = ping
    , tblTransition = const $ return $ return ()
    }

-- | Everything necessary to maintain a routing table of /ni/ (node
-- information) entries.
data Kademlia nid ni = Kademlia (InsertionReporter ni)
                                (KademliaSpace nid ni)
                                (TableStateIO ni)

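-- A minimal assembly sketch, not part of the original module: the pieces
-- above combine as follows, where @space@, @initialTable@, and @myPing@ are
-- hypothetical caller-supplied values.
--
-- > do var <- newTVarIO initialTable
-- >    let k = Kademlia quietInsertions space (vanillaIO var myPing)
-- >    ...
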
-- Helper to 'insertNode'.
--
-- Adapt return value from 'updateForPingResult' into a
-- more easily grokked list of transitions.
transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
transition (x,m) =
    -- | Just _  <- m  = Node transition: Accepted --> Stranger
    -- | Nothing <- m  = Node transition: Applicant --> Stranger
    RoutingTransition x Stranger
    : maybeToList (accepted <$> m)

-- Helper to 'transition'.
--
-- Node transition: Applicant --> Accepted
accepted :: (t,ni) -> RoutingTransition ni
accepted (_,y) = RoutingTransition y Accepted

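-- For illustration (with hypothetical nodes @a@ and @b@): a replacement
-- reported by 'updateForPingResult' as @(a, Just (t,b))@ expands to
--
-- > transition (a, Just (t,b)) == [ RoutingTransition a Stranger
-- >                               , RoutingTransition b Accepted ]
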
insertNode :: Kademlia nid ni -> ni -> IO ()
insertNode (Kademlia reporter space io) node = do

    tm <- utcTimeToPOSIXSeconds <$> getCurrentTime

    (ps,reaction) <- atomically $ do
        tbl <- tblRead io
        let (inserted, ps, t') = R.updateForInbound space tm node tbl
        tblWrite io t'
        reaction <- case ps of
            _ | inserted -> -- Node transition: Stranger --> Accepted
                            tblTransition io $ RoutingTransition node Accepted
            (_:_)        -> -- Node transition: Stranger --> Applicant
                            tblTransition io $ RoutingTransition node Applicant
            _            -> return $ return ()
        return (ps, reaction)

    reportArrival reporter tm node ps
    reaction

    _ <- fork $ do
        myThreadId >>= flip labelThread "pingResults"
        forM_ ps $ \n -> do
            b <- tblPing io n
            reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result
            join $ atomically $ do
                tbl <- tblRead io
                let (replacements, t') = R.updateForPingResult space n b tbl
                tblWrite io t'
                ios <- sequence $ concatMap
                                    (map (tblTransition io) . transition)
                                    replacements
                return $ sequence_ ios

    return ()

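-- Intended usage, as a sketch (the packet handler here is hypothetical):
-- call 'insertNode' for every inbound packet so that the sender is
-- considered for the routing table.
--
-- > handlePacket k sender payload = do
-- >     insertNode k sender
-- >     ...
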
-- TODO: Bootstrap/Refresh
--
-- From BEP 05:
--
--     Each bucket should maintain a "last changed" property to indicate how
--     "fresh" the contents are.
--
-- Note: We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- When...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>     [ Applicant :--> Stranger  -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>     , Stranger  :--> Accepted  -- or a node is added to a bucket,
-- >>>
-- >>>     , Accepted  :--> Stranger  -- or a node in a bucket is replaced
-- >>>     , Applicant :--> Accepted  -- with another node,
-- >>>     ]
--
-- the bucket's last changed property should be updated.  Buckets that have
-- not been changed in 15 minutes should be "refreshed."  This is done by
-- picking a random ID in the range of the bucket and performing a find_nodes
-- search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>     [ Stranger :--> Applicant  -- A ping is pending; its result is covered:
-- >>>                                --   (Applicant :--> Stranger)
-- >>>                                --   (Applicant :--> Accepted)
-- >>>     , Accepted :--> Applicant  -- Never happens
-- >>>     ]
--

-- XXX: This will be redundantly triggered twice upon every node replacement
-- because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
touchBucket :: KademliaSpace nid ni
            -> POSIXTime                 -- ^ Refresh interval.
            -> TVar (BucketList ni)      -- ^ The routing table.
            -> TVar (Int.PSQ POSIXTime)  -- ^ The refresh schedule.
            -> RoutingTransition ni      -- ^ The transition that just occurred.
            -> STM (IO ())
touchBucket space interval bkts psq tr
    | transitionedTo tr == Applicant
                = return $ return ()
    | otherwise = return $ do
        now <- getPOSIXTime
        atomically $ do
            let nid = kademliaLocation space (transitioningNode tr)
            num <- R.bucketNumber space nid <$> readTVar bkts
            modifyTVar' psq $ Int.insert num (now + interval)

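-- A wiring sketch, not part of this module (it assumes the PSQ wrapper
-- exports an empty queue as 'Int.empty'; @bkts@, @ping@, and @space@ are
-- hypothetical): plug 'touchBucket' in as the 'tblTransition' callback so
-- that every interesting routing-table change reschedules the affected
-- bucket's refresh.
--
-- > do psq <- newTVarIO Int.empty
-- >    let io = (vanillaIO bkts ping)
-- >                { tblTransition = touchBucket space (15*60) bkts psq }
-- >    ...
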
-- | > pollForRefresh interval queue refresh
--
-- Fork a refresh loop.  Kill the returned thread to terminate it.  The
-- arguments are: a staleness threshold (if a bucket goes this long without
-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
-- schedule for each bucket, and a refresh action to be forked when a bucket
-- exceeds the staleness threshold.
--
-- To "touch" a bucket and prevent it from being refreshed, reschedule its
-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
-- TVar.
forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
forkPollForRefresh interval psq refresh = do
    fork $ do
        myThreadId >>= flip labelThread "pollForRefresh"
        fix $ \again -> do
            join $ atomically $ do
                nextup <- Int.findMin <$> readTVar psq
                maybe retry (return . go again) nextup
  where
    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        case fromEnum (refresh_time - now) of
            x | x <= 0 -> do -- Refresh time!
                -- Move it to the back of the refresh queue.
                atomically $ modifyTVar' psq
                           $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                _ <- fork $ do
                        myThreadId >>= flip labelThread ("refresh."++show bktnum)
                        _ <- refresh bktnum
                        return ()
                return ()
            seconds -> threadDelay ( seconds * 1000000 )
        again

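-- Usage sketch (with hypothetical @psq@, @sch@, and @var@ bindings): with a
-- 15 minute staleness threshold and 'refreshBucket' (below) as the refresh
-- action:
--
-- > tid <- forkPollForRefresh (15*60) psq (refreshBucket sch var)
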
refreshBucket :: forall nid tok ni addr.
                 ( Show nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
                 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
refreshBucket sch var n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count  -- Is this the last bucket?
              then return nid  -- Yes? Search our own id.
              else kademliaSample (searchSpace sch)  -- No? Generate a random id.
                                  getEntropy
                                  nid
                                  (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty

    let fullcount = R.defaultBucketSize
        saveit True = writeTVar fin True >> return True
        saveit _    = return False
        checkBucketFull :: ni -> STM Bool
        checkBucketFull found_node = do
            tbl <- readTVar var
            let counts = R.shape tbl
            when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
                 $ modifyTVar' resultCounter (Set.insert found_node)
            resultCount <- readTVar resultCounter
            saveit $ case drop (n - 1) counts of
                (cnt:_) | cnt < fullcount            -> True
                _ | Set.size resultCount < fullcount -> True
                _                                    -> False

    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)

    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                 then const $ return True -- Never short-circuit the last bucket.
                                 else checkBucketFull
    _ <- timeout (15*60*1000000) $ do
        atomically $ searchIsFinished s >>= check
    atomically $ searchCancel s
    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
    rcount <- atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
    return rcount

bootstrap ::
    ( Show nid
    , Serialize nid
    -- , FiniteBits nid
    , Hashable ni
    , Hashable nid
    , Ord ni
    , Ord addr
    , Ord nid
    , Traversable t1
    , Traversable t
    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
bootstrap sch var ping ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g -> do
        forM_ ns $ \n -> do
            let lbl = show $ kademliaLocation (searchSpace sch) n
            forkTask g lbl $ do
                b <- ping n
                when b $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses.  This is to lessen the burden on well-known bootstrap
    -- nodes.
    fallback <- atomically (readTVar gotPing) >>= return . when . not
    fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
        forM_ ns0 $ \n -> do
            forkTask g (show $ kademliaLocation (searchSpace sch) n)
                       (void $ ping n)
    hPutStrLn stderr "Finished bootstrap pings."

    -- Now run searches until all the buckets are full.  On a small network,
    -- this may never quit.
    --
    -- TODO: For small networks, we should give up on filling a nearby bucket
    -- at some point and move on to one farther away.
    flip fix 1 $ \again cnt -> do
        when (cnt==0) $ do
            -- Force a delay in case the search returns too quickly.
            hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
            threadDelay (60 * 1000000)
        tbl <- atomically $ readTVar var
        let shp    = zip (R.shape tbl) [0 ..]
            unfull = filter ( (< R.defaultBucketSize) . fst ) shp
        case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
            [] -> do
                when (length shp < R.defaultBucketCount) $ do
                    -- Not enough buckets, keep trying.
                    hPutStrLn stderr
                        $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
                    cnt <- refreshBucket sch var
                                         (R.defaultBucketCount - 1)
                    again cnt
            (size,num):_ -> do
                -- If we don't yet have enough buckets, we need to search our own id.
                -- We indicate that by setting the bucket number to the target.
                let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1
                         | otherwise                           = num
                hPutStrLn stderr $ "Bucket too small, refresh " ++ show (num',(size,num),shp)
                cnt <- refreshBucket sch var num'
                again cnt
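
-- Usage sketch (@savedPeers@ and @wellKnownPeers@ are hypothetical peer
-- lists): resume from previously saved peers, falling back to the well-known
-- bootstrap nodes only if none of the saved peers respond.
--
-- > bootstrap sch var ping savedPeers wellKnownPeers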