diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
deleted file mode 100644
index 1324ae77..00000000
--- a/src/Network/Kademlia/Bootstrap.hs
+++ /dev/null
@@ -1,437 +0,0 @@
2{-# LANGUAGE ConstraintKinds #-}
3{-# LANGUAGE DeriveFunctor #-}
4{-# LANGUAGE DeriveTraversable #-}
5{-# LANGUAGE FlexibleContexts #-}
7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
10{-# LANGUAGE PartialTypeSignatures #-}
11{-# LANGUAGE PatternSynonyms #-}
12{-# LANGUAGE RankNTypes #-}
13{-# LANGUAGE ScopedTypeVariables #-}
14module Network.Kademlia.Bootstrap where
16import Data.Function
17import Data.Maybe
18import qualified Data.Set as Set
19import Data.Time.Clock.POSIX (getPOSIXTime)
20import Network.Kademlia.Routing as R
22import Control.Concurrent.Lifted.Instrument
24import Control.Concurrent.Lifted
25import GHC.Conc (labelThread)
27import Control.Concurrent.STM
28import Control.Monad
29import Data.Hashable
30import Data.Time.Clock.POSIX (POSIXTime)
31import Data.Ord
32import System.Entropy
33import System.Timeout
34import DPut
35import DebugTag
37import qualified Data.Wrapper.PSQInt as Int
38 ;import Data.Wrapper.PSQInt (pattern (:->))
39import Network.Address (bucketRange)
40import Network.Kademlia.Search
41import Control.Concurrent.Tasks
42import Network.Kademlia
-- | Constraint bundle used by the refresher entry points: node ids must be
-- showable, and both node ids and node-info values must be orderable and
-- hashable.
type SensibleNodeId nid ni =
    ( Show nid
    , Ord nid
    , Hashable nid
    , Ord ni
    , Hashable ni
    )
-- | Everything the refresh machinery needs to keep a routing table fresh.
--
-- The address and token types of the underlying search are existentially
-- hidden; only the node-id type @nid@ and node-info type @ni@ are exposed.
data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
    { refreshInterval :: POSIXTime
      -- ^ Staleness threshold: a bucket that goes this long without being
      -- touched gets refreshed.
    , refreshQueue :: TVar (Int.PSQ POSIXTime)
      -- ^ Time-to-refresh schedule, keyed by bucket number.
      --
      -- "Touching" a bucket means pushing its priority in this queue to a
      -- later time, which postpones its refresh.
    , refreshSearch :: Search nid addr tok ni ni
      -- ^ The kademlia node search specification.
    , refreshBuckets :: TVar (R.BucketList ni)
      -- ^ The current kademlia routing table buckets.
    , refreshPing :: ni -> IO Bool
      -- ^ Ping a node. Only used during the initial bootstrap to get some
      -- nodes into the table. 'True' is interpreted as a pong, 'False' as a
      -- non-response.
    , refreshLastTouch :: TVar POSIXTime
      -- ^ Timestamp of the last bucket event.
    , bootstrapMode :: TVar Bool
      -- ^ Whether or not we are currently in bootstrapping mode.
    , bootstrapCountdown :: TVar (Maybe Int)
      -- ^ Decremented on every finished refresh; bootstrap mode is left when
      -- it reaches 0.
    }
-- | Build a 'BucketRefresher' around an existing routing table, a node
-- search and a ping action.
--
-- The refresher starts in bootstrap mode with an empty refresh schedule, no
-- countdown, a last-touch timestamp of 0 and a 15 minute refresh interval.
newBucketRefresher :: ( Ord addr, Hashable addr
                      , SensibleNodeId nid ni )
    => TVar (R.BucketList ni)
    -> Search nid addr tok ni ni
    -> (ni -> IO Bool)
    -> STM (BucketRefresher nid ni)
newBucketRefresher bkts sch ping =
    assemble <$> newTVar Int.empty   -- refresh schedule
             <*> newTVar 0           -- last touch (no clock available in STM)
             <*> newTVar True        -- start in bootstrapping mode
             <*> newTVar Nothing     -- no countdown yet
  where
    assemble queue lastTouch mode countdown = BucketRefresher
        { refreshInterval    = 15 * 60
        , refreshQueue       = queue
        , refreshSearch      = sch
        , refreshBuckets     = bkts
        , refreshPing        = ping
        , refreshLastTouch   = lastTouch
        , bootstrapMode      = mode
        , bootstrapCountdown = countdown
        }
-- | Replace the search and ping action of a refresher, keeping every other
-- field.
--
-- A plain record update cannot be used because 'refreshSearch' mentions the
-- existentially quantified types (GHC reports "Record update for
-- insufficiently polymorphic field"), so the value is rebuilt from scratch.
updateRefresherIO :: Ord addr
    => Search nid addr tok ni ni
    -> (ni -> IO Bool)
    -> BucketRefresher nid ni -> BucketRefresher nid ni
updateRefresherIO sch ping
                  BucketRefresher{ refreshInterval
                                 , refreshQueue
                                 , refreshBuckets
                                 , refreshLastTouch
                                 , bootstrapMode
                                 , bootstrapCountdown } =
    BucketRefresher
        { refreshInterval
        , refreshQueue
        , refreshSearch = sch
        , refreshBuckets
        , refreshPing = ping
        , refreshLastTouch
        , bootstrapMode
        , bootstrapCountdown
        }
-- | Fork a refresh loop. Kill the returned thread to terminate it.
--
-- The loop repeatedly takes the bucket with the earliest scheduled refresh
-- time (blocking while the schedule is empty). If that time has passed, the
-- bucket is rescheduled and refreshed in a separate thread; otherwise the
-- loop sleeps until the time arrives and then looks again.
forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
forkPollForRefresh r@BucketRefresher{ refreshQueue } = fork $ do
    myThreadId >>= flip labelThread "pollForRefresh"
    forever $ atomically nextDue >>= serve
  where
    -- Earliest scheduled entry; retries the transaction on an empty queue.
    nextDue = readTVar refreshQueue >>= maybe retry return . Int.findMin

    serve (bktnum :-> due) = do
        now <- getPOSIXTime
        -- 'fromEnum' on a time difference is used as a picosecond count here.
        let picoseconds = fromEnum (due - now)
        if picoseconds <= 0
            then do
                -- Refresh time! Move the bucket to the back of the queue.
                atomically $ do
                    interval <- effectiveRefreshInterval r bktnum
                    modifyTVar' refreshQueue $ Int.insert bktnum (now + interval)
                -- Run the refresh in its own thread.
                -- TODO: We should probably propagate the kill signal to it.
                void $ fork $ do
                    myThreadId >>= flip labelThread ("refresh."++show bktnum)
                    void $ refreshBucket r bktnum
            else threadDelay (picoseconds `div` 10^6)
-- | Helper for 'refreshBucket' that decides whether a node search should
-- keep going. It returns 'False' when the search can be terminated.
--
-- Every found node that falls into the bucket of interest is recorded in the
-- result set. The search continues ('True') while either the bucket holds
-- fewer than 'R.defaultBucketSize' nodes or fewer than that many in-range
-- results have been collected. Whenever the answer is 'True' it is also
-- written to the supplied flag; a 'False' answer leaves the flag untouched.
--
-- The bucket's occupancy is read at index @n@ of 'R.shape', matching the
-- zero-based numbering used by 'onFinishedRefresh' (@zip [0..] (R.shape tbl)@).
-- The previous @drop (n - 1)@ inspected the neighbouring bucket instead.
checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
                          -> TVar (BucketList ni) -- ^ The current routing table.
                          -> TVar (Set.Set ni)    -- ^ In-range nodes found so far.
                          -> TVar Bool            -- ^ The result will also be written here.
                          -> Int                  -- ^ The bucket number of interest.
                          -> ni                   -- ^ A newly found node.
                          -> STM Bool
checkBucketFull space var resultCounter fin n found_node = do
    tbl <- readTVar var
    let fullcount = R.defaultBucketSize
        nid       = kademliaLocation space found_node
    -- Update the result set with every found node that is in the bucket of
    -- interest.
    when (n == R.bucketNumber space nid tbl)
        $ modifyTVar' resultCounter (Set.insert found_node)
    found <- Set.size <$> readTVar resultCounter
    let bucketHasRoom = maybe False (< fullcount) $ listToMaybe $ drop n (R.shape tbl)
        keepGoing     = bucketHasRoom       -- bucket not full, keep going
                     || found < fullcount   -- not many results yet, keep going
    when keepGoing $ writeTVar fin True
    return keepGoing
-- | Called from 'refreshBucket' with the current time when a refresh of the
-- supplied bucket number finishes. Outside of bootstrap mode this does
-- nothing.
--
-- While bootstrapping, a finished refresh of any bucket but the last one
-- decrements the countdown. When the last bucket finishes and no countdown is
-- running yet, every other non-full bucket (or bucket 0 if there is none) is
-- scheduled for an immediate refresh and the countdown is set to the number
-- of buckets scheduled; if a countdown is already running it is decremented.
-- Once the countdown reads @Just 0@, bootstrap mode is switched off.
--
-- The returned IO action only emits a debug message.
onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
onFinishedRefresh BucketRefresher { bootstrapCountdown
                                  , bootstrapMode
                                  , refreshQueue
                                  , refreshBuckets } num now = do
    bootstrapping <- readTVar bootstrapMode
    if bootstrapping then bootstrapStep else return (return ())
  where
    fullsize = R.defaultBucketSize

    bootstrapStep = do
        tbl <- readTVar refreshBuckets
        if num == R.bktCount tbl - 1
            then readTVar bootstrapCountdown >>= lastBucketFinished tbl
            else modifyTVar' bootstrapCountdown (fmap pred)
        remaining <- readTVar bootstrapCountdown
        case remaining of
            Just 0 -> do
                -- Bootstrap finished!
                writeTVar bootstrapMode False
                writeTVar bootstrapCountdown Nothing
                return $ dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
            _ -> return $ dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,remaining)

    -- A countdown is already running: just decrement it.
    lastBucketFinished _ (Just k) = writeTVar bootstrapCountdown $! Just $! pred k
    -- No countdown yet: schedule the remaining work and start one.
    lastBucketFinished tbl Nothing = do
        let unfull = [ i | (i, len) <- zip [0..] (R.shape tbl), i /= num, len < fullsize ]
            pending = case unfull of
                        [] -> [0]     -- Schedule at least 1 more refresh.
                        is -> is
        -- Schedule an immediate refresh for each pending bucket.
        forM_ pending $ \i ->
            modifyTVar' refreshQueue $ Int.insert i (now - 1)
        writeTVar bootstrapCountdown $! Just $! length pending
-- | Refresh bucket number @n@ by running a node search.
--
-- For the last bucket the search target is our own node id; for any other
-- bucket a random id within the bucket's range is sampled. The search is
-- given 'checkBucketFull' as its continuation test, except when @n + 1@
-- equals 'R.defaultBucketCount', in which case it never short-circuits.
--
-- The call waits up to 15 minutes for the search to finish and then cancels
-- it unconditionally, so that a search which hits the timeout does not keep
-- running alongside the next refresh. 'onFinishedRefresh' is invoked
-- afterwards.
--
-- Returns 1 if the continuation flag was ever set by 'checkBucketFull',
-- otherwise the number of in-range nodes collected.
refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
    BucketRefresher nid ni -> Int -> IO Int
refreshBucket r@BucketRefresher{ refreshSearch  = sch
                               , refreshBuckets = var }
              n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid   = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count -- Is this the last bucket?
                then return nid -- Yes? Search our own id.
                else kademliaSample (searchSpace sch) -- No? Generate a random id.
                                    getEntropy
                                    nid
                                    (bucketRange n (n + 1 < count))
    fin           <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty

    dput XRefresh $ "Start refresh " ++ show (n,sample)

    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
            then const $ return True -- Never short-circuit the last bucket.
            else checkBucketFull (searchSpace sch) var resultCounter fin n
    -- Wait at most 15 minutes in order to avoid overlapping refreshes.
    _ <- timeout (15*60*1000000) $
            atomically $ searchIsFinished s >>= check
    -- Cancel whether the search finished or the wait timed out.
    atomically $ searchCancel s
    dput XDHT $ "Finish refresh " ++ show (n,sample)
    now <- getPOSIXTime
    join $ atomically $ onFinishedRefresh r n now
    atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
-- | Schedule an immediate refresh of the last bucket by giving it a due time
-- one second in the past.
refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
refreshLastBucket BucketRefresher { refreshBuckets, refreshQueue } =
    getPOSIXTime >>= \now -> atomically $ do
        tbl <- readTVar refreshBuckets
        let lastBucket = bktCount tbl - 1
        modifyTVar' refreshQueue (Int.insert lastBucket (now - 1))
-- | Enter bootstrap mode and clear the bootstrap countdown.
--
-- The returned IO action logs the outcome. If the refresher was not already
-- bootstrapping it also schedules an immediate refresh of the last bucket.
restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
    BucketRefresher nid ni -> STM (IO ())
restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
    alreadyBootstrapping <- readTVar bootstrapMode
    writeTVar bootstrapMode True
    writeTVar bootstrapCountdown Nothing
    return $ case alreadyBootstrapping of
        True  -> dput XRefresh "BOOTSTRAP already bootstrapping"
        False -> do
            dput XRefresh "BOOTSTRAP entered bootstrap mode"
            refreshLastBucket r
-- | Populate the routing table from a set of known nodes and then enter
-- bootstrap mode.
--
-- The given nodes are pinged first. The fallback nodes are pinged only if
-- none of the given nodes answered, which lessens the burden on well-known
-- bootstrap nodes. Finally bootstrap mode is (re-)entered from non-bootstrap
-- mode, which schedules a search for our own id.
--
-- Hopefully 'forkPollForRefresh' was invoked and can take over maintenance
-- afterwards.
bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
    BucketRefresher nid ni
    -> t1 ni -- ^ Nodes to bootstrap from.
    -> t ni  -- ^ Fallback nodes; used only if the others are unresponsive.
    -> IO ()
bootstrap r@BucketRefresher { refreshSearch = sch
                            , refreshPing   = ping
                            , bootstrapMode } ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- Ping the given nodes so that they are added to our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g ->
        forM_ ns $ \n ->
            forkTask g (taskLabel n) $ do
                answered <- ping n
                when answered $ atomically $ writeTVar gotPing True

    -- Resort to the hardcoded fallback nodes only when we got no responses.
    anyPong <- atomically (readTVar gotPing)
    unless anyPong $ withTaskGroup "" 20 $ \g ->
        forM_ ns0 $ \n ->
            forkTask g (taskLabel n) (void $ ping n)
    dput XDHT "Finished bootstrap pings."

    -- Search our own id by entering bootstrap mode from non-bootstrap mode.
    join $ atomically $ do
        writeTVar bootstrapMode False
        restartBootstrap r
  where
    -- Tasks are labelled with the printed node id of their target.
    taskLabel n = show $ kademliaLocation (searchSpace sch) n
-- | The refresh interval to use for bucket @num@ right now.
--
-- Outside of bootstrap mode this is always 'refreshInterval'. While
-- bootstrapping, a bucket whose size differs from 'R.defaultBucketSize' is
-- refreshed every 15 seconds instead. A bucket number beyond the end of the
-- table is treated as full.
--
-- The bucket's size is read at index @num@ of 'R.shape', matching the
-- zero-based numbering used by 'onFinishedRefresh' (@zip [0..] (R.shape tbl)@).
-- The previous @drop (num - 1)@ inspected the neighbouring bucket instead.
effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
effectiveRefreshInterval BucketRefresher{ refreshInterval
                                        , refreshBuckets
                                        , bootstrapMode } num = do
    tbl <- readTVar refreshBuckets
    bootstrapping <- readTVar bootstrapMode
    let fullcount = R.defaultBucketSize
        count     = fromMaybe fullcount $ listToMaybe $ drop num $ R.shape tbl
    return $ if bootstrapping && count /= fullcount
                then 15 -- seconds
                else refreshInterval
341-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
342-- changes. This will typically be invoked from 'tblTransition'.
344-- From BEP 05:
346-- > Each bucket should maintain a "last changed" property to indicate how
347-- > "fresh" the contents are.
349-- We will use a "time to next refresh" property instead and store it in
350-- a priority search queue.
352-- In detail using an expository (not actually implemented) type
353-- 'BucketTouchEvent'...
355-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
356-- >>> bucketEvents =
357-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
358-- >>>
359-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
360-- >>>
361-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
362-- >>> , Applicant :--> Accepted -- with another node,
363-- >>> ]
365-- the bucket's last changed property should be updated. Buckets that have not
366-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
367-- This is done by picking a random ID in the range of the bucket and
368-- performing a find_nodes search on it.
370-- The only other possible BucketTouchEvents are as follows:
372-- >>> not_handled =
-- >>>   [ Stranger  :--> Applicant  -- A ping is pending, its result is covered:
374-- >>> -- (Applicant :--> Stranger)
375-- >>> -- (Applicant :--> Accepted)
376-- >>> , Accepted :--> Applicant -- Never happens
377-- >>> ]
379-- Because this BucketTouchEvent type is not actually implemented and we only
380-- receive notifications of a node's new state, it suffices to reschedule the
381-- bucket refresh 'touchBucket' on every transition to a state other than
382-- 'Applicant'.
384-- XXX: Unfortunately, this means redundantly triggering twice upon every node
385-- replacement because we do not currently distinguish between standalone
386-- insertion/deletion events and an insertion/deletion pair constituting
387-- replacement.
389-- It might also be better to pass the timestamp of the transition here and
390-- keep the refresh queue in better sync with the routing table by updating it
391-- within the STM monad.
393-- We embed the result in the STM monad but currently, no STM state changes
394-- occur until the returned IO action is invoked. TODO: simplify?
touchBucket :: SensibleNodeId nid ni
    => BucketRefresher nid ni
    -> RoutingTransition ni -- ^ What happened to the bucket?
    -> STM (IO ())
touchBucket r@BucketRefresher{ refreshSearch
                             , refreshBuckets
                             , refreshQueue
                             , refreshLastTouch }
            RoutingTransition{ transitionedTo
                             , transitioningNode }
    = return $ case transitionedTo of
        Applicant -> return ()    -- Ignore transition to applicant.
        _         -> reschedule   -- Reschedule for any other transition.
  where
    reschedule = do
        now <- getPOSIXTime
        join $ atomically $ do
            tbl <- readTVar refreshBuckets
            let space = searchSpace refreshSearch
                num   = R.bucketNumber space (kademliaLocation space transitioningNode) tbl
            stamp <- readTVar refreshLastTouch
            -- More than a minute since any bucket was touched (and this is not
            -- the very first touch): re-enter bootstrap mode. This must happen
            -- before the interval is computed, since it changes the mode.
            action <- if stamp /= 0 && now - stamp > 60
                        then restartBootstrap r
                        else return (return ())
            interval <- effectiveRefreshInterval r num
            modifyTVar' refreshQueue $ Int.insert num (now + interval)
            writeTVar refreshLastTouch now
            return action
-- | Build a 'Kademlia' value from a refresher: quiet insertions, the search's
-- space, and the vanilla table IO with its transition hook replaced by
-- 'touchBucket'.
refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni
refreshKademlia r@BucketRefresher { refreshSearch  = sch
                                  , refreshPing    = ping
                                  , refreshBuckets = bkts
                                  }
    = let tableIO = (vanillaIO bkts ping) { tblTransition = touchBucket r }
      in Kademlia quietInsertions (searchSpace sch) tableIO
diff --git a/src/Network/Kademlia/CommonAPI.hs b/src/Network/Kademlia/CommonAPI.hs
deleted file mode 100644
index 601be5d8..00000000
--- a/src/Network/Kademlia/CommonAPI.hs
+++ /dev/null
@@ -1,84 +0,0 @@
1{-# LANGUAGE ExistentialQuantification #-}
2module Network.Kademlia.CommonAPI where
5import Control.Concurrent
6import Control.Concurrent.STM
7import Data.Aeson as J (FromJSON, ToJSON)
8import Data.Hashable
9import qualified Data.Map as Map
10import Data.Serialize as S
11import qualified Data.Set as Set
12import Data.Time.Clock.POSIX
13import Data.Typeable
15import Network.Kademlia.Search
16import Network.Kademlia.Routing as R
17import Crypto.Tox (SecretKey,PublicKey)
-- | A DHT network handle with its node-id type @nid@ and node-info type @ni@
-- hidden behind an existential, together with the class instances needed to
-- show, parse, serialise, compare and hash them.
data DHT = forall nid ni.
    ( Show ni
    , Read ni
    , ToJSON ni
    , FromJSON ni
    , Ord ni
    , Hashable ni
    , Show nid
    , Ord nid
    , Hashable nid
    , Typeable ni
    , S.Serialize nid
    ) => DHT
    { dhtBuckets       :: TVar (BucketList ni)
      -- ^ The routing table.
    , dhtSecretKey     :: STM (Maybe SecretKey)
    , dhtPing          :: Map.Map String (DHTPing ni)
      -- ^ Ping methods, keyed by a string (presumably a method name).
    , dhtQuery         :: Map.Map String (DHTQuery nid ni)
      -- ^ Query methods, keyed by a string (presumably a method name).
    , dhtAnnouncables  :: Map.Map String (DHTAnnouncable nid)
      -- ^ Announce methods, keyed by a string (presumably a method name).
    , dhtParseId       :: String -> Either String nid
      -- ^ Parse a node id, or report why the input is invalid.
    , dhtSearches      :: TVar (Map.Map (String,nid) (DHTSearch nid ni))
      -- ^ Searches keyed by a string and a target id.
    , dhtFallbackNodes :: IO [ni]
    , dhtBootstrap     :: [ni] -> [ni] -> IO ()
    }
-- | A query method whose address, result and token types are hidden behind
-- an existential.
data DHTQuery nid ni = forall addr r tok.
    ( Ord addr
    , Typeable r
    , Typeable tok
    , Typeable ni
    ) => DHTQuery
    { qsearch  :: Search nid addr tok ni r
      -- ^ The search specification for this query.
    , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok)
      -- ^ Invoked on local node, when there is no query destination.
    , qshowR   :: r -> String
      -- ^ Render a result.
    , qshowTok :: tok -> Maybe String
      -- ^ Render a token, if it can be rendered.
    }
-- | An announce method whose data, token, node and result types are hidden
-- behind an existential.
data DHTAnnouncable nid = forall dta tok ni r.
    ( Show r
    , Typeable dta -- information being announced
    , Typeable tok -- token
    , Typeable r   -- search result
    , Typeable ni  -- node
    ) => DHTAnnouncable
    { announceParseData    :: String -> Either String dta
      -- ^ Parse the information being announced.
    , announceParseToken   :: dta -> String -> Either String tok
      -- ^ Parse a token for the given data.
    , announceParseAddress :: String -> Either String ni
      -- ^ Parse a node address.
    , announceSendData     :: Either ( String {- search name -}
                                     , String -> Either String r
                                     , PublicKey {- me -} -> dta -> r -> IO ())
                                     (dta -> tok -> Maybe ni -> IO (Maybe r))
      -- ^ Either a (search name, result parser, send action) triple, or a
      -- direct send action taking data, token and an optional node.
    , announceInterval     :: POSIXTime
    , announceTarget       :: dta -> nid
      -- ^ The node id that the given data maps to.
    }
-- | A search record: its thread, its state, a token renderer and a set of
-- result strings. Address, token and result types are existentially hidden.
data DHTSearch nid ni = forall addr tok r. DHTSearch
    { searchThread  :: ThreadId
    , searchState   :: SearchState nid addr tok ni r
    , searchShowTok :: tok -> Maybe String
    , searchResults :: TVar (Set.Set String)
    }
-- | A ping method with its result type existentially hidden.
data DHTPing ni = forall r. DHTPing
    { pingQuery      :: [String] -> ni -> IO (Maybe r)
      -- ^ Ping a node; the @[String]@ is presumably extra arguments.
    , pingShowResult :: r -> String
      -- ^ Render a ping result.
    }
diff --git a/src/Network/Kademlia/Persistence.hs b/src/Network/Kademlia/Persistence.hs
deleted file mode 100644
index d7431671..00000000
--- a/src/Network/Kademlia/Persistence.hs
+++ /dev/null
@@ -1,51 +0,0 @@
1{-# LANGUAGE NamedFieldPuns #-}
2module Network.Kademlia.Persistence where
4import Network.Kademlia.CommonAPI
5import Network.Kademlia.Routing as R
7import Control.Concurrent.STM
8import qualified Data.Aeson as J
9 ;import Data.Aeson as J (FromJSON)
10import qualified Data.ByteString.Lazy as L
11import qualified Data.HashMap.Strict as HashMap
12import Data.List
13import qualified Data.Vector as V
14import System.IO.Error
-- | Write the nodes of the routing table to the network's nodes file (see
-- 'nodesFileName') as a JSON list.
saveNodes :: String -> DHT -> IO ()
saveNodes netname DHT{dhtBuckets} = do
    bkts <- atomically $ readTVar dhtBuckets
    -- Keep only the first component of each table entry.
    let nodes = [ ni | (ni, _) <- concat (R.toList bkts) ]
    L.writeFile (nodesFileName netname) (J.encode nodes)
-- | Read the node list from the network's nodes file (see 'nodesFileName').
--
-- If the file cannot be read or does not decode as a JSON list of nodes,
-- 'fallbackLoad' is tried on the same file.
loadNodes :: FromJSON ni => String -> IO [ni]
loadNodes netname =
    tryIOError direct >>= either (const $ fallbackLoad fname) return
  where
    fname  = nodesFileName netname
    -- A decode failure is turned into an IOError so 'tryIOError' catches it.
    direct = L.readFile fname
                >>= maybe (ioError $ userError "Nothing") return . J.decode
-- | Name of the file holding the saved nodes of a network: the network name
-- followed by @-nodes.json@.
nodesFileName :: String -> String
nodesFileName = (++ "-nodes.json")
-- | Load nodes from a file that holds a JSON object with a @"nodes"@ array.
--
-- Each array element is decoded individually. Elements that fail to decode
-- are printed to stdout and skipped. The result is empty when the file
-- cannot be read, is not a JSON object, has no @"nodes"@ key, or has a
-- @"nodes"@ value that is not an array (the latter used to crash with a
-- pattern-match failure).
fallbackLoad :: FromJSON t => FilePath -> IO [t]
fallbackLoad fname = do
    attempt <- tryIOError $
        L.readFile fname
            >>= maybe (ioError $ userError "Nothing") return . J.decode
    either (const $ return []) go attempt
  where
    go :: FromJSON t => J.Object -> IO [t]
    go obj = do
        let candidates = case HashMap.lookup "nodes" obj of
                Just (J.Array v) -> V.toList v
                _                -> []
            parsed   = [ (J.fromJSON v, v) | v <- candidates ]
            rejected = [ v | (J.Error _, v)   <- parsed ]
            accepted = [ n | (J.Success n, _) <- parsed ]
        mapM_ print rejected
        return accepted
diff --git a/src/Network/Kademlia/Routing.hs b/src/Network/Kademlia/Routing.hs
deleted file mode 100644
index a52cca73..00000000
--- a/src/Network/Kademlia/Routing.hs
+++ /dev/null
@@ -1,808 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- (c) Joe Crayne 2017
4-- License : BSD3
5-- Maintainer :
6-- Stability : experimental
7-- Portability : portable
9-- Every node maintains a routing table of known good nodes. The
10-- nodes in the routing table are used as starting points for
11-- queries in the DHT. Nodes from the routing table are returned in
12-- response to queries from other nodes.
14-- For more info see:
15-- <>
17{-# LANGUAGE CPP #-}
18{-# LANGUAGE RecordWildCards #-}
19{-# LANGUAGE BangPatterns #-}
20{-# LANGUAGE RankNTypes #-}
21{-# LANGUAGE ViewPatterns #-}
22{-# LANGUAGE TypeOperators #-}
23{-# LANGUAGE DeriveGeneric #-}
24{-# LANGUAGE DeriveFunctor #-}
25{-# LANGUAGE GADTs #-}
26{-# LANGUAGE ScopedTypeVariables #-}
27{-# LANGUAGE TupleSections #-}
28{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-}
29{-# OPTIONS_GHC -fno-warn-orphans #-}
30module Network.Kademlia.Routing
31 {-
32 ( -- * BucketList
33 BucketList
34 , Info(..)
36 -- * Attributes
37 , BucketCount
38 , defaultBucketCount
39 , BucketSize
40 , defaultBucketSize
41 , NodeCount
43 -- * Query
44 , Network.Kademlia.Routing.null
45 , Network.Kademlia.Routing.full
46 , thisId
47 , shape
48 , Network.Kademlia.Routing.size
49 , Network.Kademlia.Routing.depth
50 , compatibleNodeId
52 -- * Lookup
53 , K
54 , defaultK
55 , TableKey (..)
56 , kclosest
58 -- * Construction
59 , Network.Kademlia.Routing.nullTable
60 , Event(..)
61 , CheckPing(..)
62 , Network.Kademlia.Routing.insert
64 -- * Conversion
65 , Network.Kademlia.Routing.TableEntry
66 , Network.Kademlia.Routing.toList
68 -- * Routing
69 , Timestamp
70 , getTimestamp
71 ) -} where
73import Control.Applicative as A
74import Control.Arrow
75import Control.Monad
76import Data.Function
77import Data.Functor.Contravariant
78import Data.Functor.Identity
79import Data.List as L hiding (insert)
80import Data.Maybe
81import Data.Monoid
82import Data.Wrapper.PSQ as PSQ
83import Data.Serialize as S hiding (Result, Done)
84import qualified Data.Sequence as Seq
85import Data.Time
86import Data.Time.Clock.POSIX
87import Data.Word
88import GHC.Generics
89import Text.PrettyPrint as PP hiding ((<>))
90import Text.PrettyPrint.HughesPJClass (pPrint,Pretty)
91import qualified Data.ByteString as BS
92import Data.Bits
93import Data.Ord
94import Data.Reflection
95import Network.Address
96import Data.Typeable
97import Data.Coerce
98import Data.Hashable
101-- | Last time the node was responding to our queries.
103-- Not all nodes that we learn about are equal. Some are \"good\" and
104-- some are not. Many nodes using the DHT are able to send queries
105-- and receive responses, but are not able to respond to queries
-- from other nodes. It is important that each node's routing table
-- contains only known good nodes. A good node is a node that has
-- responded to one of our queries within the last 15 minutes. A
109-- node is also good if it has ever responded to one of our queries
110-- and has sent us a query within the last 15 minutes. After 15
111-- minutes of inactivity, a node becomes questionable. Nodes become
112-- bad when they fail to respond to multiple queries in a row. Nodes
113-- that we know are good are given priority over nodes with unknown
114-- status.
116type Timestamp = POSIXTime
-- | The current time as a 'Timestamp' (POSIX seconds).
getTimestamp :: IO Timestamp
getTimestamp = utcTimeToPOSIXSeconds <$> getCurrentTime
126 Bucket
129-- When a k-bucket is full and a new node is discovered for that
130-- k-bucket, the least recently seen node in the k-bucket is
131-- PINGed. If the node is found to be still alive, the new node is
132-- place in a secondary list, a replacement cache. The replacement
133-- cache is used only if a node in the k-bucket stops responding. In
134-- other words: new nodes are used only when older nodes disappear.
-- | A node paired with a timestamp: the last time the node was pinged.
type NodeEntry ni = Binding ni Timestamp
-- | Maximum number of node entries stored in a bucket. Most clients use this
-- value.
defaultBucketSize :: Int
defaultBucketSize = 8
-- | A FIFO queue interface over a representation @fifo@ holding @elem@
-- values, with every operation running in the monad @m@.
data QueueMethods m elem fifo = QueueMethods
    { pushBack   :: elem -> fifo -> m fifo
      -- ^ Append an element at the back.
    , popFront   :: fifo -> m (Maybe elem, fifo)
      -- ^ Remove the front element, if any, returning the remaining queue.
    , emptyQueue :: m fifo
      -- ^ A queue with no elements.
    }
-- | Transport queue methods from representation @a@ to representation @b@,
-- given conversions in both directions.
fromQ :: Functor m =>
       ( a -> b )
    -> ( b -> a )
    -> QueueMethods m elem a
    -> QueueMethods m elem b
fromQ embed project q = QueueMethods
    { pushBack   = \e fifo -> embed <$> pushBack q e (project fifo)
    , popFront   = \fifo   -> second embed <$> popFront q (project fifo)
    , emptyQueue = embed <$> emptyQueue q
    }
-- | Pure queue methods backed by 'Seq.Seq': push on the right, pop from the
-- left.
seqQ :: QueueMethods Identity ni (Seq.Seq ni)
seqQ = QueueMethods
    { pushBack   = \e fifo -> Identity (fifo Seq.|> e)
    , popFront   = Identity . uncons
    , emptyQueue = Identity Seq.empty
    }
  where
    uncons fifo = case Seq.viewl fifo of
        Seq.EmptyL     -> (Nothing, Seq.empty)
        e Seq.:< fifo' -> (Just e, fifo')
-- | Representation of a bucket's queue.
type BucketQueue ni = Seq.Seq ni

-- | Queue methods used for bucket queues; currently the 'seqQ' methods.
bucketQ :: QueueMethods Identity ni (BucketQueue ni)
bucketQ = seqQ
-- | An explicit ordering and salted hash function for @a@, bundled as a
-- value so it can be supplied at run time.
data Compare a = Compare
    (a -> a -> Ordering)  -- comparison
    (Int -> a -> Int)     -- hash with salt
-- | Reuse a 'Compare' for @a@ on another type @b@ by first mapping each
-- value through the given function.
contramapC :: (b -> a) -> Compare a -> Compare b
contramapC f (Compare cmp hsh) = Compare (cmp `on` f) (\s -> hsh s . f)
-- | A value tagged with a phantom @s@; the instances for this type take
-- their ordering and hashing from the 'Compare' reified at @s@.
newtype Ordered' s a = Ordered a
    deriving (Show)

-- | Hack to avoid UndecidableInstances
newtype Shrink a = Shrink a
    deriving (Show)

-- | An @a@ whose 'Eq', 'Ord' and 'Hashable' instances come from the
-- 'Compare' reified at @s@.
type Ordered s a = Ordered' s (Shrink a)
-- | Equality is defined through the reified comparison.
instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where
    a == b = case compare a b of
        EQ -> True
        _  -> False
-- | Ordering is delegated to the comparison function reified at @s@.
instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where
    compare (Ordered (Shrink a)) (Ordered (Shrink b)) =
        case reflect (Proxy :: Proxy s) of
            Compare cmp _ -> cmp a b
-- | Hashing is delegated to the hash function reified at @s@.
instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where
    hashWithSalt salt (Ordered (Shrink x)) =
        case reflect (Proxy :: Proxy s) of
            Compare _ hsh -> hsh salt x
-- | A bucket is limited in its length, hence the name k-bucket. When a
-- bucket becomes full, it should be split in two by the current span bit,
-- which is defined by the depth in the routing table tree. The bucket size
-- should be chosen such that it is very unlikely that all nodes in a bucket
-- fail within an hour of each other.
data Bucket s ni = Bucket
    { bktNodes :: !(PSQ (Ordered s ni) Timestamp)
      -- ^ Current routing nodes.
    , bktQ     :: !(BucketQueue (Timestamp,ni))
      -- ^ Replacements pending time-outs.
    } deriving (Generic)
216#define CAN_SHOW_BUCKET 0
219deriving instance Show ni => Show (Bucket s ni)
-- | Recover the 'Compare' reified at a bucket's phantom type. The argument
-- is used only for its type and is never inspected.
bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni
bucketCompare = const (reflect (Proxy :: Proxy s))
-- | Apply a function to every node of a bucket, both the routing nodes and
-- the queued replacements, keeping all timestamps. The node set is rebuilt
-- under the target comparison.
mapBucket :: ( Reifies s (Compare a)
             , Reifies t (Compare ni)
             ) => (a -> ni) -> Bucket s a -> Bucket t ni
mapBucket f (Bucket ns q) = Bucket nodes' queue'
  where
    nodes' = PSQ.fromList [ rewrap ni :-> tm | ni :-> tm <- PSQ.toList ns ]
    queue' = second f <$> q
    -- Unwrap the ordered key, map it, and wrap it again.
    rewrap = coerce . f . coerce
233#if 0
236getGenericNode :: ( Serialize (NodeId)
237 , Serialize ip
238 , Serialize u
239 ) => Get (NodeInfo)
240getGenericNode = do
241 nid <- get
242 naddr <- get
243 u <- get
244 return NodeInfo
245 { nodeId = nid
246 , nodeAddr = naddr
247 , nodeAnnotation = u
248 }
250putGenericNode :: ( Serialize (NodeId)
251 , Serialize ip
252 , Serialize u
253 ) => NodeInfo -> Put
254putGenericNode (NodeInfo nid naddr u) = do
255 put nid
256 put naddr
257 put u
259instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where
260 get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ)
261 put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes
-- | Build a priority search queue from (key, priority) pairs.
psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p
psqFromPairList xs = PSQ.fromList [ k :-> p | (k, p) <- xs ]
-- | List the bindings of a priority search queue as (key, priority) pairs.
psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)]
psqToPairList psq = [ (k, p) | k :-> p <- PSQ.toList psq ]
-- | Update interval, in seconds (15 minutes).
delta :: NominalDiffTime
delta = 900
-- | Update a bucket for an inbound node. Should maintain a set of stable
-- long running nodes.
--
-- The result pairs a list of nodes to ping with the updated bucket, or is
-- 'A.empty' when the node cannot be accommodated.
--
-- Note: pings are triggered only when a bucket is full.
updateBucketForInbound :: ( Coercible t1 t
                          , Alternative f
                          , Reifies s (Compare t1)
                          ) => NominalDiffTime -> t1 -> Bucket s t1 -> f ([t], Bucket s t1)
updateBucketForInbound curTime info bucket
    -- The node is already in the bucket: just bump its timestamp.
    --
    -- PingResult events should only occur for nodes we requested a ping
    -- for. Those are always already in the routing queue and get their
    -- timestamp updated here, since 'TryInsert' is called on every inbound
    -- packet, including ping results.
    | isJust (PSQ.lookup key nodes)
        = pure ( [], bucket { bktNodes = PSQ.insertWith max key curTime nodes } )
    -- The bucket is not full: the new node can simply be inserted.
    | PSQ.size nodes < defaultBucketSize
        = pure ( [], bucket { bktNodes = PSQ.insert key curTime nodes } )
    -- The bucket is full but its least recently seen node has not been seen
    -- within 'delta': ask for it to be pinged and queue the new node as its
    -- potential replacement.
    | not (L.null stales)
        = pure ( stales
               , bucket { -- Update timestamps so that we don't redundantly ping.
                          bktNodes = updateStamps curTime (coerce stales) nodes
                          -- Queue the pending node in case the ping fails.
                        , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } )
    -- The bucket is full of good nodes, so the new node is discarded. This
    -- must be 'A.empty' to ensure that bucket splitting happens inside
    -- 'modifyBucket'.
    | otherwise = A.empty
  where
    nodes = bktNodes bucket
    key   = coerce info

    -- At most one stale node is reported, which keeps a 1-to-1
    -- correspondence between pending pings and waiting nodes in the bktQ.
    -- That way there is never a failed ping without a ready replacement.
    stales = [ coerce n
             | Just (n :-> t) <- [PSQ.findMin nodes]
             , t < curTime - delta
             ]
-- | Apply the outcome of a ping to the bucket that holds the pinged node.
--
-- The front of the bucket's pending queue is always popped.  When the ping
-- timed out and the pinged node is still in the bucket, the popped entry
-- takes its place and @(pinged, Just (timestamp, replacement))@ is reported.
-- When the ping was answered, the popped entry is reported as
-- @(pending, Nothing)@ because it was dropped without being promoted.
updateBucketForPingResult :: (Applicative f, Reifies s (Compare a)) =>
    a -> Bool -> Bucket s a -> f ([(a, Maybe (Timestamp, a))], Bucket s a)
updateBucketForPingResult bad_node got_response bucket =
    pure (report, Bucket nodes' remaining)
  where
    (pending, remaining) = runIdentity (popFront bucketQ (bktQ bucket))

    -- Dropped from the pending queue without replacing anything.
    dropped = [ snd entry | got_response, Just entry <- [pending] ]

    -- Removed from the accepted nodes and replaced by the pending entry.
    -- (On a response nothing is swapped; the timestamp was already
    -- refreshed by TryInsert.)  A swap only happens if there is a removal.
    swaps =
        [ (bad_node, entry)
        | not got_response
        , Just entry <- [pending]
        , Just _ <- [PSQ.lookup (coerce bad_node) (bktNodes bucket)]
        ]

    report = [ (n, Nothing) | n <- dropped ]
          ++ [ (old, Just new) | (old, new) <- swaps ]

    nodes' = foldr swapIn (bktNodes bucket) swaps

    swapIn (old, (tm, new)) =
        PSQ.insert (coerce new) tm . PSQ.delete (coerce old)
-- | Set the priority of every node in @stales@ to @curTime@, inserting the
-- node if it is not present.
updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp
updateStamps curTime stales nodes = foldl' touch nodes stales
  where
    touch acc node = PSQ.insert node curTime acc
-- | Index of a bit in a node id, as handed to the bit-test function when a
-- bucket is split (see 'split' and 'splitTip').
360type BitIx = Word
-- | Drain a queue front to back, distributing its elements over two fresh
-- queues: the first receives the elements satisfying @test@, the second the
-- rest.  Relative order is preserved because elements are pushed to the back
-- in the order they are popped.
partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b)
partitionQ imp test q0 = do
    passed0 <- emptyQueue imp
    failed0 <- emptyQueue imp
    loop q0 (passed0, failed0)
  where
    loop q acc@(passed, failed) = do
        (mb, rest) <- popFront imp q
        case mb of
            Nothing -> return acc
            Just e
                | test e    -> do passed' <- pushBack imp e passed
                                  loop rest (passed', failed)
                | otherwise -> do failed' <- pushBack imp e failed
                                  loop rest (passed, failed')
-- | Split a bucket in two according to bit @i@ of each node.
--
-- The first bucket of the result holds the nodes (and pending-queue entries)
-- for which the bit test succeeds, the second holds the remainder.
split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
         forall ni s. ( Reifies s (Compare ni) ) =>
            (ni -> Word -> Bool)
            -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni)
split testNodeIdBit i b = (Bucket hitNodes hitQueue, Bucket missNodes missQueue)
  where
    onBit :: ni -> Bool
    onBit node = testNodeIdBit node i

    (hitEntries, missEntries) =
        partition (onBit . coerce . key) (PSQ.toList (bktNodes b))

    hitNodes  = PSQ.fromList hitEntries
    missNodes = PSQ.fromList missEntries

    (hitQueue, missQueue) =
        runIdentity (partitionQ bucketQ (onBit . snd) (bktQ b))
394-- BucketList
-- | NOTE(review): not referenced in the visible part of this module;
-- presumably a nominal bucket count for callers -- confirm before relying
-- on it.
397defaultBucketCount :: Int
398defaultBucketCount = 20
-- | Bound on bucket splitting: 'modifyBucket' only splits the last bucket
-- while its bit index is below this value.
400defaultMaxBucketCount :: Word
401defaultMaxBucketCount = 24
-- | A routing table bundled with a node id and a socket address
-- (presumably those of the local node, judging by the field names --
-- confirm against callers).
403data Info ni nid = Info
404 { myBuckets :: BucketList ni
405 , myNodeId :: nid
406 , myAddress :: SockAddr
407 }
408 deriving Generic
410deriving instance (Eq ni, Eq nid) => Eq (Info ni nid)
411deriving instance (Show ni, Show nid) => Show (Info ni nid)
413-- instance (Eq ip, Serialize ip) => Serialize (Info ip)
415-- | The routing table covers the entire 'NodeId' space from 0 to 2 ^
416-- 160. The routing table is subdivided into 'Bucket's that each cover
417-- a portion of the space. An empty table has one bucket with an ID
418-- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\"
419-- is inserted into the table, it is placed within the bucket that has
420-- @min <= N < max@. An empty table has only one bucket so any node
421-- must fit within it. Each bucket can only hold 'K' nodes, currently
422-- eight, before becoming 'Full'. When a bucket is full of known good
423-- nodes, no more nodes may be added unless our own 'NodeId' falls
424-- within the range of the 'Bucket'. In that case, the bucket is
425-- replaced by two new buckets each with half the range of the old
426-- bucket and the nodes from the old bucket are distributed among the
427-- two new ones. For a new table with only one bucket, the full bucket
428-- is always split into two new buckets covering the ranges @0..2 ^
429-- 159@ and @2 ^ 159..2 ^ 160@.
--
-- NOTE(review): the 160-bit figures describe one particular id size; this
-- type is generic in @ni@, and the number of splits is bounded by
-- 'defaultMaxBucketCount' in 'modifyBucket'.
--
-- The phantom @s@ is existential: it ties every bucket to one reified
-- 'Compare' for @ni@.
431data BucketList ni = forall s. Reifies s (Compare ni) =>
432 BucketList { thisNode :: !ni
              -- ^ The node whose location the xor distance is measured
              -- from when choosing a bucket (see 'lookupBucket').
433 -- | Non-empty list of buckets.
434 , buckets :: [Bucket s ni]
435 }
-- | Re-type a table from nodes of type @t@ to nodes of type @b@.
--
-- @f@ converts 'thisNode' and every bucket's contents (via 'mapBucket').
-- @g@ maps the new node type back to the old one, so the comparison already
-- attached to the buckets can be reused through 'contramapC' and reified for
-- the new table.
mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b
mapTable g f (BucketList self bkts) =
    reify (contramapC g (bucketCompare bkts)) $ \p ->
        BucketList (f self) (map (retag p . mapBucket f) bkts)
  where
    -- Pins the phantom tag of a bucket to the freshly reified proxy.
    retag :: Proxy s -> Bucket s ni -> Bucket s ni
    retag _ bucket = bucket
-- | Two tables are equal when their per-bucket entry lists (as produced by
-- 'Network.Kademlia.Routing.toList') are equal; 'thisNode' is not compared.
instance (Eq ni) => Eq (BucketList ni) where
    lhs == rhs =
        Network.Kademlia.Routing.toList lhs == Network.Kademlia.Routing.toList rhs
450#if 0
452instance Serialize NominalDiffTime where
453 put = putWord32be . fromIntegral . fromEnum
454 get = (toEnum . fromIntegral) <$> getWord32be
459deriving instance (Show ni) => Show (BucketList ni)
461instance Show ni => Show (BucketList ni) where
462 showsPrec d (BucketList self bkts) =
463 mappend "BucketList "
464 . showsPrec (d+1) self
465 . mappend " (fromList "
466 . showsPrec (d+1) ( ( tableEntry . PSQ.toList . bktNodes) $ bkts)
467 . mappend ") "
470#if 0
472-- | Normally, routing table should be saved between invocations of
473-- the client software. Note that you don't need to store /this/
474-- 'NodeId' since it is already included in routing table.
475instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList)
-- | Shape of the table.
--
-- Tables with fewer than six buckets print every bucket's node count,
-- comma separated; larger tables print the total node count and the number
-- of buckets in brackets.
--
-- The 'Int' counts are rendered through their own 'Pretty' instance
-- ('pPrint'); they cannot be combined with 'Doc' values directly.
instance Pretty (BucketList ni) where
    pPrint t
        | bucketCount < 6 = hcat $ punctuate ", " $ map pPrint ss
        | otherwise       = brackets $
            pPrint (L.sum ss) <> " nodes, " <>
            pPrint bucketCount <> " buckets"
      where
        bucketCount = L.length ss
        ss = shape t
-- | Empty table with specified /spine/ node id.
--
-- The table starts with a single bucket that has no nodes and an empty
-- pending queue.  The comparison and hashing functions are reified and
-- attached to the table's buckets.  The final 'Int' argument is ignored.
--
-- XXX: The comparison function argument is awkward here.
nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni
nullTable cmp hsh ni _ =
    reify (Compare cmp hsh) $ \p ->
        BucketList ni [Bucket (noNodes p) (runIdentity (emptyQueue bucketQ))]
  where
    noNodes :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp
    noNodes _ = PSQ.empty
503#if 0
505-- | Test if table is empty. In this case DHT should start
506-- bootstrapping process until table becomes 'full'.
507null :: BucketList -> Bool
508null (Tip _ _ b) = PSQ.null $ bktNodes b
509null _ = False
511-- | Test if table have maximum number of nodes. No more nodes can be
512-- 'insert'ed, except old ones becomes bad.
513full :: BucketList -> Bool
514full (Tip _ n _) = n == 0
515full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t
516full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t
518-- | Get the /spine/ node id.
519thisId :: BucketList -> NodeId
520thisId (Tip nid _ _) = nid
521thisId (Zero table _) = thisId table
522thisId (One _ table) = thisId table
524-- | Number of nodes in a bucket or a table.
525type NodeCount = Int
-- | Internally, the routing table is a list of buckets, i.e. a /matrix/ of
-- nodes.  This function returns the shape of that matrix: the number of
-- nodes in each bucket, in bucket order.
shape :: BucketList ni -> [Int]
shape (BucketList _ bkts) = [ PSQ.size (bktNodes b) | b <- bkts ]
534#if 0
536-- | Get number of nodes in the table.
537size :: BucketList -> NodeCount
538size = L.sum . shape
540-- | Get number of buckets in the table.
541depth :: BucketList -> BucketCount
542depth = L.length . shape
-- | Hand the table's buckets to @kont@, reordered around the bucket selected
-- for @nid@.
--
-- Buckets are scanned in order; bucket @i@ is selected when bit @i@ of the
-- xor distance between @nid@ and this node is set.  The list passed to
-- @kont@ starts with the selected bucket, continues with the buckets after
-- it, and ends with the buckets before it in reverse order.  If no bit
-- selects a bucket, all buckets are passed in reverse order.
lookupBucket :: forall ni nid x.
                ( -- FiniteBits nid
                  Ord nid
                ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x
lookupBucket space nid kont (BucketList self bkts) = kont (walk 0 [] bkts)
  where
    dist = kademliaXor space nid (kademliaLocation space self)

    walk :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni]
    walk _ seen [] = seen
    walk i seen (b : rest) =
        if kademliaTestBit space dist i
            then b : rest ++ seen
            else walk (succ i) (b : seen) rest
-- | Index of the bucket selected for @nid@: the first index @i@ (below the
-- number of buckets) at which bit @i@ of the xor distance to this node is
-- set.  If there is no such index, the number of buckets is returned.
bucketNumber :: forall ni nid.
                KademliaSpace nid ni -> nid -> BucketList ni -> Int
bucketNumber space nid (BucketList self bkts) =
    L.length (takeWhile (not . kademliaTestBit space dist) indices)
  where
    dist = kademliaXor space nid (kademliaLocation space self)

    -- One bit index per bucket.
    indices :: [Word]
    indices = zipWith const [0 ..] bkts
-- | Sample a node id via 'genBucketSample', seeded with an id decoded from
-- this table's prefix bytes ('tablePrefix', zero-padded to the byte size of
-- @nid@) and the range 'bucketRange' reports for the last bucket index.
--
-- Calls 'error' if the padded prefix does not decode as a @nid@.
compatibleNodeId :: forall ni nid.
    ( Serialize nid, FiniteBits nid) =>
    (ni -> nid) -> BucketList ni -> IO nid
compatibleNodeId nodeId tbl = genBucketSample seed range
  where
    range     = bucketRange (L.length (shape tbl) - 1) True
    byteCount = finiteBitSize (undefined :: nid) `div` 8
    padded    = tablePrefix (testIdBit . nodeId) tbl ++ repeat 0
    seed      = either error id (S.decode (BS.pack (take byteCount padded)))
-- | Pack the table's bits ('tableBits') into bytes, most significant bit
-- first.  A trailing group of fewer than eight bits is padded with zero
-- bits.
tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8]
tablePrefix testbit tbl =
    [ packByte (take 8 (chunk ++ repeat False))
    | chunk <- chunksOf 8 (tableBits testbit tbl)
    ]
  where
    packByte flags = foldl' (.|.) 0 (zipWith bitmask [7, 6 .. 0] flags)

    bitmask ix True  = bit ix
    bitmask _  False = 0
-- | One bit per bucket: element @i@ is the result of testing bit @i@ of
-- this table's own node.
tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool]
tableBits testbit (BucketList self bkts) =
    [ testbit self ix | (ix, _) <- zip [0 ..] bkts ]
-- | The node stored in the table's 'thisNode' field.
selfNode :: BucketList ni -> ni
selfNode tbl = case tbl of
    BucketList me _ -> me
-- | Cut a list into consecutive pieces of length @i@; the last piece may be
-- shorter.  An empty input yields no pieces.
chunksOf :: Int -> [e] -> [[e]]
chunksOf i = go
  where
    go [] = []
    go xs = take i xs : go (drop i xs)

-- | Run a list producer by supplying it with the list constructors.
build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a]
build producer = producer cons nil
  where
    cons = (:)
    nil  = []
611-- | Count of closest nodes in find_node reply.
612type K = Int
614-- | Default 'K' is equal to 'defaultBucketSize'.
-- NOTE(review): the value is hard-coded here rather than derived from
-- 'defaultBucketSize'; confirm that the two stay in sync.
615defaultK :: K
616defaultK = 8
618#if 0
619class TableKey dht k where
620 toNodeId :: k -> NodeId
622instance TableKey dht (NodeId) where
623 toNodeId = id
-- | In Kademlia, the distance metric is XOR and the result is
-- interpreted as an unsigned integer.  Ordering is that of the wrapped
-- value.
newtype NodeDistance nodeid = NodeDistance nodeid
    deriving (Eq, Ord)

-- | distance(A,B) = |A xor B|.  Smaller values are closer.
distance :: Bits nid => nid -> nid -> NodeDistance nid
distance a b = NodeDistance (a `xor` b)
-- | Order by closeness: nearest nodes first, measured by the xor distance
-- between @nid@ and each node's location.
rank :: ( Ord nid
        ) => KademliaSpace nid ni -> nid -> [ni] -> [ni]
rank space nid = L.sortBy closerFirst
  where
    distTo = kademliaXor space nid . kademliaLocation space
    closerFirst a b = compare (distTo a) (distTo b)
-- | Get a list of /K/ closest nodes using XOR metric. Used in
-- 'find_node' and 'get_peers' queries.
--
-- The nodes of the bucket selected for @nid@ are ranked first; the nodes of
-- all remaining buckets are ranked separately and appended.  At most @k@
-- nodes are returned.
kclosest :: ( -- FiniteBits nid
              Ord nid
            ) =>
    KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni]
kclosest space k nid tbl = take k $ rank space nid (L.concat bucket)
                                    ++ rank space nid (L.concat everyone)
  where
    -- 'lookupBucket' puts the selected bucket first, so splitting after one
    -- element separates it from the rest.  The continuation receives a
    -- /list/ of buckets, so both the buckets and each bucket's bindings
    -- have to be mapped over.
    (bucket, everyone) =
        L.splitAt 1
        . lookupBucket space nid (map (coerce . map PSQ.key . PSQ.toList . bktNodes))
        $ tbl
659-- Routing
-- | Split the last bucket on bit @i@ and order the halves relative to the
-- node @ni@: the half that does /not/ share @ni@'s bit comes first, the half
-- on @ni@'s side comes last.
splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
            ( Reifies s (Compare ni) ) =>
    (ni -> Word -> Bool)
    -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ]
splitTip testNodeBit ni i bucket =
    let (ones, zeros) = split testNodeBit i bucket
    in if testNodeBit ni i
          then [zeros, ones]
          else [ones, zeros]
-- | Used in each query.
--
-- Locates the bucket selected for @nid@ (bucket @i@ is selected when bit @i@
-- of the xor distance between @nid@ and this node is set; otherwise the last
-- bucket is used) and applies @f@ to it.  If @f@ fails on the last bucket,
-- that bucket is split with 'splitTip' and the search is retried, as long as
-- the bit index is below 'defaultMaxBucketCount'.  Returns 'Nothing' when
-- @f@ fails and no split is possible.
--
-- TODO: Kademlia non-empty subtrees should split if they have less than
-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia
-- paper. The rule requiring additional splits is in section 2.4.
modifyBucket
    :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
       forall ni nid xs.
       KademliaSpace nid ni
    -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni)
modifyBucket space nid f (BucketList self bkts)
    = second (BucketList self) <$> go (0 :: BitIx) bkts
  where
    d = kademliaXor space nid (kademliaLocation space self)

    -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni])

    go !i (bucket : buckets@(_:_))
        | kademliaTestBit space d i = second (: buckets) <$> f bucket
        | otherwise                 = second (bucket :) <$> go (succ i) buckets

    go !i [bucket] = second (: []) <$> f bucket <|> gosplit
      where
        gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space
                                                             . kademliaLocation space )
                                                             self
                                                             i
                                                             bucket)
                | otherwise                 = Nothing -- Limit the number of buckets.

    -- The bucket list is documented as non-empty; stay total regardless.
    go _ [] = Nothing
-- | Number of buckets in the table.
bktCount :: BucketList ni -> Int
bktCount tbl = case tbl of
    BucketList _ bs -> L.length bs
706-- | Triggering event for atomic table update
--
-- 'TryInsert' carries a node seen on an inbound packet; 'PingResult'
-- carries a pinged node together with whether it answered ('ponged').
707data Event ni = TryInsert { foreignNode :: ni }
708 | PingResult { foreignNode :: ni , ponged :: Bool }
710#if 0
711deriving instance Eq (NodeId) => Eq (Event)
712deriving instance ( Show ip
713 , Show (NodeId)
714 , Show u
715 ) => Show (Event)
-- | Apply @nodeId@ to the node an event refers to, whichever kind of event
-- it is.
eventId :: (ni -> nid) -> Event ni -> nid
eventId nodeId ev = nodeId (foreignNode ev)
724-- | Actions requested by atomic table update
--
-- Wraps a list of nodes (presumably the nodes to ping -- confirm against
-- the code that consumes this type).
725data CheckPing ni = CheckPing [ni]
727#if 0
729deriving instance Eq (NodeId) => Eq (CheckPing)
730deriving instance ( Show ip
731 , Show (NodeId)
732 , Show u
733 ) => Show (CheckPing)
-- | Call on every inbound packet (including requested ping results).
-- Returns a triple (was_updated, to_ping, tbl') where
--
--  [ /was_updated/ ] True if the responsible bucket accepted the update,
--                    i.e. 'modifyBucket' succeeded.  False means the table
--                    is returned unchanged.
--
--  [ /to_ping/ ]     A list of nodes to ping and then run
--                    'updateForPingResult'.  When it is non-empty, the
--                    inbound node has been parked in a replacement queue and
--                    will be inserted if one of the listed nodes times out.
--
--  [ /tbl'/ ]        The updated routing 'BucketList'.
updateForInbound ::
    KademliaSpace nid ni
    -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni)
updateForInbound space tm ni tbl@(BucketList _ _) =
    case modifyBucket space
                      (kademliaLocation space ni)
                      (updateBucketForInbound tm ni)
                      tbl of
        Nothing         -> (False, [], tbl)
        Just (ps, tbl') -> (True, ps, tbl')
-- | Update the routing table with the results of a ping.
--
-- Each @(a, Just (tm, b))@ in the returned list indicates that the node /a/
-- was deleted from the routing table and the node /b/, with timestamp /tm/,
-- has taken its place.  If the responsible bucket cannot be updated, the
-- table is returned unchanged with an empty list.
updateForPingResult ::
    KademliaSpace nid ni
    -> ni              -- ^ The pinged node.
    -> Bool            -- ^ True if we got a reply, False if it timed out.
    -> BucketList ni   -- ^ The routing table.
    -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni )
updateForPingResult space ni got_reply tbl =
    case modifyBucket space
                      (kademliaLocation space ni)
                      (updateBucketForPingResult ni got_reply)
                      tbl of
        Just changed -> changed
        Nothing      -> ([], tbl)
779-- Conversion
-- | A node paired with its timestamp.
782type TableEntry ni = (ni, Timestamp)
-- | Unpack a key/priority binding into a plain pair.
784tableEntry :: NodeEntry ni -> TableEntry ni
785tableEntry (a :-> b) = (a, b)
-- | Flatten the table into one list of entries per bucket, in bucket order.
--
-- Both levels need a 'map': the outer one runs over the buckets, the inner
-- one over each bucket's bindings.  'coerce' then strips the wrapper that
-- the buckets keep around their keys.
toList :: BucketList ni -> [[TableEntry ni]]
toList (BucketList _ bkts) =
    coerce $ map (map tableEntry . PSQ.toList . bktNodes) bkts
-- | The operations that define a Kademlia id space: how to locate a node
-- record in it, how to test bits of a location and how to measure distance.
790data KademliaSpace nid ni = KademliaSpace
791 { -- | Given a node record (probably including IP address), yields a
792 -- kademlia xor-metric location.
793 kademliaLocation :: ni -> nid
794 -- | Used when comparing locations. This is similar to
795 -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so
796 -- that 0 is the most significant bit.
797 , kademliaTestBit :: nid -> Word -> Bool
798 -- | The Kademlia xor-metric.
799 , kademliaXor :: nid -> nid -> nid
 -- | NOTE(review): not used in the visible code.  Presumably samples an id
 -- from a byte generator, a seed id and a range triple -- confirm against
 -- the implementation.
801 , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid
802 }
-- | Change the node-record type by pre-composing 'kademliaLocation' with a
-- conversion; every other field is kept as is.
instance Contravariant (KademliaSpace nid) where
    contramap f ks =
        ks { kademliaLocation = \x -> kademliaLocation ks (f x) }
diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs
deleted file mode 100644
index 1be1afc1..00000000
--- a/src/Network/Kademlia/Search.hs
+++ /dev/null
@@ -1,236 +0,0 @@
2{-# LANGUAGE PatternSynonyms #-}
3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE LambdaCase #-}
7module Network.Kademlia.Search where
9import Control.Concurrent.Tasks
10import Control.Concurrent.STM
11import Control.Monad
12import Data.Function
13import Data.Maybe
14import qualified Data.Set as Set
15 ;import Data.Set (Set)
16import Data.Hashable (Hashable(..)) -- for type sigs
17import System.IO.Error
19import qualified Data.MinMaxPSQ as MM
20 ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
21import qualified Data.Wrapper.PSQ as PSQ
22 ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey)
23import Network.Kademlia.Routing as R
25import Control.Concurrent.Lifted.Instrument
27import Control.Concurrent.Lifted
28import GHC.Conc (labelThread)
-- | Static description of a search: the id space, how to address and query
-- a node, and the tuning parameters α and K.
31data Search nid addr tok ni r = Search
 -- | Supplies the xor metric and node locations used to prioritise nodes.
32 { searchSpace :: KademliaSpace nid ni
 -- | Address under which a queried node is recorded in 'searchVisited'.
33 , searchNodeAddress :: ni -> addr
 -- | The query itself.  'Left' is a blocking query that returns its reply;
 -- 'Right' is a non-blocking query that hands the reply to a callback.  A
 -- reply carries further nodes, results and an optional token.
34 , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok)))
35 (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ())
36 , searchAlpha :: Int -- α = 8
37 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
38 -- how fast the queries are. For Tox's much slower onion-routed queries, we
39 -- need to ensure that closer non-responding queries don't completely push out
40 -- farther away queries.
41 --
42 -- For BitTorrent, setting them both to 8 was not an issue, but that is no longer
43 -- supported because the number of remembered informants is now the
44 -- difference between these two numbers. So, if searchK = 16 and searchAlpha =
45 -- 4, then the number of remembered query responses is 12.
46 , searchK :: Int -- K = 16
47 }
-- | Mutable state of one running search.  Every field except 'searchSpec'
-- is a 'TVar', so the state is read and updated inside STM transactions.
49data SearchState nid addr tok ni r = SearchState
50 { -- | The number of pending queries. Incremented before any query is sent
51 -- and decremented when we get a reply.
52 searchPendingCount :: TVar Int
53 -- | Nodes scheduled to be queried (roughly at most K), prioritised by
 -- their xor distance to the search target.
54 , searchQueued :: TVar (MinMaxPSQ ni nid)
55 -- | The nearest (K - α) nodes that issued a reply.
56 --
57 -- α is the maximum number of simultaneous queries.
58 , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok))
59 -- | This tracks already-queried addresses so we avoid bothering them
60 -- again. XXX: We could probably keep only the pending queries in this
61 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
62 -- should limit the number of outstanding queries.
63 , searchVisited :: TVar (Set addr)
 -- | The specification this search was started with.
