2{-# LANGUAGE DeriveFunctor #-}
3{-# LANGUAGE DeriveTraversable #-}
4{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE KindSignatures #-}
7{-# LANGUAGE PartialTypeSignatures #-}
8{-# LANGUAGE PatternSynonyms #-}
9{-# LANGUAGE ScopedTypeVariables #-}
10module Network.Kademlia.Bootstrap where
12import Data.Function
13import Data.Maybe
14import qualified Data.Set as Set
15import Data.Time.Clock (getCurrentTime)
16import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
17import Network.Kademlia.Routing as R
19import Control.Concurrent.Lifted.Instrument
21import Control.Concurrent.Lifted
22import GHC.Conc (labelThread)
24import Control.Concurrent.STM
25import Control.Monad
26import Data.Bits
27import Data.Hashable
28import Data.IP
29import Data.Monoid
30import Data.Serialize (Serialize)
31import Data.Time.Clock.POSIX (POSIXTime)
32import System.Entropy
33import System.Timeout
34import Text.PrettyPrint as PP hiding (($$), (<>))
35import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
36import System.IO
38import qualified Data.Wrapper.PSQInt as Int
39 ;import Data.Wrapper.PSQInt (pattern (:->))
40import Network.Address (bucketRange,genBucketSample)
41import Network.Kademlia.Search
42import Control.Concurrent.Tasks
43import Network.Kademlia
46-- | > pollForRefresh interval queue refresh
48-- Fork a refresh loop. Kill the returned thread to terminate it. The
49-- arguments are: a staleness threshold (if a bucket goes this long without
50-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
51-- schedule for each bucket, and a refresh action to be forked when a bucket
52-- excedes the staleness threshold.
54-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's
55-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
56-- TVar.
57forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
58forkPollForRefresh interval psq refresh = do
59 fork $ do
60 myThreadId >>= flip labelThread "pollForRefresh"
61 fix $ \again -> do
62 join $ atomically $ do
63 nextup <- Int.findMin <$> readTVar psq
64 maybe retry (return . go again) nextup
65 where
66 go again ( bktnum :-> refresh_time ) = do
67 now <- getPOSIXTime
68 case fromEnum (refresh_time - now) of
69 x | x <= 0 -> do -- Refresh time!
70 -- Move it to the back of the refresh queue.
71 atomically $ modifyTVar' psq
72 $ Int.insert bktnum (now + interval)
73 -- Now fork the refresh operation.
74 -- TODO: We should probably propogate the kill signal to this thread.
75 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
76 _ <- refresh bktnum
77 return ()
78 return ()
79 seconds -> threadDelay ( seconds * 1000000 )
80 again
82refreshBucket :: forall nid tok ni addr.
83 ( Show nid
84 , Serialize nid
85 , Ord nid
86 , Ord ni
87 , Ord addr
88 , Hashable nid
89 , Hashable ni ) =>
90 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
91refreshBucket sch var n = do
92 tbl <- atomically (readTVar var)
93 let count = bktCount tbl
94 nid = kademliaLocation (searchSpace sch) (thisNode tbl)
95 sample <- if n+1 >= count -- Is this the last bucket?
96 then return nid -- Yes? Search our own id.
97 else kademliaSample (searchSpace sch) -- No? Generate a random id.
98 getEntropy
99 nid
100 (bucketRange n (n + 1 < count))
101 fin <- atomically $ newTVar False
102 resultCounter <- atomically $ newTVar Set.empty
103 let fullcount = R.defaultBucketSize
104 saveit True = writeTVar fin True >> return True
105 saveit _ = return False
106 checkBucketFull :: ni -> STM Bool
107 checkBucketFull found_node = do
108 tbl <- readTVar var
109 let counts = R.shape tbl
110 when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
111 $ modifyTVar' resultCounter (Set.insert found_node)
112 resultCount <- readTVar resultCounter
113 saveit $ case drop (n - 1) counts of
114 (cnt:_) | cnt < fullcount -> True
115 _ | Set.size resultCount < fullcount -> True
116 _ -> False
118 hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
120 -- Set 15 minute timeout in order to avoid overlapping refreshes.
121 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
122 then const $ return True -- Never short-circuit the last bucket.
123 else checkBucketFull
124 _ <- timeout (15*60*1000000) $ do
125 atomically $ searchIsFinished s >>= check
126 atomically $ searchCancel s
127 hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
128 rcount <- atomically $ do
129 c <- Set.size <$> readTVar resultCounter
130 b <- readTVar fin
131 return $ if b then 1 else c
132 return rcount
135bootstrap ::
136 ( Show nid
137 , Serialize nid
138 -- , FiniteBits nid
139 , Hashable ni
140 , Hashable nid
141 , Ord ni
142 , Ord addr
143 , Ord nid
144 , Traversable t1
145 , Traversable t
146 ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
147bootstrap sch var ping ns ns0 = do
148 gotPing <- atomically $ newTVar False
150 -- First, ping the given nodes so that they are added to
151 -- our routing table.
152 withTaskGroup "bootstrap.resume" 20 $ \g -> do
153 forM_ ns $ \n -> do
154 let lbl = show $ kademliaLocation (searchSpace sch) n
155 forkTask g lbl $ do
156 b <- ping n
157 when b $ atomically $ writeTVar gotPing True
159 -- We resort to the hardcoded fallback nodes only when we got no
160 -- responses. This is to lesson the burden on well-known boostrap
161 -- nodes.
162 fallback <- atomically (readTVar gotPing) >>= return . when . not
163 fallback $ withTaskGroup "" 20 $ \g -> do
164 forM_ ns0 $ \n -> do
165 forkTask g (show $ kademliaLocation (searchSpace sch) n)
166 (void $ ping n)
167 hPutStrLn stderr "Finished bootstrap pings."
169 -- Now run searches until all the buckets are full. On a small network,
170 -- this may never quit.
171 --
172 -- TODO: For small networks, we should give up on filling a nearby bucket
173 -- at some point and move on to one farther away.
174 flip fix 1 $ \again cnt -> do
175 when (cnt==0) $ do
176 -- Force a delay in case the search returns too quickly
177 hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
178 threadDelay (60 * 1000000)
179 tbl <- atomically $ readTVar var
180 let shp = zip (R.shape tbl) [0 .. ]
181 unfull = filter ( (< R.defaultBucketSize) . fst ) shp
182 case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
183 [] -> do
184 when (length shp < R.defaultBucketCount) $ do
185 -- Not enough buckets, keep trying.
186 hPutStrLn stderr
187 $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
188 cnt <- refreshBucket sch var
189 (R.defaultBucketCount - 1)
190 again cnt
191 (size,num):_ -> do
192 hPutStrLn stderr $ "Bucket too small, refresh "++ show (size,num,shp)
193 cnt <- refreshBucket sch var num
194 again cnt
196-- XXX: This will be redundantly triggered twice upon every node replacement
197-- because we do not currently distinguish between standalone
198-- insertion/deletion events and an insertion/deletion pair constituting
199-- replacement.
201-- It might also be better to pass the timestamp of the transition here and
202-- keep the refresh queue in better sync with the routing table by updating it
203-- within the STM monad.
204touchBucket :: KademliaSpace nid ni
205 -> POSIXTime
206 -> TVar (BucketList ni)
207 -> TVar (Int.PSQ POSIXTime)
208 -> RoutingTransition ni
209 -> STM (IO ())
210touchBucket space interval bkts psq tr
211 | (transitionedTo tr == Applicant)
212 = return $ return ()
213 | otherwise = return $ do
214 now <- getPOSIXTime
215 atomically $ do
216 let nid = kademliaLocation space (transitioningNode tr)
217 num <- R.bucketNumber space nid <$> readTVar bkts
218 modifyTVar' psq $ Int.insert num (now + interval)
220-- TODO: Bootstrap/Refresh
222-- From BEP 05:
224-- Each bucket should maintain a "last changed" property to indicate how
225-- "fresh" the contents are.
227-- Note: We will use a "time to next refresh" property instead and store it in
228-- a priority search queue.
230-- When...
232-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
233-- >>> bucketEvents =
234-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
235-- >>>
236-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
237-- >>>
238-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
239-- >>> , Applicant :--> Accepted -- with another node,
240-- >>> ]
242-- the bucket's last changed property should be updated. Buckets
243-- that have not been changed in 15 minutes should be "refreshed." This is done
244-- by picking a random ID in the range of the bucket and performing a
245-- find_nodes search on it.
247-- The only other possible BucketTouchEvents are as follows:
249-- >>> not_handled =
250-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
251-- >>> -- (Applicant :--> Stranger)
252-- >>> -- (Applicant :--> Accepted)
253-- >>> , Accepted :--> Applicant -- Never happens
254-- >>> ]