1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2{-# LANGUAGE KindSignatures #-}
3{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
4-- {-# LANGUAGE TypeFamilies #-}
6{-# LANGUAGE PatternSynonyms #-}
7module Network.Kademlia where
9import Data.Function
10import Data.Maybe
11import qualified Data.Set as Set
12import Data.Time.Clock (getCurrentTime)
13import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
14import Network.DHT.Routing as R
16import Control.Concurrent.Lifted.Instrument
18import Control.Concurrent.Lifted
19import GHC.Conc (labelThread)
21import Control.Concurrent.STM
22import Control.Monad
23import Data.Bits
24import Data.Hashable
25import Data.IP
26import Data.Monoid
27import Data.Serialize (Serialize)
28import Data.Time.Clock.POSIX (POSIXTime)
29import qualified Data.Wrapper.PSQInt as Int
30 ;import Data.Wrapper.PSQInt (pattern (:->))
31import Network.Address (bucketRange,genBucketSample)
32import Network.BitTorrent.DHT.Search
33import System.Entropy
34import System.Timeout
35import Text.PrettyPrint as PP hiding (($$), (<>))
36import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
37import System.IO
38import Tasks
-- | The status of a given node with respect to a given routing table.
data RoutingStatus
    -- | The node is unknown to the Kademlia routing table.
    = Stranger
    -- | The node may be inserted pending a ping timeout.
    | Applicant
    -- | The node has a slot in one of the Kademlia buckets.
    | Accepted
    deriving ( Eq
             , Ord
             , Enum
             , Show
             , Read
             )
-- | A change occurred in the Kademlia routing table: one node moved to a new
-- 'RoutingStatus'.
data RoutingTransition ni = RoutingTransition
    { -- | The node whose status changed.
      transitioningNode :: ni
      -- | The status the node holds after the change.
    , transitionedTo    :: !RoutingStatus
    }
    deriving ( Eq
             , Ord
             , Show
             , Read
             )
-- | A pair of callbacks for observing routing-table insertion activity.
data InsertionReporter ni = InsertionReporter
    { -- | Called on every inbound packet.  The arguments are, in order: a
      -- timestamp, the origin of the packet, and the nodes that will be
      -- pinged as a result.
      reportArrival    :: POSIXTime -> ni -> [ni] -> IO ()

      -- | Called on every ping probe.  The arguments are, in order: a
      -- timestamp, the node that was pinged, and 'True' if it ponged.
    , reportPingResult :: POSIXTime -> ni -> Bool -> IO ()
    }
-- | An 'InsertionReporter' whose callbacks ignore their arguments and do
-- nothing.
quietInsertions :: InsertionReporter ni
quietInsertions = InsertionReporter
    { reportArrival    = ignoreArrival
    , reportPingResult = ignorePingResult
    }
  where
    ignoreArrival _ _ _    = return ()
    ignorePingResult _ _ _ = return ()
-- | Adapt an 'InsertionReporter' to a different node type.  Every node passed
-- to the resulting reporter is converted with the given function before being
-- handed to the corresponding callback of the wrapped reporter; timestamps and
-- ping outcomes are passed through untouched.
contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
contramapIR f ir = InsertionReporter
    { reportArrival    = arrival
    , reportPingResult = pingResult
    }
  where
    arrival stamp origin pinged    = reportArrival ir stamp (f origin) (fmap f pinged)
    pingResult stamp target ponged = reportPingResult ir stamp (f target) ponged
-- | All the IO operations necessary to maintain a Kademlia routing table.
data TableStateIO ni = TableStateIO
    { -- | Write the routing table.  Typically 'writeTVar'.
      tblWrite      :: R.BucketList ni -> STM ()

      -- | Read the routing table.  Typically 'readTVar'.
    , tblRead       :: STM (R.BucketList ni)

      -- | Issue a ping to a remote node and report 'True' if the node
      -- responded within an acceptable time and 'False' otherwise.
    , tblPing       :: ni -> IO Bool

      -- | Convenience hook provided to assist in maintaining state that must
      -- stay consistent with the routing table.  It is invoked in the same
      -- transaction in which 'tblRead'\/'tblWrite' occurred, but only when
      -- there was an interesting change.  The returned IO action will be
      -- triggered soon afterward.
      --
      -- It is not necessary to do anything interesting here.  The following
      -- trivial implementation is fine:
      --
      -- > tblTransition = const $ return $ return ()
    , tblTransition :: RoutingTransition ni -> STM (IO ())
    }
-- | The plain 'TableStateIO': the routing table is read from and written to
-- the given 'TVar', pings are delegated to the supplied action, and routing
-- transitions trigger no additional work.
vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
vanillaIO var ping = TableStateIO
    { tblWrite      = writeTVar var
    , tblRead       = readTVar var
    , tblPing       = ping
    , tblTransition = \_ -> return (return ())
    }
-- | Everything necessary to maintain a routing table of /ni/ (node
-- information) entries: a reporter for insertion activity, the Kademlia
-- space the nodes live in, and the IO operations on the table itself.
data Kademlia nid ni
    = Kademlia (InsertionReporter ni) (KademliaSpace nid ni) (TableStateIO ni)
-- Helper to 'insertNode'.
--
-- Adapt one entry of the return value of 'updateForPingResult' into a more
-- easily grokked list of transitions.  The first component of the pair always
-- becomes a 'Stranger':
--
--   * with @Just _@ as the second component:  Accepted  --> Stranger
--   * with @Nothing@ as the second component: Applicant --> Stranger
--
-- When the second component is present, the node it carries additionally
-- becomes 'Accepted' (see 'accepted').
transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
transition (departing, successor) = RoutingTransition departing Stranger : promotion
  where
    promotion = maybe [] (\pair -> [accepted pair]) successor
-- Helper to 'transition'.
--
-- Node transition: Applicant --> Accepted.  Only the second component of the
-- pair is used; the first is ignored.
accepted :: (t,ni) -> RoutingTransition ni
accepted pair = case pair of
    (_, newcomer) -> RoutingTransition newcomer Accepted
-- | Update the routing table for an inbound packet from @node@.
--
-- In a single transaction the table is read, updated via
-- 'R.updateForInbound' and written back; if the node was inserted
-- (Stranger --> Accepted) or a ping is now pending on its behalf
-- (Stranger --> Applicant), the corresponding 'tblTransition' reaction is
-- collected.  After the transaction, the arrival is reported through
-- 'reportArrival' and the reaction is run.
--
-- If the update asked for nodes to be pinged, a thread labelled
-- @pingResults@ is forked which pings them one after another.  Each result is
-- reported through 'reportPingResult', stamped with the time the ping
-- completed, and then applied to the table via 'R.updateForPingResult',
-- running the reactions for every resulting transition.  No thread is forked
-- when there is nothing to ping.
insertNode :: Kademlia nid ni -> ni -> IO ()
insertNode (Kademlia reporter space io) node = do
    tm <- getPOSIXTime

    (ps, reaction) <- atomically $ do
        tbl <- tblRead io
        let (inserted, ps, t') = R.updateForInbound space tm node tbl
        tblWrite io t'
        reaction <- case ps of
            _ | inserted -> -- Node transition: Stranger --> Accepted
                tblTransition io $ RoutingTransition node Accepted
            (_:_)        -> -- Node transition: Stranger --> Applicant
                tblTransition io $ RoutingTransition node Applicant
            _            -> return $ return ()
        return (ps, reaction)

    reportArrival reporter tm node ps
    reaction

    unless (null ps) $ void $ fork $ do
        myThreadId >>= flip labelThread "pingResults"
        forM_ ps $ \n -> do
            b <- tblPing io n
            -- Stamp the report with the time the ping finished rather than
            -- the arrival time of the packet that triggered it.
            pingTime <- getPOSIXTime
            reportPingResult reporter pingTime n b
            join $ atomically $ do
                tbl <- tblRead io
                let (replacements, t') = R.updateForPingResult space n b tbl
                tblWrite io t'
                ios <- mapM (tblTransition io) (concatMap transition replacements)
                return $ sequence_ ios
174-- TODO: Bootstrap/Refresh
176-- From BEP 05:
178-- Each bucket should maintain a "last changed" property to indicate how
179-- "fresh" the contents are.
181-- Note: We will use a "time to next refresh" property instead and store it in
182-- a priority search queue.
184-- When...
186-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
187-- >>> bucketEvents =
188-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
189-- >>>
190-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
191-- >>>
192-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
193-- >>> , Applicant :--> Accepted -- with another node,
194-- >>> ]
196-- the bucket's last changed property should be updated. Buckets
197-- that have not been changed in 15 minutes should be "refreshed." This is done
198-- by picking a random ID in the range of the bucket and performing a
199-- find_nodes search on it.
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>   [ Stranger :--> Applicant -- A ping is pending; its result is covered by:
-- >>>                             --   (Applicant :--> Stranger)
-- >>>                             --   (Applicant :--> Accepted)
-- >>>   , Accepted :--> Applicant -- Never happens
-- >>>   ]
-- | Produce the reaction to a routing transition that pushes back the
-- scheduled refresh of the affected bucket.
--
-- A transition to 'Applicant' yields a no-op.  For any other transition the
-- returned IO action reads the current time, looks up the number of the
-- bucket that the transitioning node's location falls into, and sets that
-- bucket's entry in the refresh queue to the current time plus @interval@.
--
-- XXX: This will be redundantly triggered twice upon every node replacement
-- because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
touchBucket :: KademliaSpace nid ni
            -> POSIXTime
            -> TVar (BucketList ni)
            -> TVar (Int.PSQ POSIXTime)
            -> RoutingTransition ni
            -> STM (IO ())
touchBucket space interval bkts psq tr =
    case transitionedTo tr of
        Applicant -> return (return ())
        _         -> return reschedule
  where
    reschedule = do
        now <- getPOSIXTime
        atomically $ do
            tbl <- readTVar bkts
            let nid = kademliaLocation space (transitioningNode tr)
                num = R.bucketNumber space nid tbl
            modifyTVar' psq (Int.insert num (now + interval))
-- | > forkPollForRefresh interval queue refresh
--
-- Fork a refresh loop.  Kill the returned thread to terminate it.  The
-- arguments are: a staleness threshold (if a bucket goes this long without
-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
-- schedule for each bucket, and a refresh action to be forked when a bucket
-- exceeds the staleness threshold.
--
-- To "touch" a bucket and prevent it from being refreshed, reschedule its
-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
-- TVar.
--
-- The loop blocks (via 'retry') while the queue is empty.  Otherwise it looks
-- at the entry with the earliest refresh time: if that time has passed, the
-- bucket is rescheduled @interval@ into the future and the refresh action is
-- forked in a thread labelled @refresh.N@; if not, the loop sleeps until the
-- refresh time and then looks at the queue again.
forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
forkPollForRefresh interval psq refresh =
    fork $ do
        myThreadId >>= flip labelThread "pollForRefresh"
        fix $ \again -> do
            join $ atomically $ do
                nextup <- Int.findMin <$> readTVar psq
                maybe retry (return . go again) nextup
  where
    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        let remaining = refresh_time - now
        if remaining <= 0
          then do -- Refresh time!
            -- Move it to the back of the refresh queue.
            atomically $ modifyTVar' psq
                       $ Int.insert bktnum (now + interval)
            -- Now fork the refresh operation.
            -- TODO: We should probably propagate the kill signal to this thread.
            void $ fork $ do
                myThreadId >>= flip labelThread ("refresh."++show bktnum)
                void $ refresh bktnum
          else threadDelay (toMicroseconds remaining)
        again

    -- Convert a positive time span into a 'threadDelay' argument.
    --
    -- 'fromEnum' must not be used for this: on a 'POSIXTime' it yields the
    -- underlying count of picoseconds rather than whole seconds, so scaling
    -- its result by 10^6 gives a wildly wrong (and overflowing) delay.  The
    -- span is rounded up so that a positive remainder never becomes a
    -- zero-length sleep, and clamped so the result always fits in an 'Int'.
    toMicroseconds :: POSIXTime -> Int
    toMicroseconds dt =
        fromInteger $ min (toInteger (maxBound :: Int)) (ceiling (dt * 1000000))
-- | Refresh bucket @n@ of the routing table held in @var@ by running a
-- search, waiting at most 15 minutes for it to finish.
--
-- The search target is this node's own id when @n@ is the last bucket
-- currently in the table; otherwise it is an id produced by 'kademliaSample'
-- for the range of bucket @n@.
--
-- Unless @n@ is the final possible bucket (@R.defaultBucketCount - 1@), every
-- node found by the search is passed through a check that counts the distinct
-- found nodes belonging to bucket @n@ and returns 'True' while the inspected
-- bucket holds fewer than 'R.defaultBucketSize' nodes or fewer than that many
-- such results have been counted.  For the final bucket the check always
-- returns 'True' so the search is never short-circuited.
--
-- Once the wait is over, whether the search finished or the 15 minutes ran
-- out, the search is cancelled, so that a timed-out search cannot keep running
-- alongside the next refresh.
--
-- The result is 1 if the counting check ever returned 'True'; otherwise it is
-- the number of distinct found nodes counted for bucket @n@.
refreshBucket :: forall nid tok ni addr.
    ( Show nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
    Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
refreshBucket sch var n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid   = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count -- Is this the last bucket?
                then return nid -- Yes?  Search our own id.
                else kademliaSample (searchSpace sch) -- No?  Generate a random id.
                                    getEntropy
                                    nid
                                    (bucketRange n (n + 1 < count))
    fin           <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty
    let fullcount = R.defaultBucketSize
        saveit True = writeTVar fin True >> return True
        saveit _    = return False
        checkBucketFull :: ni -> STM Bool
        checkBucketFull found_node = do
            current <- readTVar var
            let counts = R.shape current
            -- Only results that land in the bucket being refreshed count.
            when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) current)
                $ modifyTVar' resultCounter (Set.insert found_node)
            resultCount <- readTVar resultCounter
            -- NOTE(review): this inspects index (n - 1) of the shape list,
            -- whereas 'bootstrap' pairs shape entries with bucket numbers
            -- starting at 0 -- confirm that the offset is intended.
            saveit $ case drop (n - 1) counts of
                (cnt:_) | cnt < fullcount            -> True
                _ | Set.size resultCount < fullcount -> True
                _                                    -> False

    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)

    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                    then const $ return True -- Never short-circuit the last bucket.
                                    else checkBucketFull
    -- Wait at most 15 minutes for the search in order to avoid overlapping
    -- refreshes.
    _ <- timeout (15*60*1000000) $ atomically $ searchIsFinished s >>= check
    -- Cancel unconditionally: this must also happen when the wait timed out.
    atomically $ searchCancel s
    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
    atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
-- | Populate the routing table in @var@.
--
-- First every node in @ns@ is pinged (in a task group labelled
-- @bootstrap.resume@) so that responders are added to the routing table.
-- The fallback nodes @ns0@ are pinged only if none of those pings reported
-- success; this is to lessen the burden on well-known bootstrap nodes.
--
-- Afterwards searches are run, via 'refreshBucket', until all the buckets are
-- full.  On a small network, this may never quit.  Whenever a refresh reports
-- zero results, the next round is delayed by one minute.
--
-- TODO: For small networks, we should give up on filling a nearby bucket
-- at some point and move on to one farther away.
bootstrap ::
    ( Show nid
    , Serialize nid
 -- , FiniteBits nid
    , Hashable ni
    , Hashable nid
    , Ord ni
    , Ord addr
    , Ord nid
    , Traversable t1
    , Traversable t
    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
bootstrap sch var ping ns ns0 = do
    gotPing <- newTVarIO False

    -- Ping the given nodes so that they are added to our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g ->
        forM_ ns $ \n ->
            forkTask g (taskLabel n) $ do
                ponged <- ping n
                when ponged $ atomically $ writeTVar gotPing True

    -- Resort to the hardcoded fallback nodes only when we got no responses.
    responded <- readTVarIO gotPing
    unless responded $ withTaskGroup "" 20 $ \g ->
        forM_ ns0 $ \n ->
            forkTask g (taskLabel n) (void $ ping n)
    hPutStrLn stderr "Finished bootstrap pings."

    fillBuckets 1
  where
    taskLabel n = show (kademliaLocation (searchSpace sch) n)

    lastBucket = R.defaultBucketCount - 1

    -- One round of the search loop.  The argument is the result count of the
    -- previous refresh (the first round starts with 1, i.e. without a delay).
    fillBuckets :: Int -> IO ()
    fillBuckets cnt = do
        when (cnt == 0) $ do
            -- Force a delay in case the search returns too quickly
            hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
            threadDelay (60 * 1000000)
        tbl <- atomically $ readTVar var
        let shp      = zip (R.shape tbl) [0 .. ]
            unfull   = [ entry | entry@(size, _) <- shp, size < R.defaultBucketSize ]
            eligible = dropWhile (\(_, num) -> num > lastBucket) unfull
        case listToMaybe eligible of
            Nothing
                | length shp < R.defaultBucketCount -> do
                    -- Not enough buckets, keep trying.
                    hPutStrLn stderr
                        $ "Not enough buckets, refresh " ++ show lastBucket
                    refreshBucket sch var lastBucket >>= fillBuckets
                | otherwise -> return ()
            Just (size, num) -> do
                -- If we don't yet have enough buckets, we need to search our
                -- own id.  We indicate that by setting the bucket number to
                -- the target.
                let num' | bktCount tbl < R.defaultBucketCount = lastBucket
                         | otherwise                           = num
                hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp)
                refreshBucket sch var num' >>= fillBuckets