64 , searchSpec :: Search nid addr tok ni r
65 }
-- | Allocate the state for a new search.
--
-- The queue is seeded with the given nodes, each prioritised by its xor
-- distance to the target.  The pending count starts at zero and the
-- informant and visited sets start empty.
newSearch :: ( Ord addr
             , PSQKey nid
             , PSQKey ni
             ) =>
    Search nid addr tok ni r
    -> nid
    -> [ni] -- Initial nodes to query.
    -> STM (SearchState nid addr tok ni r)
newSearch s@(Search space _ _ _ _) target ns = do
    pending   <- newTVar 0
    queued    <- newTVar (MM.fromList [ n :-> distanceTo n | n <- ns ])
    informant <- newTVar MM.empty
    visited   <- newTVar Set.empty
    return (SearchState pending queued informant visited s)
  where
    distanceTo n = kademliaXor space target (kademliaLocation space n)
-- | Discard the value from a key-priority-value tuple. This is useful for
-- swapping items from a "MinMaxPSQ'" to a "MinMaxPSQ".
stripValue :: Binding' k p v -> Binding k p
stripValue (Binding key _ prio) = key :-> prio
-- | Reset a 'SearchState' object to ready it for a repeated search.
--
-- Blocks (retries the transaction) until the current search has finished.
-- The queue is then re-seeded with the previous informants plus the nodes
-- returned by @nearestNodes target@; the informant set, the visited set and
-- the pending count are cleared.
--
-- Freshly supplied nodes are prioritised by their xor distance to @target@,
-- the same measure 'newSearch' and 'sendAsyncQuery' use for queue entries,
-- so that they are ordered consistently with the carried-over informants.
reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
    (nid -> STM [ni])
    -> Search nid addr1 tok1 ni r1
    -> nid
    -> SearchState nid addr tok ni r
    -> STM (SearchState nid addr tok ni r)
