summaryrefslogtreecommitdiff
path: root/src/Network/Kademlia
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-09-15 17:45:57 -0400
committerjoe <joe@jerkface.net>2017-09-15 17:45:57 -0400
commit65f9152b7be0dc86a09870114c9e33ff4642f918 (patch)
tree2b0ee17754f2f38a63e5ea9aeb0eb1819de85db5 /src/Network/Kademlia
parentfdfe7279339f91bad5ceb0cab8e699415686ab3f (diff)
Moved Network.DHT.Routing -> Network.Kademlia.Routing
Diffstat (limited to 'src/Network/Kademlia')
-rw-r--r--src/Network/Kademlia/Routing.hs798
-rw-r--r--src/Network/Kademlia/Search.hs2
2 files changed, 799 insertions, 1 deletions
diff --git a/src/Network/Kademlia/Routing.hs b/src/Network/Kademlia/Routing.hs
new file mode 100644
index 00000000..7f76ac77
--- /dev/null
+++ b/src/Network/Kademlia/Routing.hs
@@ -0,0 +1,798 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Every node maintains a routing table of known good nodes. The
9-- nodes in the routing table are used as starting points for
10-- queries in the DHT. Nodes from the routing table are returned in
11-- response to queries from other nodes.
12--
13-- For more info see:
14-- <http://www.bittorrent.org/beps/bep_0005.html#routing-table>
15--
16{-# LANGUAGE CPP #-}
17{-# LANGUAGE RecordWildCards #-}
18{-# LANGUAGE BangPatterns #-}
19{-# LANGUAGE RankNTypes #-}
20{-# LANGUAGE ViewPatterns #-}
21{-# LANGUAGE TypeOperators #-}
22{-# LANGUAGE DeriveGeneric #-}
23{-# LANGUAGE DeriveFunctor #-}
24{-# LANGUAGE GADTs #-}
25{-# LANGUAGE ScopedTypeVariables #-}
26{-# LANGUAGE TupleSections #-}
27{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-}
28{-# OPTIONS_GHC -fno-warn-orphans #-}
29module Network.Kademlia.Routing
30 {-
31 ( -- * BucketList
32 BucketList
33 , Info(..)
34
35 -- * Attributes
36 , BucketCount
37 , defaultBucketCount
38 , BucketSize
39 , defaultBucketSize
40 , NodeCount
41
42 -- * Query
43 , Network.Kademlia.Routing.null
44 , Network.Kademlia.Routing.full
45 , thisId
46 , shape
47 , Network.Kademlia.Routing.size
48 , Network.Kademlia.Routing.depth
49 , compatibleNodeId
50
51 -- * Lookup
52 , K
53 , defaultK
54 , TableKey (..)
55 , kclosest
56
57 -- * Construction
58 , Network.Kademlia.Routing.nullTable
59 , Event(..)
60 , CheckPing(..)
61 , Network.Kademlia.Routing.insert
62
63 -- * Conversion
64 , Network.Kademlia.Routing.TableEntry
65 , Network.Kademlia.Routing.toList
66
67 -- * Routing
68 , Timestamp
69 , getTimestamp
70 ) -} where
71
72import Control.Applicative as A
73import Control.Arrow
74import Control.Monad
75import Data.Function
76import Data.Functor.Identity
77import Data.List as L hiding (insert)
78import Data.Maybe
79import Data.Monoid
80import Data.Wrapper.PSQ as PSQ
81import Data.Serialize as S hiding (Result, Done)
82import qualified Data.Sequence as Seq
83import Data.Time
84import Data.Time.Clock.POSIX
85import Data.Word
86import GHC.Generics
87import Text.PrettyPrint as PP hiding ((<>))
88import Text.PrettyPrint.HughesPJClass (pPrint,Pretty)
89import qualified Data.ByteString as BS
90import Data.Bits
91import Data.Ord
92import Data.Reflection
93import Network.Address
94import Data.Typeable
95import Data.Coerce
96import Data.Hashable
97
98-- | Last time the node was responding to our queries.
99--
100-- Not all nodes that we learn about are equal. Some are \"good\" and
101-- some are not. Many nodes using the DHT are able to send queries
102-- and receive responses, but are not able to respond to queries
103-- from other nodes. It is important that each node's routing table
104-- must contain only known good nodes. A good node is a node has
105-- responded to one of our queries within the last 15 minutes. A
106-- node is also good if it has ever responded to one of our queries
107-- and has sent us a query within the last 15 minutes. After 15
108-- minutes of inactivity, a node becomes questionable. Nodes become
109-- bad when they fail to respond to multiple queries in a row. Nodes
110-- that we know are good are given priority over nodes with unknown
111-- status.
112--
113type Timestamp = POSIXTime
114
115getTimestamp :: IO Timestamp
116getTimestamp = do
117 utcTime <- getCurrentTime
118 return $ utcTimeToPOSIXSeconds utcTime
119
120
121
122{-----------------------------------------------------------------------
123 Bucket
124-----------------------------------------------------------------------}
125--
126-- When a k-bucket is full and a new node is discovered for that
127-- k-bucket, the least recently seen node in the k-bucket is
128-- PINGed. If the node is found to be still alive, the new node is
129-- place in a secondary list, a replacement cache. The replacement
130-- cache is used only if a node in the k-bucket stops responding. In
131-- other words: new nodes are used only when older nodes disappear.
132
133-- | Timestamp - last time this node is pinged.
134type NodeEntry ni = Binding ni Timestamp
135
136
137-- | Maximum number of 'NodeInfo's stored in a bucket. Most clients
138-- use this value.
139defaultBucketSize :: Int
140defaultBucketSize = 8
141
142data QueueMethods m elem fifo = QueueMethods
143 { pushBack :: elem -> fifo -> m fifo
144 , popFront :: fifo -> m (Maybe elem, fifo)
145 , emptyQueue :: m fifo
146 }
147
148{-
149fromQ :: Functor m =>
150 ( a -> b )
151 -> ( b -> a )
152 -> QueueMethods m elem a
153 -> QueueMethods m elem b
154fromQ embed project QueueMethods{..} =
155 QueueMethods { pushBack = \e -> fmap embed . pushBack e . project
156 , popFront = fmap (second embed) . popFront . project
157 , emptyQueue = fmap embed emptyQueue
158 }
159-}
160
161seqQ :: QueueMethods Identity ni (Seq.Seq ni)
162seqQ = QueueMethods
163 { pushBack = \e fifo -> pure (fifo Seq.|> e)
164 , popFront = \fifo -> case Seq.viewl fifo of
165 e Seq.:< fifo' -> pure (Just e, fifo')
166 Seq.EmptyL -> pure (Nothing, Seq.empty)
167 , emptyQueue = pure Seq.empty
168 }
169
170type BucketQueue ni = Seq.Seq ni
171
172bucketQ :: QueueMethods Identity ni (BucketQueue ni)
173bucketQ = seqQ
174
175
176data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int)
177
178contramapC :: (b -> a) -> Compare a -> Compare b
179contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b))
180 (\s x -> hsh s (f x))
181
182newtype Ordered' s a = Ordered a
183 deriving (Show)
184
185-- | Hack to avoid UndecidableInstances
186newtype Shrink a = Shrink a
187 deriving (Show)
188
189type Ordered s a = Ordered' s (Shrink a)
190
191instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where
192 a == b = (compare a b == EQ)
193
194instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where
195 compare a b = cmp (coerce a) (coerce b)
196 where Compare cmp _ = reflect (Proxy :: Proxy s)
197
198instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where
199 hashWithSalt salt x = hash salt (coerce x)
200 where Compare _ hash = reflect (Proxy :: Proxy s)
201
202-- | Bucket is also limited in its length — thus it's called k-bucket.
203-- When bucket becomes full, we should split it in two lists by
204-- current span bit. Span bit is defined by depth in the routing
205-- table tree. Size of the bucket should be choosen such that it's
206-- very unlikely that all nodes in bucket fail within an hour of
207-- each other.
208data Bucket s ni = Bucket
209 { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes
210 , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs
211 } deriving (Generic)
212
213#define CAN_SHOW_BUCKET 0
214
215#if CAN_SHOW_BUCKET
216deriving instance Show ni => Show (Bucket s ni)
217#endif
218
219bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni
220bucketCompare _ = reflect (Proxy :: Proxy s)
221
222mapBucket :: ( Reifies s (Compare a)
223 , Reifies t (Compare ni)
224 ) => (a -> ni) -> Bucket s a -> Bucket t ni
225mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns)
226 (fmap (second f) q)
227 where f' = coerce . f . coerce
228
229
230#if 0
231
232{-
233getGenericNode :: ( Serialize (NodeId)
234 , Serialize ip
235 , Serialize u
236 ) => Get (NodeInfo)
237getGenericNode = do
238 nid <- get
239 naddr <- get
240 u <- get
241 return NodeInfo
242 { nodeId = nid
243 , nodeAddr = naddr
244 , nodeAnnotation = u
245 }
246
247putGenericNode :: ( Serialize (NodeId)
248 , Serialize ip
249 , Serialize u
250 ) => NodeInfo -> Put
251putGenericNode (NodeInfo nid naddr u) = do
252 put nid
253 put naddr
254 put u
255
256instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where
257 get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ)
258 put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes
259-}
260
261#endif
262
263psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p
264psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs
265
266psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)]
267psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq
268
269-- | Update interval, in seconds.
270delta :: NominalDiffTime
271delta = 15 * 60
272
273-- | Should maintain a set of stable long running nodes.
274--
275-- Note: pings are triggerd only when a bucket is full.
276updateBucketForInbound curTime info bucket
277 -- Just update timestamp if a node is already in bucket.
278 --
279 -- Note PingResult events should only occur for nodes we requested a ping for,
280 -- and those will always already be in the routing queue and will get their
281 -- timestamp updated here, since 'TryInsert' is called on every inbound packet,
282 -- including ping results.
283 | already_have
284 = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime )
285 -- bucket is good, but not full => we can insert a new node
286 | PSQ.size (bktNodes bucket) < defaultBucketSize
287 = pure ( [], map_ns $ PSQ.insert (coerce info) curTime )
288 -- If there are any questionable nodes in the bucket have not been
289 -- seen in the last 15 minutes, the least recently seen node is
290 -- pinged. If any nodes in the bucket are known to have become bad,
291 -- then one is replaced by the new node in the next insertBucket
292 -- iteration.
293 | not (L.null stales)
294 = pure ( stales
295 , bucket { -- Update timestamps so that we don't redundantly ping.
296 bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket
297 -- Update queue with the pending NodeInfo in case of ping fail.
298 , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } )
299 -- When the bucket is full of good nodes, the new node is simply discarded.
300 -- We must return 'A.empty' here to ensure that bucket splitting happens
301 -- inside 'modifyBucket'.
302 | otherwise = A.empty
303 where
304 -- We (take 1) to keep a 1-to-1 correspondence between pending pings and
305 -- waiting nodes in the bktQ. This way, we don't have to worry about what
306 -- to do with failed pings for which there is no ready replacements.
307 stales = -- One stale:
308 do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket)
309 guard (t < curTime - delta)
310 return $ coerce n
311 -- All stale:
312 -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket
313
314 already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket)
315
316 map_ns f = bucket { bktNodes = f (bktNodes bucket) }
317 -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) }
318
319updateBucketForPingResult bad_node got_response bucket
320 = pure ( map (,Nothing) forgotten
321 ++ map (second Just) replacements
322 , Bucket (foldr replace
323 (bktNodes bucket)
324 replacements)
325 popped
326 )
327 where
328 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket)
329
330 -- Dropped from accepted, replaced by pending.
331 replacements | got_response = [] -- Timestamp was already updated by TryInsert.
332 | Just info <- top = do
333 -- Insert only if there's a removal.
334 _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket)
335 return (bad_node, info)
336 | otherwise = []
337
338 -- Dropped from the pending queue without replacing.
339 forgotten | got_response = maybeToList $ fmap snd top
340 | otherwise = []
341
342
343 replace (bad_node, (tm, info)) =
344 PSQ.insert (coerce info) tm
345 . PSQ.delete (coerce bad_node)
346
347
348updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp
349updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales
350
351type BitIx = Word
352
353partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b)
354partitionQ imp test q0 = do
355 pass0 <- emptyQueue imp
356 fail0 <- emptyQueue imp
357 let flipfix a b f = fix f a b
358 flipfix q0 (pass0,fail0) $ \rec q qs -> do
359 (mb,q') <- popFront imp q
360 case mb of
361 Nothing -> return qs
362 Just e -> do qs' <- select (pushBack imp e) qs
363 rec q' qs'
364 where
365 select :: Functor f => (b -> f b) -> (b, b) -> f (b, b)
366 select f = if test e then \(a,b) -> flip (,) b <$> f a
367 else \(a,b) -> (,) a <$> f b
368
369
370
371split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
372 forall ni s. ( Reifies s (Compare ni) ) =>
373 (ni -> Word -> Bool)
374 -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni)
375split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs)
376 where
377 (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b
378 (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b
379
380 spanBit :: ni -> Bool
381 spanBit entry = testNodeIdBit entry i
382
383
384{-----------------------------------------------------------------------
385-- BucketList
386-----------------------------------------------------------------------}
387
388defaultBucketCount :: Int
389defaultBucketCount = 20
390
391defaultMaxBucketCount :: Word
392defaultMaxBucketCount = 24
393
394data Info ni nid = Info
395 { myBuckets :: BucketList ni
396 , myNodeId :: nid
397 , myAddress :: SockAddr
398 }
399 deriving Generic
400
401deriving instance (Eq ni, Eq nid) => Eq (Info ni nid)
402deriving instance (Show ni, Show nid) => Show (Info ni nid)
403
404-- instance (Eq ip, Serialize ip) => Serialize (Info ip)
405
406-- | The routing table covers the entire 'NodeId' space from 0 to 2 ^
407-- 160. The routing table is subdivided into 'Bucket's that each cover
408-- a portion of the space. An empty table has one bucket with an ID
409-- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\"
410-- is inserted into the table, it is placed within the bucket that has
411-- @min <= N < max@. An empty table has only one bucket so any node
412-- must fit within it. Each bucket can only hold 'K' nodes, currently
413-- eight, before becoming 'Full'. When a bucket is full of known good
414-- nodes, no more nodes may be added unless our own 'NodeId' falls
415-- within the range of the 'Bucket'. In that case, the bucket is
416-- replaced by two new buckets each with half the range of the old
417-- bucket and the nodes from the old bucket are distributed among the
418-- two new ones. For a new table with only one bucket, the full bucket
419-- is always split into two new buckets covering the ranges @0..2 ^
420-- 159@ and @2 ^ 159..2 ^ 160@.
421--
422data BucketList ni = forall s. Reifies s (Compare ni) =>
423 BucketList { thisNode :: !ni
424 -- | Non-empty list of buckets.
425 , buckets :: [Bucket s ni]
426 }
427
428mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b
429mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts)
430 $ \p -> BucketList
431 { thisNode = f self
432 , buckets = map (resolve p . mapBucket f) bkts
433 }
434 where
435 resolve :: Proxy s -> Bucket s ni -> Bucket s ni
436 resolve = const id
437
438instance (Eq ni) => Eq (BucketList ni) where
439 (==) = (==) `on` Network.Kademlia.Routing.toList
440
441#if 0
442
443instance Serialize NominalDiffTime where
444 put = putWord32be . fromIntegral . fromEnum
445 get = (toEnum . fromIntegral) <$> getWord32be
446
447#endif
448
449#if CAN_SHOW_BUCKET
450deriving instance (Show ni) => Show (BucketList ni)
451#else
452instance Show ni => Show (BucketList ni) where
453 showsPrec d (BucketList self bkts) =
454 mappend "BucketList "
455 . showsPrec (d+1) self
456 . mappend " (fromList "
457 . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts)
458 . mappend ") "
459#endif
460
461#if 0
462
463-- | Normally, routing table should be saved between invocations of
464-- the client software. Note that you don't need to store /this/
465-- 'NodeId' since it is already included in routing table.
466instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList)
467
468#endif
469
470-- | Shape of the table.
471instance Pretty (BucketList ni) where
472 pPrint t
473 | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss
474 | otherwise = brackets $
475 PP.int (L.sum ss) <> " nodes, " <>
476 PP.int bucketCount <> " buckets"
477 where
478 bucketCount = L.length ss
479 ss = shape t
480
481-- | Empty table with specified /spine/ node id.
482--
483-- XXX: The comparison function argument is awkward here.
484nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni
485nullTable cmp hsh ni n =
486 reify (Compare cmp hsh)
487 $ \p -> BucketList
488 ni
489 [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)]
490 where
491 empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp
492 empty = const $ PSQ.empty
493
494#if 0
495
496-- | Test if table is empty. In this case DHT should start
497-- bootstrapping process until table becomes 'full'.
498null :: BucketList -> Bool
499null (Tip _ _ b) = PSQ.null $ bktNodes b
500null _ = False
501
502-- | Test if table have maximum number of nodes. No more nodes can be
503-- 'insert'ed, except old ones becomes bad.
504full :: BucketList -> Bool
505full (Tip _ n _) = n == 0
506full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t
507full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t
508
509-- | Get the /spine/ node id.
510thisId :: BucketList -> NodeId
511thisId (Tip nid _ _) = nid
512thisId (Zero table _) = thisId table
513thisId (One _ table) = thisId table
514
515-- | Number of nodes in a bucket or a table.
516type NodeCount = Int
517
518#endif
519
520-- | Internally, routing table is similar to list of buckets or a
521-- /matrix/ of nodes. This function returns the shape of the matrix.
522shape :: BucketList ni -> [Int]
523shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl
524
525#if 0
526
527-- | Get number of nodes in the table.
528size :: BucketList -> NodeCount
529size = L.sum . shape
530
531-- | Get number of buckets in the table.
532depth :: BucketList -> BucketCount
533depth = L.length . shape
534
535#endif
536
537lookupBucket :: forall ni nid x.
538 ( -- FiniteBits nid
539 Ord nid
540 ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x
541lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts
542 where
543 d = kademliaXor space nid (kademliaLocation space self)
544
545 go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni]
546 go i bs (bucket : buckets)
547 | kademliaTestBit space d i = bucket : buckets ++ bs
548 | otherwise = go (succ i) (bucket:bs) buckets
549 go _ bs [] = bs
550
551bucketNumber :: forall ni nid.
552 KademliaSpace nid ni -> nid -> BucketList ni -> Int
553bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts
554 where
555 d = kademliaXor space nid (kademliaLocation space self)
556
557 go :: Word -> [Bucket s ni] -> Word
558 go i (bucket : buckets)
559 | kademliaTestBit space d i = i
560 | otherwise = go (succ i) buckets
561 go i [] = i
562
563
564compatibleNodeId :: forall ni nid.
565 ( Serialize nid, FiniteBits nid) =>
566 (ni -> nid) -> BucketList ni -> IO nid
567compatibleNodeId nodeId tbl = genBucketSample prefix br
568 where
569 br = bucketRange (L.length (shape tbl) - 1) True
570 nodeIdSize = finiteBitSize (undefined :: nid) `div` 8
571 bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0
572 prefix = either error id $ S.decode bs
573
574tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8]
575tablePrefix testbit = map (packByte . take 8 . (++repeat False))
576 . chunksOf 8
577 . tableBits testbit
578 where
579 packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0]
580 bitmask ix True = bit ix
581 bitmask _ _ = 0
582
583tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool]
584tableBits testbit (BucketList self bkts) =
585 zipWith const (map (testbit self) [0..])
586 bkts
587
588selfNode :: BucketList ni -> ni
589selfNode (BucketList self _) = self
590
591chunksOf :: Int -> [e] -> [[e]]
592chunksOf i ls = map (take i) (build (splitter ls)) where
593 splitter :: [e] -> ([e] -> a -> a) -> a -> a
594 splitter [] _ n = n
595 splitter l c n = l `c` splitter (drop i l) c n
596
597build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a]
598build g = g (:) []
599
600
601
602-- | Count of closest nodes in find_node reply.
603type K = Int
604
605-- | Default 'K' is equal to 'defaultBucketSize'.
606defaultK :: K
607defaultK = 8
608
609#if 0
610class TableKey dht k where
611 toNodeId :: k -> NodeId
612
613instance TableKey dht (NodeId) where
614 toNodeId = id
615
616#endif
617
618-- | In Kademlia, the distance metric is XOR and the result is
619-- interpreted as an unsigned integer.
620newtype NodeDistance nodeid = NodeDistance nodeid
621 deriving (Eq, Ord)
622
623-- | distance(A,B) = |A xor B| Smaller values are closer.
624distance :: Bits nid => nid -> nid -> NodeDistance nid
625distance a b = NodeDistance $ xor a b
626
627-- | Order by closeness: nearest nodes first.
628rank :: ( Ord nid
629 ) => KademliaSpace nid ni -> nid -> [ni] -> [ni]
630rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space))
631
632
633-- | Get a list of /K/ closest nodes using XOR metric. Used in
634-- 'find_node' and 'get_peers' queries.
635kclosest :: ( -- FiniteBits nid
636 Ord nid
637 ) =>
638 KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni]
639kclosest space k nid tbl = take k $ rank space nid (L.concat bucket)
640 ++ rank space nid (L.concat everyone)
641 where
642 (bucket,everyone) =
643 L.splitAt 1
644 . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes))
645 $ tbl
646
647
648
649{-----------------------------------------------------------------------
650-- Routing
651-----------------------------------------------------------------------}
652
653splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
654 ( Reifies s (Compare ni) ) =>
655 (ni -> Word -> Bool)
656 -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ]
657splitTip testNodeBit ni i bucket
658 | testNodeBit ni i = [zeros , ones ]
659 | otherwise = [ones , zeros ]
660 where
661 (ones, zeros) = split testNodeBit i bucket
662
663-- | Used in each query.
664--
665-- TODO: Kademlia non-empty subtrees should should split if they have less than
666-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia
667-- paper. The rule requiring additional splits is in section 2.4.
668modifyBucket
669 :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
670 forall ni nid xs.
671 KademliaSpace nid ni
672 -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni)
673modifyBucket space nid f (BucketList self bkts)
674 = second (BucketList self) <$> go (0 :: BitIx) bkts
675 where
676 d = kademliaXor space nid (kademliaLocation space self)
677
678 -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni])
679
680 go !i (bucket : buckets@(_:_))
681 | kademliaTestBit space d i = second (: buckets) <$> f bucket
682 | otherwise = second (bucket :) <$> go (succ i) buckets
683
684 go !i [bucket] = second (: []) <$> f bucket <|> gosplit
685 where
686 gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space
687 . kademliaLocation space )
688 self
689 i
690 bucket)
691 | otherwise = Nothing -- Limit the number of buckets.
692
693
694bktCount :: BucketList ni -> Int
695bktCount (BucketList _ bkts) = L.length bkts
696
697-- | Triggering event for atomic table update
698data Event ni = TryInsert { foreignNode :: ni }
699 | PingResult { foreignNode :: ni , ponged :: Bool }
700
701#if 0
702deriving instance Eq (NodeId) => Eq (Event)
703deriving instance ( Show ip
704 , Show (NodeId)
705 , Show u
706 ) => Show (Event)
707
708#endif
709
710eventId :: (ni -> nid) -> Event ni -> nid
711eventId nodeId (TryInsert ni) = nodeId ni
712eventId nodeId (PingResult ni _) = nodeId ni
713
714
715-- | Actions requested by atomic table update
716data CheckPing ni = CheckPing [ni]
717
718#if 0
719
720deriving instance Eq (NodeId) => Eq (CheckPing)
721deriving instance ( Show ip
722 , Show (NodeId)
723 , Show u
724 ) => Show (CheckPing)
725
726#endif
727
728
729-- | Call on every inbound packet (including requested ping results).
730-- Returns a triple (was_inserted, to_ping, tbl') where
731--
732-- [ /was_inserted/ ] True if the node was added to the routing table.
733--
734-- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'.
735-- This will be empty if /was_inserted/, but a non-inserted node
736-- may be added to a replacement queue and will be inserted if
737-- one of the items in this list time out.
738--
739-- [ /tbl'/ ] The updated routing 'BucketList'.
740--
741updateForInbound ::
742 KademliaSpace nid ni
743 -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni)
744updateForInbound space tm ni tbl@(BucketList _ bkts) =
745 maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl'))
746 $ modifyBucket space
747 (kademliaLocation space ni)
748 (updateBucketForInbound tm ni)
749 tbl
750
751-- | Update the routing table with the results of a ping.
752--
753-- Each (a,(tm,b)) in the returned list indicates that the node /a/ was deleted from the
754-- routing table and the node /b/, with timestamp /tm/, has taken its place.
755updateForPingResult ::
756 KademliaSpace nid ni
757 -> ni -- ^ The pinged node.
758 -> Bool -- ^ True if we got a reply, False if it timed out.
759 -> BucketList ni -- ^ The routing table.
760 -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni )
761updateForPingResult space ni got_reply tbl =
762 fromMaybe ([],tbl)
763 $ modifyBucket space
764 (kademliaLocation space ni)
765 (updateBucketForPingResult ni got_reply)
766 tbl
767
768
769{-----------------------------------------------------------------------
770-- Conversion
771-----------------------------------------------------------------------}
772
773type TableEntry ni = (ni, Timestamp)
774
775tableEntry :: NodeEntry ni -> TableEntry ni
776tableEntry (a :-> b) = (a, b)
777
778toList :: BucketList ni -> [[TableEntry ni]]
779toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts
780
781data KademliaSpace nid ni = KademliaSpace
782 { -- | Given a node record (probably including IP address), yields a
783 -- kademlia xor-metric location.
784 kademliaLocation :: ni -> nid
785 -- | Used when comparing locations. This is similar to
786 -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so
787 -- that 0 is the most significant bit.
788 , kademliaTestBit :: nid -> Word -> Bool
789 -- | The Kademlia xor-metric.
790 , kademliaXor :: nid -> nid -> nid
791
792 , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid
793 }
794
795contramapKS f ks = ks
796 { kademliaLocation = kademliaLocation ks . f
797 }
798
diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs
index 195bed14..71107fbd 100644
--- a/src/Network/Kademlia/Search.hs
+++ b/src/Network/Kademlia/Search.hs
@@ -26,7 +26,7 @@ import qualified Data.MinMaxPSQ as MM
26import qualified Data.Wrapper.PSQ as PSQ 26import qualified Data.Wrapper.PSQ as PSQ
27 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) 27 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey)
28import Network.Address hiding (NodeId) 28import Network.Address hiding (NodeId)
29import Network.DHT.Routing as R 29import Network.Kademlia.Routing as R
30#ifdef THREAD_DEBUG 30#ifdef THREAD_DEBUG
31import Control.Concurrent.Lifted.Instrument 31import Control.Concurrent.Lifted.Instrument
32#else 32#else