summaryrefslogtreecommitdiff
path: root/src/Network/Kademlia
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-06 05:18:04 -0500
committerjoe <joe@jerkface.net>2017-11-08 02:30:43 -0500
commit70a96073db817b19e98d058702b1a8aa3d4b8445 (patch)
tree83414727033ad1fb66ea6289a20495b275a4e13c /src/Network/Kademlia
parent6749c25eb6bf544ebef51817049c922030e8369d (diff)
Bootstrapping rework in progress.
Diffstat (limited to 'src/Network/Kademlia')
-rw-r--r--src/Network/Kademlia/Bootstrap.hs227
1 files changed, 127 insertions, 100 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
index 283e054a..42bff665 100644
--- a/src/Network/Kademlia/Bootstrap.hs
+++ b/src/Network/Kademlia/Bootstrap.hs
@@ -1,11 +1,14 @@
1{-# LANGUAGE CPP #-} 1{-# LANGUAGE CPP #-}
2{-# LANGUAGE ConstraintKinds #-}
2{-# LANGUAGE DeriveFunctor #-} 3{-# LANGUAGE DeriveFunctor #-}
3{-# LANGUAGE DeriveTraversable #-} 4{-# LANGUAGE DeriveTraversable #-}
4{-# LANGUAGE FlexibleContexts #-} 5{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE GADTs #-} 6{-# LANGUAGE GADTs #-}
6{-# LANGUAGE KindSignatures #-} 7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE NamedFieldPuns #-}
7{-# LANGUAGE PartialTypeSignatures #-} 9{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE PatternSynonyms #-} 10{-# LANGUAGE PatternSynonyms #-}
11{-# LANGUAGE RankNTypes #-}
9{-# LANGUAGE ScopedTypeVariables #-} 12{-# LANGUAGE ScopedTypeVariables #-}
10module Network.Kademlia.Bootstrap where 13module Network.Kademlia.Bootstrap where
11 14
@@ -42,34 +45,90 @@ import Network.Kademlia.Search
42import Control.Concurrent.Tasks 45import Control.Concurrent.Tasks
43import Network.Kademlia 46import Network.Kademlia
44 47
45 48-- From BEP 05:
46-- | > pollForRefresh interval queue refresh 49--
50-- Each bucket should maintain a "last changed" property to indicate how
51-- "fresh" the contents are.
52--
53-- Note: We will use a "time to next refresh" property instead and store it in
54-- a priority search queue.
55--
56-- When...
57--
58-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
59-- >>> bucketEvents =
60-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
61-- >>>
62-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
63-- >>>
64-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
65-- >>> , Applicant :--> Accepted -- with another node,
66-- >>> ]
67--
68-- the bucket's last changed property should be updated. Buckets
69-- that have not been changed in 15 minutes should be "refreshed." This is done
70-- by picking a random ID in the range of the bucket and performing a
71-- find_nodes search on it.
72--
73-- The only other possible BucketTouchEvents are as follows:
47-- 74--
48-- Fork a refresh loop. Kill the returned thread to terminate it. The 75-- >>> not_handled =
49-- arguments are: a staleness threshold (if a bucket goes this long without 76-- >>> , Stranger :--> Applicant -- A ping is pending, its result is covered:
50-- being touched, a refresh will be triggered), a TVar with the time-to-refresh 77-- >>> -- (Applicant :--> Stranger)
51-- schedule for each bucket, and a refresh action to be forked when a bucket 78-- >>> -- (Applicant :--> Accepted)
52-- exceeds the staleness threshold. 79-- >>> , Accepted :--> Applicant -- Never happens
80-- >>> ]
53-- 81--
54-- To "touch" a bucket and prevent it from being refreshed, reschedule its 82
55-- refresh time to some time into the future by modifying the 'Int.PSQ' in the 83type SensibleNodeId nid ni =
56-- TVar. 84 ( Show nid
57forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId 85 , Ord nid
58forkPollForRefresh interval psq refresh = do 86 , Ord ni
59 fork $ do 87 , Hashable nid
88 , Hashable ni )
89
90data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
91 { -- | A staleness threshold (if a bucket goes this long without being
92 -- touched, a refresh will be triggered).
93 refreshInterval :: POSIXTime
94 -- | A TVar with the time-to-refresh schedule for each bucket.
95 --
96 -- To "touch" a bucket and prevent it from being refreshed, reschedule
97 -- its refresh time to some time into the future by modifying the
98 -- 'Int.PSQ' in the TVar. (See 'touchBucket').
99 , refreshQueue :: TVar (Int.PSQ POSIXTime)
100 -- | This is the kademlia node search specification.
101 , refreshSearch :: Search nid addr tok ni ni
102 -- | The current kademlia routing table buckets.
103 , refreshBuckets :: TVar (R.BucketList ni)
104 -- | Action to ping a node. This is used only during initial bootstrap
105 -- to get some nodes in our table. A 'True' result is interpreted as a
106 -- pong, where 'False' is a non-response.
107 , refreshPing :: ni -> IO Bool
108 }
109
110-- | Fork a refresh loop. Kill the returned thread to terminate it.
111forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
112forkPollForRefresh BucketRefresher{ refreshInterval
113 , refreshQueue
114 , refreshBuckets
115 , refreshSearch } = fork $ do
60 myThreadId >>= flip labelThread "pollForRefresh" 116 myThreadId >>= flip labelThread "pollForRefresh"
61 fix $ \again -> do 117 fix $ \again -> do
62 join $ atomically $ do 118 join $ atomically $ do
63 nextup <- Int.findMin <$> readTVar psq 119 nextup <- Int.findMin <$> readTVar refreshQueue
64 maybe retry (return . go again) nextup 120 maybe retry (return . go again) nextup
65 where 121 where
122 refresh :: Int -> IO Int
123 refresh = refreshBucket refreshSearch refreshBuckets
124
66 go again ( bktnum :-> refresh_time ) = do 125 go again ( bktnum :-> refresh_time ) = do
67 now <- getPOSIXTime 126 now <- getPOSIXTime
68 case fromEnum (refresh_time - now) of 127 case fromEnum (refresh_time - now) of
69 x | x <= 0 -> do -- Refresh time! 128 x | x <= 0 -> do -- Refresh time!
70 -- Move it to the back of the refresh queue. 129 -- Move it to the back of the refresh queue.
71 atomically $ modifyTVar' psq 130 atomically $ modifyTVar' refreshQueue
72 $ Int.insert bktnum (now + interval) 131 $ Int.insert bktnum (now + refreshInterval)
73 -- Now fork the refresh operation. 132 -- Now fork the refresh operation.
74 -- TODO: We should probably propagate the kill signal to this thread. 133 -- TODO: We should probably propagate the kill signal to this thread.
75 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) 134 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
@@ -79,15 +138,37 @@ forkPollForRefresh interval psq refresh = do
79 seconds -> threadDelay ( seconds * 1000000 ) 138 seconds -> threadDelay ( seconds * 1000000 )
80 again 139 again
81 140
82refreshBucket :: forall nid tok ni addr. 141
83 ( Show nid 142-- | This is a helper to 'refreshBucket' which does some book keeping to decide
84 , Serialize nid 143-- whether or not a bucket is sufficiently refreshed or not. It will return
85 , Ord nid 144-- false when we can terminate a node search.
86 , Ord ni 145checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
87 , Ord addr 146 -> TVar (BucketList ni) -- ^ The current routing table.
88 , Hashable nid 147 -> TVar (Set.Set ni) -- ^ In-range nodes found so far.
89 , Hashable ni ) => 148 -> TVar Bool -- ^ The result will also be written here.
90 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int 149 -> Int -- ^ The bucket number of interest.
150 -> ni -- ^ A newly found node.
151 -> STM Bool
152checkBucketFull space var resultCounter fin n found_node = do
153 let fullcount = R.defaultBucketSize
154 saveit True = writeTVar fin True >> return True
155 saveit _ = return False
156 tbl <- readTVar var
157 let counts = R.shape tbl
158 nid = kademliaLocation space found_node
159 -- Update the result set with every found node that is in the
160 -- bucket of interest.
161 when (n == R.bucketNumber space nid tbl)
162 $ modifyTVar' resultCounter (Set.insert found_node)
163 resultCount <- readTVar resultCounter
164 saveit $ case drop (n - 1) counts of
165 (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going
166 _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going
167 _ -> False -- okay, good enough, let's quit.
168
169
170refreshBucket :: (Hashable a, Hashable t, Ord t, Ord addr, Ord a, Show t) =>
171 Search t addr tok a a -> TVar (BucketList a) -> Int -> IO Int
91refreshBucket sch var n = do 172refreshBucket sch var n = do
92 tbl <- atomically (readTVar var) 173 tbl <- atomically (readTVar var)
93 let count = bktCount tbl 174 let count = bktCount tbl
@@ -100,27 +181,13 @@ refreshBucket sch var n = do
100 (bucketRange n (n + 1 < count)) 181 (bucketRange n (n + 1 < count))
101 fin <- atomically $ newTVar False 182 fin <- atomically $ newTVar False
102 resultCounter <- atomically $ newTVar Set.empty 183 resultCounter <- atomically $ newTVar Set.empty
103 let fullcount = R.defaultBucketSize
104 saveit True = writeTVar fin True >> return True
105 saveit _ = return False
106 checkBucketFull :: ni -> STM Bool
107 checkBucketFull found_node = do
108 tbl <- readTVar var
109 let counts = R.shape tbl
110 when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
111 $ modifyTVar' resultCounter (Set.insert found_node)
112 resultCount <- readTVar resultCounter
113 saveit $ case drop (n - 1) counts of
114 (cnt:_) | cnt < fullcount -> True
115 _ | Set.size resultCount < fullcount -> True
116 _ -> False
117 184
118 hPutStrLn stderr $ "Start refresh " ++ show (n,sample) 185 hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
119 186
120 -- Set 15 minute timeout in order to avoid overlapping refreshes. 187 -- Set 15 minute timeout in order to avoid overlapping refreshes.
121 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount 188 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
122 then const $ return True -- Never short-circuit the last bucket. 189 then const $ return True -- Never short-circuit the last bucket.
123 else checkBucketFull 190 else checkBucketFull (searchSpace sch) var resultCounter fin n
124 _ <- timeout (15*60*1000000) $ do 191 _ <- timeout (15*60*1000000) $ do
125 atomically $ searchIsFinished s >>= check 192 atomically $ searchIsFinished s >>= check
126 atomically $ searchCancel s 193 atomically $ searchCancel s
@@ -131,20 +198,11 @@ refreshBucket sch var n = do
131 return $ if b then 1 else c 198 return $ if b then 1 else c
132 return rcount 199 return rcount
133 200
134 201bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
135bootstrap :: 202 BucketRefresher nid ni -> t1 ni -> t ni -> IO ()
136 ( Show nid 203bootstrap BucketRefresher { refreshSearch = sch
137 , Serialize nid 204 , refreshBuckets = var
138 -- , FiniteBits nid 205 , refreshPing = ping } ns ns0 = do
139 , Hashable ni
140 , Hashable nid
141 , Ord ni
142 , Ord addr
143 , Ord nid
144 , Traversable t1
145 , Traversable t
146 ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
147bootstrap sch var ping ns ns0 = do
148 gotPing <- atomically $ newTVar False 206 gotPing <- atomically $ newTVar False
149 207
150 -- First, ping the given nodes so that they are added to 208 -- First, ping the given nodes so that they are added to
@@ -193,6 +251,9 @@ bootstrap sch var ping ns ns0 = do
193 cnt <- refreshBucket sch var num 251 cnt <- refreshBucket sch var num
194 again cnt 252 again cnt
195 253
254-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
255-- changes. This will typically be invoked from 'tblTransition'.
256--
196-- XXX: This will be redundantly triggered twice upon every node replacement 257-- XXX: This will be redundantly triggered twice upon every node replacement
197-- because we do not currently distinguish between standalone 258-- because we do not currently distinguish between standalone
198-- insertion/deletion events and an insertion/deletion pair constituting 259-- insertion/deletion events and an insertion/deletion pair constituting
@@ -201,55 +262,21 @@ bootstrap sch var ping ns ns0 = do
201-- It might also be better to pass the timestamp of the transition here and 262-- It might also be better to pass the timestamp of the transition here and
202-- keep the refresh queue in better sync with the routing table by updating it 263-- keep the refresh queue in better sync with the routing table by updating it
203-- within the STM monad. 264-- within the STM monad.
204touchBucket :: KademliaSpace nid ni 265touchBucket :: BucketRefresher nid ni
205 -> POSIXTime 266 -> RoutingTransition ni -- ^ What happened to the bucket?
206 -> TVar (BucketList ni)
207 -> TVar (Int.PSQ POSIXTime)
208 -> RoutingTransition ni
209 -> STM (IO ()) 267 -> STM (IO ())
210touchBucket space interval bkts psq tr 268touchBucket BucketRefresher{ refreshSearch
269 , refreshInterval
270 , refreshBuckets
271 , refreshQueue
272 }
273 tr
211 | (transitionedTo tr == Applicant) 274 | (transitionedTo tr == Applicant)
212 = return $ return () 275 = return $ return ()
213 | otherwise = return $ do 276 | otherwise = return $ do
214 now <- getPOSIXTime 277 now <- getPOSIXTime
215 atomically $ do 278 atomically $ do
216 let nid = kademliaLocation space (transitioningNode tr) 279 let space = searchSpace refreshSearch
217 num <- R.bucketNumber space nid <$> readTVar bkts 280 nid = kademliaLocation space (transitioningNode tr)
218 modifyTVar' psq $ Int.insert num (now + interval) 281 num <- R.bucketNumber space nid <$> readTVar refreshBuckets
219 282 modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval)
220-- TODO: Bootstrap/Refresh
221--
222-- From BEP 05:
223--
224-- Each bucket should maintain a "last changed" property to indicate how
225-- "fresh" the contents are.
226--
227-- Note: We will use a "time to next refresh" property instead and store it in
228-- a priority search queue.
229--
230-- When...
231--
232-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
233-- >>> bucketEvents =
234-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
235-- >>>
236-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
237-- >>>
238-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
239-- >>> , Applicant :--> Accepted -- with another node,
240-- >>> ]
241--
242-- the bucket's last changed property should be updated. Buckets
243-- that have not been changed in 15 minutes should be "refreshed." This is done
244-- by picking a random ID in the range of the bucket and performing a
245-- find_nodes search on it.
246--
247-- The only other possible BucketTouchEvents are as follows:
248--
249-- >>> not_handled =
250-- >>> , Stranger :--> Applicant -- A ping is pending, its result is covered:
251-- >>> -- (Applicant :--> Stranger)
252-- >>> -- (Applicant :--> Accepted)
253-- >>> , Accepted :--> Applicant -- Never happens
254-- >>> ]
255--