reset nearestNodes qsearch target st = do
    searchIsFinished st >>= check -- Wait for a search to finish before resetting.
    let space = searchSpace qsearch
        distanceTo ni = kademliaXor space target (kademliaLocation space ni)
    nearest <- map (\ni -> ni :-> distanceTo ni) <$> nearestNodes target
    priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
    writeTVar (searchQueued       st) $ MM.fromList $ priorInformants ++ nearest
    writeTVar (searchInformant    st) MM.empty
    writeTVar (searchVisited      st) Set.empty
    writeTVar (searchPendingCount st) 0
    return st
-- | Send one query to the node @ni@ (at distance @d@ from the target) and
-- arrange for its reply to be folded into the search state.
--
-- A blocking query runs in a task forked on the given 'TaskGroup'; an
-- 'IOError' raised by it is treated as "no reply".  A non-blocking query is
-- issued directly with a callback.  Either way, once the outcome is known
-- the pending count is decremented and, if there was a reply, the reply is
-- absorbed in the same transaction.
sendAsyncQuery :: forall addr nid tok ni r.
    ( Ord addr
    , PSQKey nid
    , PSQKey ni
    , Show nid
    ) =>
    Search nid addr tok ni r
    -> nid
    -> (r -> STM Bool)   -- ^ return False to terminate the search.
    -> SearchState nid addr tok ni r
    -> Binding ni nid
    -> TaskGroup
    -> IO ()
sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g =
    case searchQuery of
        Left blockingQuery ->
            forkTask g "searchQuery" $ do
                tid <- myThreadId
                labelThread tid ("searchQuery." ++ show searchTarget)
                reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing)
                settle reply
        Right nonblockingQuery ->
            nonblockingQuery searchTarget ni settle
  where
    -- Account for the finished query and absorb its reply, if any.
    settle reply = atomically $ do
        modifyTVar searchPendingCount pred
        mapM_ absorb reply

    absorb (ns, rs, tok) = do
        vs <- readTVar searchVisited
        -- We only queue a node if it is not yet visited.
        let insertFoundNode :: Int
                            -> ni
                            -> MinMaxPSQ ni nid
                            -> MinMaxPSQ ni nid
            insertFoundNode k n q
                | searchNodeAddress n `Set.member` vs = q
                | otherwise =
                    MM.insertTake k n
                        (kademliaXor searchSpace searchTarget (kademliaLocation searchSpace n))
                        q

        qsize0 <- MM.size <$> readTVar searchQueued
        -- Allow searchQueued to grow only when there's fewer than K elements.
        let qsize = max searchK qsize0
        modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns
        -- Remember the replying node itself as an informant.
        modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d

        -- Deliver results in order; stop and cancel the search as soon as
        -- the consumer reports that it wants no more.
        let deliver []         = return ()
            deliver (r : rest) = do
                wanting <- searchResult r
                if wanting
                    then deliver rest
                    else searchCancel sch
        deliver rs
