summaryrefslogtreecommitdiff
path: root/src/Network/Kademlia/Bootstrap.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-06 00:16:36 -0500
committerjoe <joe@jerkface.net>2017-11-06 00:16:36 -0500
commit159f60689ab70413d963d8103b561dd587c448d6 (patch)
treed7d2a36839f7d30424731b0b7c4d46bd129a553a /src/Network/Kademlia/Bootstrap.hs
parente3efc5455185bc984b2e49f0d54c9bded1b4f269 (diff)
Factored out Network.Kademlia.Bootstrap.
Diffstat (limited to 'src/Network/Kademlia/Bootstrap.hs')
-rw-r--r--src/Network/Kademlia/Bootstrap.hs255
1 files changed, 255 insertions, 0 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
new file mode 100644
index 00000000..283e054a
--- /dev/null
+++ b/src/Network/Kademlia/Bootstrap.hs
@@ -0,0 +1,255 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE DeriveFunctor #-}
3{-# LANGUAGE DeriveTraversable #-}
4{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE KindSignatures #-}
7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE PatternSynonyms #-}
9{-# LANGUAGE ScopedTypeVariables #-}
10module Network.Kademlia.Bootstrap where
11
12import Data.Function
13import Data.Maybe
14import qualified Data.Set as Set
15import Data.Time.Clock (getCurrentTime)
16import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
17import Network.Kademlia.Routing as R
18#ifdef THREAD_DEBUG
19import Control.Concurrent.Lifted.Instrument
20#else
21import Control.Concurrent.Lifted
22import GHC.Conc (labelThread)
23#endif
24import Control.Concurrent.STM
25import Control.Monad
26import Data.Bits
27import Data.Hashable
28import Data.IP
29import Data.Monoid
30import Data.Serialize (Serialize)
31import Data.Time.Clock.POSIX (POSIXTime)
32import System.Entropy
33import System.Timeout
34import Text.PrettyPrint as PP hiding (($$), (<>))
35import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
36import System.IO
37
38import qualified Data.Wrapper.PSQInt as Int
39 ;import Data.Wrapper.PSQInt (pattern (:->))
40import Network.Address (bucketRange,genBucketSample)
41import Network.Kademlia.Search
42import Control.Concurrent.Tasks
43import Network.Kademlia
44
45
46-- | > pollForRefresh interval queue refresh
47--
48-- Fork a refresh loop. Kill the returned thread to terminate it. The
49-- arguments are: a staleness threshold (if a bucket goes this long without
50-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
51-- schedule for each bucket, and a refresh action to be forked when a bucket
52-- excedes the staleness threshold.
53--
54-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's
55-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
56-- TVar.
57forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
58forkPollForRefresh interval psq refresh = do
59 fork $ do
60 myThreadId >>= flip labelThread "pollForRefresh"
61 fix $ \again -> do
62 join $ atomically $ do
63 nextup <- Int.findMin <$> readTVar psq
64 maybe retry (return . go again) nextup
65 where
66 go again ( bktnum :-> refresh_time ) = do
67 now <- getPOSIXTime
68 case fromEnum (refresh_time - now) of
69 x | x <= 0 -> do -- Refresh time!
70 -- Move it to the back of the refresh queue.
71 atomically $ modifyTVar' psq
72 $ Int.insert bktnum (now + interval)
73 -- Now fork the refresh operation.
74 -- TODO: We should probably propogate the kill signal to this thread.
75 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
76 _ <- refresh bktnum
77 return ()
78 return ()
79 seconds -> threadDelay ( seconds * 1000000 )
80 again
81
82refreshBucket :: forall nid tok ni addr.
83 ( Show nid
84 , Serialize nid
85 , Ord nid
86 , Ord ni
87 , Ord addr
88 , Hashable nid
89 , Hashable ni ) =>
90 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
91refreshBucket sch var n = do
92 tbl <- atomically (readTVar var)
93 let count = bktCount tbl
94 nid = kademliaLocation (searchSpace sch) (thisNode tbl)
95 sample <- if n+1 >= count -- Is this the last bucket?
96 then return nid -- Yes? Search our own id.
97 else kademliaSample (searchSpace sch) -- No? Generate a random id.
98 getEntropy
99 nid
100 (bucketRange n (n + 1 < count))
101 fin <- atomically $ newTVar False
102 resultCounter <- atomically $ newTVar Set.empty
103 let fullcount = R.defaultBucketSize
104 saveit True = writeTVar fin True >> return True
105 saveit _ = return False
106 checkBucketFull :: ni -> STM Bool
107 checkBucketFull found_node = do
108 tbl <- readTVar var
109 let counts = R.shape tbl
110 when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
111 $ modifyTVar' resultCounter (Set.insert found_node)
112 resultCount <- readTVar resultCounter
113 saveit $ case drop (n - 1) counts of
114 (cnt:_) | cnt < fullcount -> True
115 _ | Set.size resultCount < fullcount -> True
116 _ -> False
117
118 hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
119
120 -- Set 15 minute timeout in order to avoid overlapping refreshes.
121 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
122 then const $ return True -- Never short-circuit the last bucket.
123 else checkBucketFull
124 _ <- timeout (15*60*1000000) $ do
125 atomically $ searchIsFinished s >>= check
126 atomically $ searchCancel s
127 hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
128 rcount <- atomically $ do
129 c <- Set.size <$> readTVar resultCounter
130 b <- readTVar fin
131 return $ if b then 1 else c
132 return rcount
133
134
135bootstrap ::
136 ( Show nid
137 , Serialize nid
138 -- , FiniteBits nid
139 , Hashable ni
140 , Hashable nid
141 , Ord ni
142 , Ord addr
143 , Ord nid
144 , Traversable t1
145 , Traversable t
146 ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
147bootstrap sch var ping ns ns0 = do
148 gotPing <- atomically $ newTVar False
149
150 -- First, ping the given nodes so that they are added to
151 -- our routing table.
152 withTaskGroup "bootstrap.resume" 20 $ \g -> do
153 forM_ ns $ \n -> do
154 let lbl = show $ kademliaLocation (searchSpace sch) n
155 forkTask g lbl $ do
156 b <- ping n
157 when b $ atomically $ writeTVar gotPing True
158
159 -- We resort to the hardcoded fallback nodes only when we got no
160 -- responses. This is to lesson the burden on well-known boostrap
161 -- nodes.
162 fallback <- atomically (readTVar gotPing) >>= return . when . not
163 fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
164 forM_ ns0 $ \n -> do
165 forkTask g (show $ kademliaLocation (searchSpace sch) n)
166 (void $ ping n)
167 hPutStrLn stderr "Finished bootstrap pings."
168
169 -- Now run searches until all the buckets are full. On a small network,
170 -- this may never quit.
171 --
172 -- TODO: For small networks, we should give up on filling a nearby bucket
173 -- at some point and move on to one farther away.
174 flip fix 1 $ \again cnt -> do
175 when (cnt==0) $ do
176 -- Force a delay in case the search returns too quickly
177 hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
178 threadDelay (60 * 1000000)
179 tbl <- atomically $ readTVar var
180 let shp = zip (R.shape tbl) [0 .. ]
181 unfull = filter ( (< R.defaultBucketSize) . fst ) shp
182 case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
183 [] -> do
184 when (length shp < R.defaultBucketCount) $ do
185 -- Not enough buckets, keep trying.
186 hPutStrLn stderr
187 $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
188 cnt <- refreshBucket sch var
189 (R.defaultBucketCount - 1)
190 again cnt
191 (size,num):_ -> do
192 hPutStrLn stderr $ "Bucket too small, refresh "++ show (size,num,shp)
193 cnt <- refreshBucket sch var num
194 again cnt
195
196-- XXX: This will be redundantly triggered twice upon every node replacement
197-- because we do not currently distinguish between standalone
198-- insertion/deletion events and an insertion/deletion pair constituting
199-- replacement.
200--
201-- It might also be better to pass the timestamp of the transition here and
202-- keep the refresh queue in better sync with the routing table by updating it
203-- within the STM monad.
204touchBucket :: KademliaSpace nid ni
205 -> POSIXTime
206 -> TVar (BucketList ni)
207 -> TVar (Int.PSQ POSIXTime)
208 -> RoutingTransition ni
209 -> STM (IO ())
210touchBucket space interval bkts psq tr
211 | (transitionedTo tr == Applicant)
212 = return $ return ()
213 | otherwise = return $ do
214 now <- getPOSIXTime
215 atomically $ do
216 let nid = kademliaLocation space (transitioningNode tr)
217 num <- R.bucketNumber space nid <$> readTVar bkts
218 modifyTVar' psq $ Int.insert num (now + interval)
219
220-- TODO: Bootstrap/Refresh
221--
222-- From BEP 05:
223--
224-- Each bucket should maintain a "last changed" property to indicate how
225-- "fresh" the contents are.
226--
227-- Note: We will use a "time to next refresh" property instead and store it in
228-- a priority search queue.
229--
230-- When...
231--
232-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
233-- >>> bucketEvents =
234-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
235-- >>>
236-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
237-- >>>
238-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
239-- >>> , Applicant :--> Accepted -- with another node,
240-- >>> ]
241--
242-- the bucket's last changed property should be updated. Buckets
243-- that have not been changed in 15 minutes should be "refreshed." This is done
244-- by picking a random ID in the range of the bucket and performing a
245-- find_nodes search on it.
246--
247-- The only other possible BucketTouchEvents are as follows:
248--
249-- >>> not_handled =
250-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
251-- >>> -- (Applicant :--> Stranger)
252-- >>> -- (Applicant :--> Accepted)
253-- >>> , Accepted :--> Applicant -- Never happens
254-- >>> ]
255--