diff options
Diffstat (limited to 'src/Network/Kademlia')
-rw-r--r-- | src/Network/Kademlia/Routing.hs | 798 | ||||
-rw-r--r-- | src/Network/Kademlia/Search.hs | 2 |
2 files changed, 799 insertions, 1 deletions
diff --git a/src/Network/Kademlia/Routing.hs b/src/Network/Kademlia/Routing.hs new file mode 100644 index 00000000..7f76ac77 --- /dev/null +++ b/src/Network/Kademlia/Routing.hs | |||
@@ -0,0 +1,798 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Every node maintains a routing table of known good nodes. The | ||
9 | -- nodes in the routing table are used as starting points for | ||
10 | -- queries in the DHT. Nodes from the routing table are returned in | ||
11 | -- response to queries from other nodes. | ||
12 | -- | ||
13 | -- For more info see: | ||
14 | -- <http://www.bittorrent.org/beps/bep_0005.html#routing-table> | ||
15 | -- | ||
16 | {-# LANGUAGE CPP #-} | ||
17 | {-# LANGUAGE RecordWildCards #-} | ||
18 | {-# LANGUAGE BangPatterns #-} | ||
19 | {-# LANGUAGE RankNTypes #-} | ||
20 | {-# LANGUAGE ViewPatterns #-} | ||
21 | {-# LANGUAGE TypeOperators #-} | ||
22 | {-# LANGUAGE DeriveGeneric #-} | ||
23 | {-# LANGUAGE DeriveFunctor #-} | ||
24 | {-# LANGUAGE GADTs #-} | ||
25 | {-# LANGUAGE ScopedTypeVariables #-} | ||
26 | {-# LANGUAGE TupleSections #-} | ||
27 | {-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} | ||
28 | {-# OPTIONS_GHC -fno-warn-orphans #-} | ||
29 | module Network.Kademlia.Routing | ||
30 | {- | ||
31 | ( -- * BucketList | ||
32 | BucketList | ||
33 | , Info(..) | ||
34 | |||
35 | -- * Attributes | ||
36 | , BucketCount | ||
37 | , defaultBucketCount | ||
38 | , BucketSize | ||
39 | , defaultBucketSize | ||
40 | , NodeCount | ||
41 | |||
42 | -- * Query | ||
43 | , Network.Kademlia.Routing.null | ||
44 | , Network.Kademlia.Routing.full | ||
45 | , thisId | ||
46 | , shape | ||
47 | , Network.Kademlia.Routing.size | ||
48 | , Network.Kademlia.Routing.depth | ||
49 | , compatibleNodeId | ||
50 | |||
51 | -- * Lookup | ||
52 | , K | ||
53 | , defaultK | ||
54 | , TableKey (..) | ||
55 | , kclosest | ||
56 | |||
57 | -- * Construction | ||
58 | , Network.Kademlia.Routing.nullTable | ||
59 | , Event(..) | ||
60 | , CheckPing(..) | ||
61 | , Network.Kademlia.Routing.insert | ||
62 | |||
63 | -- * Conversion | ||
64 | , Network.Kademlia.Routing.TableEntry | ||
65 | , Network.Kademlia.Routing.toList | ||
66 | |||
67 | -- * Routing | ||
68 | , Timestamp | ||
69 | , getTimestamp | ||
70 | ) -} where | ||
71 | |||
72 | import Control.Applicative as A | ||
73 | import Control.Arrow | ||
74 | import Control.Monad | ||
75 | import Data.Function | ||
76 | import Data.Functor.Identity | ||
77 | import Data.List as L hiding (insert) | ||
78 | import Data.Maybe | ||
79 | import Data.Monoid | ||
80 | import Data.Wrapper.PSQ as PSQ | ||
81 | import Data.Serialize as S hiding (Result, Done) | ||
82 | import qualified Data.Sequence as Seq | ||
83 | import Data.Time | ||
84 | import Data.Time.Clock.POSIX | ||
85 | import Data.Word | ||
86 | import GHC.Generics | ||
87 | import Text.PrettyPrint as PP hiding ((<>)) | ||
88 | import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) | ||
89 | import qualified Data.ByteString as BS | ||
90 | import Data.Bits | ||
91 | import Data.Ord | ||
92 | import Data.Reflection | ||
93 | import Network.Address | ||
94 | import Data.Typeable | ||
95 | import Data.Coerce | ||
96 | import Data.Hashable | ||
97 | |||
98 | -- | Last time the node was responding to our queries. | ||
99 | -- | ||
100 | -- Not all nodes that we learn about are equal. Some are \"good\" and | ||
101 | -- some are not. Many nodes using the DHT are able to send queries | ||
102 | -- and receive responses, but are not able to respond to queries | ||
103 | -- from other nodes. It is important that each node's routing table | ||
104 | -- must contain only known good nodes. A good node is a node that has | ||
105 | -- responded to one of our queries within the last 15 minutes. A | ||
106 | -- node is also good if it has ever responded to one of our queries | ||
107 | -- and has sent us a query within the last 15 minutes. After 15 | ||
108 | -- minutes of inactivity, a node becomes questionable. Nodes become | ||
109 | -- bad when they fail to respond to multiple queries in a row. Nodes | ||
110 | -- that we know are good are given priority over nodes with unknown | ||
111 | -- status. | ||
112 | -- | ||
113 | type Timestamp = POSIXTime | ||
114 | |||
-- | Read the current wall-clock time and express it as a POSIX 'Timestamp'.
getTimestamp :: IO Timestamp
getTimestamp = utcTimeToPOSIXSeconds <$> getCurrentTime
119 | |||
120 | |||
121 | |||
122 | {----------------------------------------------------------------------- | ||
123 | Bucket | ||
124 | -----------------------------------------------------------------------} | ||
125 | -- | ||
126 | -- When a k-bucket is full and a new node is discovered for that | ||
127 | -- k-bucket, the least recently seen node in the k-bucket is | ||
128 | -- PINGed. If the node is found to be still alive, the new node is | ||
129 | -- placed in a secondary list, a replacement cache. The replacement | ||
130 | -- cache is used only if a node in the k-bucket stops responding. In | ||
131 | -- other words: new nodes are used only when older nodes disappear. | ||
132 | |||
133 | -- | Timestamp - last time this node was pinged. | ||
134 | type NodeEntry ni = Binding ni Timestamp | ||
135 | |||
136 | |||
137 | -- | Maximum number of 'NodeInfo's stored in a bucket. Most clients | ||
138 | -- use this value. | ||
139 | defaultBucketSize :: Int | ||
140 | defaultBucketSize = 8 | ||
141 | |||
142 | data QueueMethods m elem fifo = QueueMethods | ||
143 | { pushBack :: elem -> fifo -> m fifo | ||
144 | , popFront :: fifo -> m (Maybe elem, fifo) | ||
145 | , emptyQueue :: m fifo | ||
146 | } | ||
147 | |||
148 | {- | ||
149 | fromQ :: Functor m => | ||
150 | ( a -> b ) | ||
151 | -> ( b -> a ) | ||
152 | -> QueueMethods m elem a | ||
153 | -> QueueMethods m elem b | ||
154 | fromQ embed project QueueMethods{..} = | ||
155 | QueueMethods { pushBack = \e -> fmap embed . pushBack e . project | ||
156 | , popFront = fmap (second embed) . popFront . project | ||
157 | , emptyQueue = fmap embed emptyQueue | ||
158 | } | ||
159 | -} | ||
160 | |||
-- | 'QueueMethods' backed by 'Seq.Seq' and run in 'Identity': elements are
-- appended on the right and removed from the left.  Popping an empty
-- sequence yields 'Nothing' together with an empty sequence.
seqQ :: QueueMethods Identity ni (Seq.Seq ni)
seqQ = QueueMethods
    { pushBack   = \x q -> Identity (q Seq.|> x)
    , popFront   = Identity . dequeue
    , emptyQueue = Identity Seq.empty
    }
  where
    dequeue q = case Seq.viewl q of
        Seq.EmptyL    -> (Nothing, Seq.empty)
        x Seq.:< rest -> (Just x, rest)
169 | |||
170 | type BucketQueue ni = Seq.Seq ni | ||
171 | |||
172 | bucketQ :: QueueMethods Identity ni (BucketQueue ni) | ||
173 | bucketQ = seqQ | ||
174 | |||
175 | |||
-- | A first-class comparison: an ordering function paired with a salted
-- hash function (salt first, value second).
data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int)

-- | Adapt a 'Compare' to another type by converting every argument with the
-- given function before it is compared or hashed.
contramapC :: (b -> a) -> Compare a -> Compare b
contramapC conv (Compare order salted) = Compare (order `on` conv) hashVia
  where
    hashVia salt = salted salt . conv
181 | |||
182 | newtype Ordered' s a = Ordered a | ||
183 | deriving (Show) | ||
184 | |||
185 | -- | Hack to avoid UndecidableInstances | ||
186 | newtype Shrink a = Shrink a | ||
187 | deriving (Show) | ||
188 | |||
189 | type Ordered s a = Ordered' s (Shrink a) | ||
190 | |||
-- | Equality is defined through the reified comparison: two values are equal
-- exactly when 'compare' reports 'EQ'.
instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where
    x == y = case compare x y of
        EQ -> True
        _  -> False

-- | Ordering is delegated to the comparison function reflected from @s@.
instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where
    compare x y = order (coerce x) (coerce y)
      where
        Compare order _ = reflect (Proxy :: Proxy s)

-- | Hashing is delegated to the salted hash function reflected from @s@.
instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where
    hashWithSalt salt x = salted salt (coerce x)
      where
        Compare _ salted = reflect (Proxy :: Proxy s)
201 | |||
202 | -- | Bucket is also limited in its length — thus it's called k-bucket. | ||
203 | -- When bucket becomes full, we should split it in two lists by | ||
204 | -- current span bit. Span bit is defined by depth in the routing | ||
205 | -- table tree. Size of the bucket should be chosen such that it's | ||
206 | -- very unlikely that all nodes in bucket fail within an hour of | ||
207 | -- each other. | ||
208 | data Bucket s ni = Bucket | ||
209 | { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes | ||
210 | , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs | ||
211 | } deriving (Generic) | ||
212 | |||
213 | #define CAN_SHOW_BUCKET 0 | ||
214 | |||
215 | #if CAN_SHOW_BUCKET | ||
216 | deriving instance Show ni => Show (Bucket s ni) | ||
217 | #endif | ||
218 | |||
-- | Recover the 'Compare' reified in a bucket's phantom parameter @s@.  The
-- argument is used only for its type and is never inspected.
bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni
bucketCompare _proxy = reflect witness
  where
    witness :: Proxy s
    witness = Proxy
221 | |||
-- | Convert every node record in a bucket, both the routing nodes and the
-- pending replacements, keeping each timestamp.  The node queue is rebuilt
-- from its bindings under the target comparison @t@.
mapBucket :: ( Reifies s (Compare a)
             , Reifies t (Compare ni)
             ) => (a -> ni) -> Bucket s a -> Bucket t ni
mapBucket conv (Bucket nodes pending) = Bucket nodes' pending'
  where
    nodes'   = PSQ.fromList (map rebind (PSQ.toList nodes))
    pending' = fmap (\(stamp, ni) -> (stamp, conv ni)) pending

    -- Unwrap the ordered key, convert it, and wrap it again.
    rebind (k :-> stamp) = rekey k :-> stamp
    rekey = coerce . conv . coerce
228 | |||
229 | |||
230 | #if 0 | ||
231 | |||
232 | {- | ||
233 | getGenericNode :: ( Serialize (NodeId) | ||
234 | , Serialize ip | ||
235 | , Serialize u | ||
236 | ) => Get (NodeInfo) | ||
237 | getGenericNode = do | ||
238 | nid <- get | ||
239 | naddr <- get | ||
240 | u <- get | ||
241 | return NodeInfo | ||
242 | { nodeId = nid | ||
243 | , nodeAddr = naddr | ||
244 | , nodeAnnotation = u | ||
245 | } | ||
246 | |||
247 | putGenericNode :: ( Serialize (NodeId) | ||
248 | , Serialize ip | ||
249 | , Serialize u | ||
250 | ) => NodeInfo -> Put | ||
251 | putGenericNode (NodeInfo nid naddr u) = do | ||
252 | put nid | ||
253 | put naddr | ||
254 | put u | ||
255 | |||
256 | instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where | ||
257 | get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) | ||
258 | put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes | ||
259 | -} | ||
260 | |||
261 | #endif | ||
262 | |||
-- | Build a priority search queue from plain @(key, priority)@ pairs.
psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p
psqFromPairList pairs = PSQ.fromList [ k :-> p | (k, p) <- pairs ]
265 | |||
-- | Flatten a priority search queue into plain @(key, priority)@ pairs, in
-- the order produced by 'PSQ.toList'.
psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)]
psqToPairList = map unbind . PSQ.toList
  where
    unbind (k :-> p) = (k, p)
268 | |||
269 | -- | Update interval, in seconds. | ||
270 | delta :: NominalDiffTime | ||
271 | delta = 15 * 60 | ||
272 | |||
-- | Bucket-level handling of an inbound packet from @info@ seen at
-- @curTime@.  The result is a list of nodes to ping paired with the updated
-- bucket.  The cases are tried in this order:
--
--   1. @info@ is already in the bucket: its timestamp becomes the maximum of
--      the stored one and @curTime@; nothing is pinged.
--
--   2. The bucket holds fewer than 'defaultBucketSize' nodes: @info@ is
--      inserted with @curTime@; nothing is pinged.
--
--   3. The entry returned by 'PSQ.findMin' is older than @curTime - delta@:
--      that single node is returned for pinging, its timestamp is reset to
--      @curTime@ so it is not pinged redundantly, and @(curTime, info)@ is
--      appended to the replacement queue in case the ping fails.
--
--   4. Otherwise the new node is discarded by returning 'A.empty'.  The
--      failure matters: 'modifyBucket' reacts to it by trying to split the
--      last bucket.
--
-- Note: pings are triggered only when a bucket is full.
updateBucketForInbound curTime info bucket
    | isJust (PSQ.lookup key nodes)
        = pure ( [], withNodes (PSQ.insertWith max key curTime) )
    | PSQ.size nodes < defaultBucketSize
        = pure ( [], withNodes (PSQ.insert key curTime) )
    | not (L.null stales)
        = pure ( stales
               , bucket { bktNodes = updateStamps curTime (coerce stales) nodes
                        , bktQ     = runIdentity (pushBack bucketQ (curTime, info) (bktQ bucket))
                        }
               )
    | otherwise
        = A.empty
  where
    nodes = bktNodes bucket
    key   = coerce info

    withNodes f = bucket { bktNodes = f nodes }

    -- At most one stale node is reported.  This keeps a 1-to-1
    -- correspondence between pending pings and nodes waiting in 'bktQ', so a
    -- failed ping always has a replacement ready.  (Reporting every stale
    -- node, e.g. via PSQ.atMost, was considered and left disabled.)
    stales = case PSQ.findMin nodes of
        Just (n :-> t) | t < curTime - delta -> [coerce n]
        _                                    -> []
318 | |||
-- | Bucket-level handling of a ping outcome for @bad_node@.
--
-- The front of the replacement queue is always popped.  What happens to the
-- popped candidate depends on the outcome:
--
--   * If the node responded, the candidate (if any) is forgotten and
--     reported as @(candidate, Nothing)@.  The responding node's timestamp is
--     not touched here; it is expected to have been refreshed by the inbound
--     update already.
--
--   * If the node did not respond, a candidate exists, and @bad_node@ is
--     still in the bucket, the candidate takes its place with the candidate's
--     queued timestamp.  This is reported as
--     @(bad_node, Just (timestamp, candidate))@.
--
--   * In every other case nothing is reported.
updateBucketForPingResult bad_node got_response bucket
    = pure ( report, Bucket nodes' rest )
  where
    nodes             = bktNodes bucket
    (candidate, rest) = runIdentity (popFront bucketQ (bktQ bucket))

    -- Evicted routing node paired with the pending entry replacing it.
    swaps = case candidate of
        Just pending
            | not got_response
            , isJust (PSQ.lookup (coerce bad_node) nodes) -> [(bad_node, pending)]
        _                                                 -> []

    -- Pending node dropped from the queue without replacing anything.
    dropped
        | got_response = [ ni | (_, ni) <- maybeToList candidate ]
        | otherwise    = []

    report = [ (ni, Nothing) | ni <- dropped ]
          ++ [ (old, Just new) | (old, new) <- swaps ]

    nodes' = foldr swapIn nodes swaps

    swapIn (old, (stamp, new)) =
          PSQ.insert (coerce new) stamp
        . PSQ.delete (coerce old)
346 | |||
347 | |||
-- | Set the timestamp of every listed node to @curTime@, inserting any node
-- that is not yet present.
updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp
updateStamps curTime stales nodes = foldl' touch nodes stales
  where
    touch q n = PSQ.insert n curTime q
350 | |||
351 | type BitIx = Word | ||
352 | |||
-- | Drain a queue, distributing its elements over two fresh queues: the first
-- receives the elements satisfying the predicate, the second the rest.
-- Elements are pushed in the order they are popped.
partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b)
partitionQ imp test q0 = do
    accepted <- emptyQueue imp
    rejected <- emptyQueue imp
    loop q0 accepted rejected
  where
    loop q yes no = do
        (mb, q') <- popFront imp q
        case mb of
            Nothing -> return (yes, no)
            Just e
                | test e    -> do yes' <- pushBack imp e yes
                                  loop q' yes' no
                | otherwise -> do no' <- pushBack imp e no
                                  loop q' yes no'
368 | |||
369 | |||
370 | |||
-- | Split a bucket in two on bit @i@ of its nodes, as decided by the supplied
-- bit test.  The first bucket of the result holds the routing nodes and
-- pending replacements for which the test is 'True', the second those for
-- which it is 'False'.
split :: forall ni s. ( Reifies s (Compare ni) ) =>
         (ni -> Word -> Bool)
      -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni)
split testNodeIdBit i b =
    (Bucket setNodes setPending, Bucket clearNodes clearPending)
  where
    hasBit :: ni -> Bool
    hasBit ni = testNodeIdBit ni i

    (setBindings, clearBindings) =
        partition (hasBit . coerce . key) (PSQ.toList (bktNodes b))

    setNodes   = PSQ.fromList setBindings
    clearNodes = PSQ.fromList clearBindings

    (setPending, clearPending) =
        runIdentity (partitionQ bucketQ (hasBit . snd) (bktQ b))
382 | |||
383 | |||
384 | {----------------------------------------------------------------------- | ||
385 | -- BucketList | ||
386 | -----------------------------------------------------------------------} | ||
387 | |||
388 | defaultBucketCount :: Int | ||
389 | defaultBucketCount = 20 | ||
390 | |||
391 | defaultMaxBucketCount :: Word | ||
392 | defaultMaxBucketCount = 24 | ||
393 | |||
394 | data Info ni nid = Info | ||
395 | { myBuckets :: BucketList ni | ||
396 | , myNodeId :: nid | ||
397 | , myAddress :: SockAddr | ||
398 | } | ||
399 | deriving Generic | ||
400 | |||
401 | deriving instance (Eq ni, Eq nid) => Eq (Info ni nid) | ||
402 | deriving instance (Show ni, Show nid) => Show (Info ni nid) | ||
403 | |||
404 | -- instance (Eq ip, Serialize ip) => Serialize (Info ip) | ||
405 | |||
406 | -- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ | ||
407 | -- 160. The routing table is subdivided into 'Bucket's that each cover | ||
408 | -- a portion of the space. An empty table has one bucket with an ID | ||
409 | -- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" | ||
410 | -- is inserted into the table, it is placed within the bucket that has | ||
411 | -- @min <= N < max@. An empty table has only one bucket so any node | ||
412 | -- must fit within it. Each bucket can only hold 'K' nodes, currently | ||
413 | -- eight, before becoming 'Full'. When a bucket is full of known good | ||
414 | -- nodes, no more nodes may be added unless our own 'NodeId' falls | ||
415 | -- within the range of the 'Bucket'. In that case, the bucket is | ||
416 | -- replaced by two new buckets each with half the range of the old | ||
417 | -- bucket and the nodes from the old bucket are distributed among the | ||
418 | -- two new ones. For a new table with only one bucket, the full bucket | ||
419 | -- is always split into two new buckets covering the ranges @0..2 ^ | ||
420 | -- 159@ and @2 ^ 159..2 ^ 160@. | ||
421 | -- | ||
422 | data BucketList ni = forall s. Reifies s (Compare ni) => | ||
423 | BucketList { thisNode :: !ni | ||
424 | -- | Non-empty list of buckets. | ||
425 | , buckets :: [Bucket s ni] | ||
426 | } | ||
427 | |||
-- | Convert the node type of a routing table.  The second function maps the
-- old node records to the new ones.  The first function maps new records
-- back, so the table's existing comparison can be reused for the new type
-- through 'contramapC'.
mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b
mapTable g f (BucketList self bkts) =
    reify (contramapC g (bucketCompare bkts)) $ \witness ->
        BucketList (f self) [ pin witness (mapBucket f bkt) | bkt <- bkts ]
  where
    -- Tie the bucket's phantom parameter to the freshly reified comparison.
    pin :: Proxy s -> Bucket s ni -> Bucket s ni
    pin _ bkt = bkt
437 | |||
-- | Tables are equal when their per-bucket entry lists, as produced by
-- 'Network.Kademlia.Routing.toList', are equal.
instance (Eq ni) => Eq (BucketList ni) where
    lhs == rhs =
        Network.Kademlia.Routing.toList lhs == Network.Kademlia.Routing.toList rhs
440 | |||
441 | #if 0 | ||
442 | |||
443 | instance Serialize NominalDiffTime where | ||
444 | put = putWord32be . fromIntegral . fromEnum | ||
445 | get = (toEnum . fromIntegral) <$> getWord32be | ||
446 | |||
447 | #endif | ||
448 | |||
449 | #if CAN_SHOW_BUCKET | ||
450 | deriving instance (Show ni) => Show (BucketList ni) | ||
451 | #else | ||
-- | Hand-written instance: shows the table's own node followed by the entry
-- lists of all buckets.
instance Show ni => Show (BucketList ni) where
    showsPrec d (BucketList self bkts) =
          showString "BucketList "
        . showsPrec inner self
        . showString " (fromList "
        . showsPrec inner [ L.map tableEntry (PSQ.toList (bktNodes bkt)) | bkt <- bkts ]
        . showString ") "
      where
        inner = d + 1
459 | #endif | ||
460 | |||
461 | #if 0 | ||
462 | |||
463 | -- | Normally, routing table should be saved between invocations of | ||
464 | -- the client software. Note that you don't need to store /this/ | ||
465 | -- 'NodeId' since it is already included in routing table. | ||
466 | instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList) | ||
467 | |||
468 | #endif | ||
469 | |||
-- | Pretty-prints the shape of the table: the individual bucket sizes when
-- there are fewer than six buckets, otherwise a bracketed summary of the
-- total node count and the bucket count.
instance Pretty (BucketList ni) where
    pPrint tbl
        | count >= 6 = brackets summary
        | otherwise  = hcat (punctuate ", " (L.map PP.int sizes))
      where
        sizes   = shape tbl
        count   = L.length sizes
        summary = PP.int (L.sum sizes) <> " nodes, " <>
                  PP.int count <> " buckets"
480 | |||
-- | Empty table with the specified /spine/ node: a single bucket with no
-- routing nodes and an empty replacement queue.  The comparison and hash
-- functions are reified into the table and used to key its nodes.
--
-- The final 'Int' argument is not used by this implementation.
--
-- XXX: The comparison function argument is awkward here.
nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni
nullTable cmp hsh ni _n =
    reify (Compare cmp hsh) (\witness -> BucketList ni [emptyBucket witness])
  where
    emptyBucket :: Reifies s (Compare a) => Proxy s -> Bucket s a
    emptyBucket _ = Bucket PSQ.empty (runIdentity (emptyQueue bucketQ))
493 | |||
494 | #if 0 | ||
495 | |||
496 | -- | Test if table is empty. In this case DHT should start | ||
497 | -- bootstrapping process until table becomes 'full'. | ||
498 | null :: BucketList -> Bool | ||
499 | null (Tip _ _ b) = PSQ.null $ bktNodes b | ||
500 | null _ = False | ||
501 | |||
502 | -- | Test if table have maximum number of nodes. No more nodes can be | ||
503 | -- 'insert'ed, except old ones becomes bad. | ||
504 | full :: BucketList -> Bool | ||
505 | full (Tip _ n _) = n == 0 | ||
506 | full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t | ||
507 | full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t | ||
508 | |||
509 | -- | Get the /spine/ node id. | ||
510 | thisId :: BucketList -> NodeId | ||
511 | thisId (Tip nid _ _) = nid | ||
512 | thisId (Zero table _) = thisId table | ||
513 | thisId (One _ table) = thisId table | ||
514 | |||
515 | -- | Number of nodes in a bucket or a table. | ||
516 | type NodeCount = Int | ||
517 | |||
518 | #endif | ||
519 | |||
-- | The routing table is a list of buckets, i.e. a ragged /matrix/ of nodes.
-- This returns the number of routing nodes in each bucket, in bucket order.
shape :: BucketList ni -> [Int]
shape (BucketList _ bkts) = [ PSQ.size (bktNodes bkt) | bkt <- bkts ]
524 | |||
525 | #if 0 | ||
526 | |||
527 | -- | Get number of nodes in the table. | ||
528 | size :: BucketList -> NodeCount | ||
529 | size = L.sum . shape | ||
530 | |||
531 | -- | Get number of buckets in the table. | ||
532 | depth :: BucketList -> BucketCount | ||
533 | depth = L.length . shape | ||
534 | |||
535 | #endif | ||
536 | |||
-- | Hand the continuation the table's buckets reordered around the bucket
-- responsible for @nid@.
--
-- Buckets are scanned from the front.  Bucket @i@ is selected when bit @i@ of
-- the xor distance between @nid@ and the table's own location is set.  The
-- selected bucket comes first, followed by the buckets after it, followed by
-- the skipped buckets in reverse order.  If no bit selects a bucket, the
-- result is simply the bucket list reversed, so the last bucket comes first.
lookupBucket :: forall ni nid x.
                ( -- FiniteBits nid
                  Ord nid
                ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x
lookupBucket space nid kont (BucketList self bkts) = kont (reorder 0 [] bkts)
  where
    dist      = kademliaXor space nid (kademliaLocation space self)
    differsAt = kademliaTestBit space dist

    reorder :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni]
    reorder _ skipped [] = skipped
    reorder i skipped (bkt : rest)
        | differsAt i = bkt : rest ++ skipped
        | otherwise   = reorder (succ i) (bkt : skipped) rest
550 | |||
-- | Index of the first bucket position @i@ at which bit @i@ of the xor
-- distance between @nid@ and the table's own location is set.  If none of
-- the positions covered by the bucket list has the bit set, the result is
-- the number of buckets.
bucketNumber :: forall ni nid.
                KademliaSpace nid ni -> nid -> BucketList ni -> Int
bucketNumber space nid (BucketList self bkts) =
    L.length (L.takeWhile (not . differsAt) indices)
  where
    dist      = kademliaXor space nid (kademliaLocation space self)
    differsAt = kademliaTestBit space dist

    -- One bit index per bucket.
    indices   = zipWith const [(0 :: Word) ..] bkts
562 | |||
563 | |||
-- | Produce a node id via 'genBucketSample', seeded with a prefix derived
-- from the table's own node.
--
-- The seed is obtained by packing the table's own bits (one per bucket, see
-- 'tablePrefix') into bytes, zero-padding to the byte size of @nid@, and
-- decoding the result.  A decode failure raises 'error'.  The range passed
-- to 'genBucketSample' is @bucketRange (bucket count - 1) True@.
-- NOTE(review): 'genBucketSample', 'bucketRange' and 'testIdBit' are defined
-- outside this module; their exact semantics should be confirmed there.
compatibleNodeId :: forall ni nid.
                    ( Serialize nid, FiniteBits nid) =>
                    (ni -> nid) -> BucketList ni -> IO nid
compatibleNodeId nodeId tbl = genBucketSample seed range
  where
    range     = bucketRange (L.length (shape tbl) - 1) True
    byteCount = finiteBitSize (undefined :: nid) `div` 8
    padded    = tablePrefix (testIdBit . nodeId) tbl ++ repeat 0
    seed      = case S.decode (BS.pack (take byteCount padded)) of
        Left err -> error err
        Right v  -> v
573 | |||
-- | Pack the bits produced by 'tableBits' into bytes, most significant bit
-- first.  A trailing group of fewer than eight bits is padded with zero bits.
tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8]
tablePrefix testbit tbl =
    [ packByte (take 8 (bits ++ repeat False))
    | bits <- chunksOf 8 (tableBits testbit tbl)
    ]
  where
    -- The first flag of a group lands in bit 7, the last in bit 0.
    packByte :: [Bool] -> Word8
    packByte flags = foldl' (.|.) 0 [ bit ix | (ix, True) <- zip [7, 6 .. 0] flags ]
582 | |||
-- | Bits @0, 1, 2, ...@ of the table's own node, as reported by the supplied
-- bit test: exactly one bit per bucket in the table.
tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool]
tableBits testbit (BucketList self bkts) =
    [ testbit self i | (i, _) <- zip [0 ..] bkts ]
587 | |||
-- | The table's own node record.
selfNode :: BucketList ni -> ni
selfNode tbl = case tbl of
    BucketList self _ -> self
590 | |||
-- | Cut a list into consecutive pieces of @i@ elements; the last piece may be
-- shorter.  An empty list gives no pieces.
chunksOf :: Int -> [e] -> [[e]]
chunksOf i = L.unfoldr step
  where
    step [] = Nothing
    step l  = Just (take i l, drop i l)

-- | Run a list producer written against abstract cons and nil by supplying
-- the real list constructors.
build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a]
build producer = producer (:) []
599 | |||
600 | |||
601 | |||
602 | -- | Count of closest nodes in find_node reply. | ||
603 | type K = Int | ||
604 | |||
605 | -- | Default 'K' is equal to 'defaultBucketSize'. | ||
606 | defaultK :: K | ||
607 | defaultK = 8 | ||
608 | |||
609 | #if 0 | ||
610 | class TableKey dht k where | ||
611 | toNodeId :: k -> NodeId | ||
612 | |||
613 | instance TableKey dht (NodeId) where | ||
614 | toNodeId = id | ||
615 | |||
616 | #endif | ||
617 | |||
-- | In Kademlia the distance metric is XOR, with the result interpreted as
-- an unsigned integer.  Ordering of distances is the ordering of the wrapped
-- node id type.
newtype NodeDistance nodeid = NodeDistance nodeid
    deriving (Eq, Ord)

-- | @distance a b@ is @a xor b@.  Smaller values are closer.
distance :: Bits nid => nid -> nid -> NodeDistance nid
distance a b = NodeDistance (a `xor` b)
626 | |||
-- | Sort nodes by closeness to @nid@, nearest first.  Closeness is the xor
-- of @nid@ with each node's location, compared using the 'Ord' instance of
-- the id type.
rank :: ( Ord nid
        ) => KademliaSpace nid ni -> nid -> [ni] -> [ni]
rank space nid = L.sortBy closerFirst
  where
    closerFirst a b = compare (dist a) (dist b)
    dist = kademliaXor space nid . kademliaLocation space
631 | |||
632 | |||
-- | Get up to /K/ nodes close to @nid@ under the XOR metric.  Used in
-- 'find_node' and 'get_peers' queries.
--
-- Nodes from the bucket that 'lookupBucket' places first are ranked and
-- listed first.  The nodes of all remaining buckets are ranked together and
-- appended before the list is cut to @k@ entries.
kclosest :: ( -- FiniteBits nid
              Ord nid
            ) =>
            KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni]
kclosest space k nid tbl = take k (nearest home ++ nearest others)
  where
    nearest = rank space nid . L.concat

    (home, others) = L.splitAt 1 (lookupBucket space nid (L.map members) tbl)

    -- Plain node records of one bucket, stripped of their ordering wrapper.
    members = \bkt -> coerce (L.map PSQ.key (PSQ.toList (bktNodes bkt)))
646 | |||
647 | |||
648 | |||
649 | {----------------------------------------------------------------------- | ||
650 | -- Routing | ||
651 | -----------------------------------------------------------------------} | ||
652 | |||
-- | Split the last bucket of the table on bit @i@.  The result is a
-- two-element list.  The half whose nodes agree with @ni@ on bit @i@ comes
-- second and becomes the new last bucket.
splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
            ( Reifies s (Compare ni) ) =>
            (ni -> Word -> Bool)
         -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ]