-- | True when the search has nothing left to do: no query is pending and
-- either the queue is empty, or the informant set has reached its quota of
-- @K - α@ entries and its farthest member is at least as close to the target
-- as the nearest queued node.
--
-- The informant/queue comparison is done by pattern matching instead of
-- 'fromJust': with @searchK <= searchAlpha@ the quota test succeeds even for
-- an empty informant set, and the search is then reported as unfinished
-- rather than crashing.
searchIsFinished :: ( PSQKey nid
                    , PSQKey ni
                    ) => SearchState nid addr tok ni r -> STM Bool
searchIsFinished SearchState{..} = do
    q          <- readTVar searchQueued
    cnt        <- readTVar searchPendingCount
    informants <- readTVar searchInformant
    let quota = searchK searchSpec - searchAlpha searchSpec
        saturated = case (MM.findMax informants, MM.findMin q) of
            (Just farthest, Just nearest) ->
                MM.size informants >= quota
                    && PSQ.prio farthest <= PSQ.prio nearest
            _ -> False
    return $ cnt == 0 && (MM.null q || saturated)
-- | Stop a search: the pending count is zeroed and the queue is emptied.
searchCancel :: SearchState nid addr tok ni r -> STM ()
searchCancel SearchState{..} =
    writeTVar searchPendingCount 0 >> writeTVar searchQueued MM.empty
-- | Start a search for @target@.
--
-- The search is seeded with the 'searchK' closest nodes from the routing
-- table and runs in a freshly forked thread ('searchLoop'); the state is
-- returned immediately so the caller can observe the search.
search ::
    ( Ord r
    , Ord addr
    , PSQKey nid
    , PSQKey ni
    , Show nid
    ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
search sch buckets target result = do
    st <- atomically $ newSearch sch target seeds
    _  <- forkIO $ searchLoop sch target result st
    return st
  where
    seeds = R.kclosest (searchSpace sch) (searchK sch) target buckets
-- | Drive a search to completion.
--
-- Each iteration runs one transaction that either dequeues the nearest
-- scheduled node and marks it visited and pending (after which the query is
-- sent and the loop repeats), or, when no further query is warranted, waits
-- for the pending count to reach zero and then returns.
--
-- The number of in-flight queries is capped by 'searchAlpha' (α, the
-- documented maximum number of simultaneous queries), the same value that
-- sizes the task group.
searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni )
    => Search nid addr tok ni r          -- ^ Query and distance methods.
    -> nid                               -- ^ The target we are searching for.
    -> (r -> STM Bool)                   -- ^ Invoked on each result.  Return False to quit searching.
    -> SearchState nid addr tok ni r     -- ^ Search-related state.
    -> IO ()
searchLoop sch@Search{..} target result s@SearchState{..} = do
    myThreadId >>= flip labelThread ("search."++show target)
    withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do
        join $ atomically $ do
            cnt <- readTVar $ searchPendingCount
            check (cnt < searchAlpha) -- At most α pending queries at a time.
            informants <- readTVar searchInformant
            found <- MM.minView <$> readTVar searchQueued
            case found of
                Just (ni :-> d, q)
                  | -- If there's fewer than /k - α/ informants and there's any
                    -- node we haven't yet got a response from.
                    (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q))
                    -- Or there's no informants yet at all, or the closest
                    -- scheduled node is nearer than the farthest of the
                    -- remembered informants.
                    || maybe True ((d <) . PSQ.prio) (MM.findMax informants)
                  -> -- Then the search continues, send a query.
                     do writeTVar searchQueued q
                        modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
                        modifyTVar searchPendingCount succ
                        return $ do
                            sendAsyncQuery sch target result s (ni :-> d) g
                            again
                _ -> -- Otherwise, we are finished.
                     do check (cnt == 0)
                        return $ return ()