splitTip testNodeBit ni i bucket =
    case split testNodeBit i bucket of
        ~(ones, zeros)
            | testNodeBit ni i -> [zeros, ones]
            | otherwise        -> [ones, zeros]
662 | |||
-- | Apply a bucket update to the bucket responsible for @nid@.  Used in each
-- query.
--
-- Buckets are scanned from the front.  Bucket @i@ is chosen when bit @i@ of
-- the xor distance between @nid@ and the table's own location is set; the
-- last bucket is chosen when no earlier one was.
--
-- If the update fails ('Nothing') on the last bucket, that bucket is split
-- with 'splitTip' and the update is retried, as long as the bit index is
-- below 'defaultMaxBucketCount'.  A failure on any other bucket, or once
-- that limit is reached, makes the whole call return 'Nothing'.
--
-- An empty bucket list also yields 'Nothing' rather than a pattern-match
-- failure.  The table is expected to hold at least one bucket.
--
-- TODO: Kademlia non-empty subtrees should split if they have less than
-- k nodes in them.  Which subtrees I mean is illustrated in Fig 1. of Kademlia
-- paper.  The rule requiring additional splits is in section 2.4.
modifyBucket
    :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
       forall ni nid xs.
       KademliaSpace nid ni
    -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni)
modifyBucket space nid f (BucketList self bkts)
    = second (BucketList self) <$> go (0 :: BitIx) bkts
  where
    d = kademliaXor space nid (kademliaLocation space self)

    -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni])

    -- Defensive: there is no bucket to update.
    go _ [] = Nothing

    go !i (bucket : buckets@(_:_))
        | kademliaTestBit space d i = second (: buckets) <$> f bucket
        | otherwise                 = second (bucket :) <$> go (succ i) buckets

    -- The last bucket is the catch-all; split it if the update fails.
    go !i [bucket] = second (: []) <$> f bucket <|> gosplit
      where
        gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space
                                                             . kademliaLocation space )
                                                             self
                                                             i
                                                             bucket)
                | otherwise                 = Nothing -- Limit the number of buckets.
692 | |||
693 | |||
-- | Number of buckets in the table.
bktCount :: BucketList ni -> Int
bktCount tbl = case tbl of
    BucketList _ bkts -> L.length bkts
696 | |||
697 | -- | Triggering event for atomic table update | ||
698 | data Event ni = TryInsert { foreignNode :: ni } | ||
699 | | PingResult { foreignNode :: ni , ponged :: Bool } | ||
700 | |||
701 | #if 0 | ||
702 | deriving instance Eq (NodeId) => Eq (Event) | ||
703 | deriving instance ( Show ip | ||
704 | , Show (NodeId) | ||
705 | , Show u | ||
706 | ) => Show (Event) | ||
707 | |||
708 | #endif | ||
709 | |||
-- | Apply the id projection to the node an event refers to.
eventId :: (ni -> nid) -> Event ni -> nid
eventId nodeId ev = nodeId (foreignNode ev)
713 | |||
714 | |||
715 | -- | Actions requested by atomic table update | ||
716 | data CheckPing ni = CheckPing [ni] | ||
717 | |||
718 | #if 0 | ||
719 | |||
720 | deriving instance Eq (NodeId) => Eq (CheckPing) | ||
721 | deriving instance ( Show ip | ||
722 | , Show (NodeId) | ||
723 | , Show u | ||
724 | ) => Show (CheckPing) | ||
725 | |||
726 | #endif | ||
727 | |||
728 | |||
-- | Call on every inbound packet (including requested ping results).
-- Returns a triple (accepted, to_ping, tbl') where
--
--  [ /accepted/ ] True if the bucket update succeeded: the node was already
--                 present, was newly inserted, or was placed in a bucket's
--                 replacement queue.  False if it was discarded, in which
--                 case the table is returned unchanged.
--
--  [ /to_ping/ ]  A list of nodes to ping and then run 'updateForPingResult'.
--                 It is non-empty only when the new node was queued as a
--                 replacement; the queued node is inserted if the pinged
--                 node times out.
--
--  [ /tbl'/ ]     The updated routing 'BucketList'.
--
updateForInbound ::
    KademliaSpace nid ni
    -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni)
updateForInbound space tm ni tbl =
    case modifyBucket space location (updateBucketForInbound tm ni) tbl of
        Nothing         -> (False, [], tbl)
        Just (ps, tbl') -> (True, ps, tbl')
  where
    location = kademliaLocation space ni
750 | |||
-- | Update the routing table with the result of a ping.
--
-- Each @(a, Just (tm, b))@ in the returned list indicates that the node /a/
-- was deleted from the routing table and the node /b/, with timestamp /tm/,
-- has taken its place.  Each @(b, Nothing)@ indicates that the pending node
-- /b/ was dropped from a replacement queue without being inserted.
--
-- If the bucket update cannot be applied, the table is returned unchanged
-- with an empty list.
updateForPingResult ::
    KademliaSpace nid ni
    -> ni            -- ^ The pinged node.
    -> Bool          -- ^ True if we got a reply, False if it timed out.
    -> BucketList ni -- ^ The routing table.
    -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni )
updateForPingResult space ni got_reply tbl =
    case modifyBucket space location (updateBucketForPingResult ni got_reply) tbl of
        Nothing      -> ([], tbl)
        Just updated -> updated
  where
    location = kademliaLocation space ni
767 | |||
768 | |||
769 | {----------------------------------------------------------------------- | ||
770 | -- Conversion | ||
771 | -----------------------------------------------------------------------} | ||
772 | |||
773 | type TableEntry ni = (ni, Timestamp) | ||
774 | |||
-- | Turn a queue binding into a plain @(node, timestamp)@ pair.
tableEntry :: NodeEntry ni -> TableEntry ni
tableEntry = \(node :-> stamp) -> (node, stamp)
777 | |||
-- | List the routing nodes of every bucket with their timestamps: one inner
-- list per bucket, in bucket order.
toList :: BucketList ni -> [[TableEntry ni]]
toList (BucketList _ bkts) =
    coerce [ [ tableEntry b | b <- PSQ.toList (bktNodes bkt) ] | bkt <- bkts ]
780 | |||
781 | data KademliaSpace nid ni = KademliaSpace | ||
782 | { -- | Given a node record (probably including IP address), yields a | ||
783 | -- kademlia xor-metric location. | ||
784 | kademliaLocation :: ni -> nid | ||
785 | -- | Used when comparing locations. This is similar to | ||
786 | -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so | ||
787 | -- that 0 is the most significant bit. | ||
788 | , kademliaTestBit :: nid -> Word -> Bool | ||
789 | -- | The Kademlia xor-metric. | ||
790 | , kademliaXor :: nid -> nid -> nid | ||
791 | |||
792 | , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid | ||
793 | } | ||
794 | |||
-- | Re-target a 'KademliaSpace' at a different node record type by converting
-- each record with the given function before its location is computed.  Only
-- 'kademliaLocation' is replaced; the other fields are kept as they are.
contramapKS :: (a -> b) -> KademliaSpace nid b -> KademliaSpace nid a
contramapKS f ks = ks
    { kademliaLocation = kademliaLocation ks . f
    }
798 | |||
diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs index 195bed14..71107fbd 100644 --- a/src/Network/Kademlia/Search.hs +++ b/src/Network/Kademlia/Search.hs | |||
@@ -26,7 +26,7 @@ import qualified Data.MinMaxPSQ as MM | |||
26 | import qualified Data.Wrapper.PSQ as PSQ | 26 | import qualified Data.Wrapper.PSQ as PSQ |
27 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) | 27 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) |
28 | import Network.Address hiding (NodeId) | 28 | import Network.Address hiding (NodeId) |
29 | import Network.DHT.Routing as R | 29 | import Network.Kademlia.Routing as R |
30 | #ifdef THREAD_DEBUG | 30 | #ifdef THREAD_DEBUG |
31 | import Control.Concurrent.Lifted.Instrument | 31 | import Control.Concurrent.Lifted.Instrument |
32 | #else | 32 | #else |