summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Network/Kademlia.hs380
1 files changed, 380 insertions, 0 deletions
diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs
new file mode 100644
index 00000000..c6c59ae6
--- /dev/null
+++ b/src/Network/Kademlia.hs
@@ -0,0 +1,380 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2{-# LANGUAGE KindSignatures #-}
3{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
4-- {-# LANGUAGE TypeFamilies #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE PatternSynonyms #-}
7module Network.Kademlia where
8
9import Data.Function
10import Data.Maybe
11import qualified Data.Set as Set
12import Data.Time.Clock (getCurrentTime)
13import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
14import Network.DHT.Routing as R
15#ifdef THREAD_DEBUG
16import Control.Concurrent.Lifted.Instrument
17#else
18import Control.Concurrent.Lifted
19import GHC.Conc (labelThread)
20#endif
21import Control.Concurrent.STM
22import Control.Monad
23import Data.Bits
24import Data.Hashable
25import Data.IP
26import Data.Monoid
27import Data.Serialize (Serialize)
28import Data.Time.Clock.POSIX (POSIXTime)
29import qualified Data.Wrapper.PSQInt as Int
30 ;import Data.Wrapper.PSQInt (pattern (:->))
31import Network.Address (bucketRange,genBucketSample)
32import Network.BitTorrent.DHT.Search
33import System.Entropy
34import System.Timeout
35import Text.PrettyPrint as PP hiding (($$), (<>))
36import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
37import System.IO
38import Tasks
39
40-- | The status of a given node with respect to a given routint table.
41data RoutingStatus
42 = Stranger -- ^ The node is unknown to the Kademlia routing table.
43 | Applicant -- ^ The node may be inserted pending a ping timeout.
44 | Accepted -- ^ The node has a slot in one of the Kademlia buckets.
45 deriving (Eq,Ord,Enum,Show,Read)
46
47-- | A change occured in the kademlia routing table.
48data RoutingTransition ni = RoutingTransition
49 { transitioningNode :: ni
50 , transitionedTo :: !RoutingStatus
51 }
52 deriving (Eq,Ord,Show,Read)
53
54data InsertionReporter ni = InsertionReporter
55 { -- | Called on every inbound packet.
56 reportArrival :: POSIXTime
57 -> ni -- ^ Origin of packet.
58 -> [ni] -- ^ These will be pinged as a result.
59 -> IO ()
60 -- | Called on every ping probe.
61 , reportPingResult :: POSIXTime
62 -> ni -- ^ Who was pinged.
63 -> Bool -- ^ True if they ponged.
64 -> IO ()
65 }
66
67quietInsertions :: InsertionReporter ni
68quietInsertions = InsertionReporter
69 { reportArrival = \_ _ _ -> return ()
70 , reportPingResult = \_ _ _ -> return ()
71 }
72
73contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
74contramapIR f ir = InsertionReporter
75 { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
76 , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
77 }
78
79-- | All the IO operations neccessary to maintain a Kademlia routing table.
80data TableStateIO ni = TableStateIO
81 { -- | Write the routing table. Typically 'writeTVar'.
82 tblWrite :: R.BucketList ni -> STM ()
83
84 -- | Read the routing table. Typically 'readTVar'.
85 , tblRead :: STM (R.BucketList ni)
86
87 -- | Issue a ping to a remote node and report 'True' if the node
88 -- responded within an acceptable time and 'False' otherwise.
89 , tblPing :: ni -> IO Bool
90
91 -- | Convenience method provided to assist in maintaining state
92 -- consistent with the routing table. It will be invoked in the same
93 -- transaction that 'tblRead'\/'tblWrite' occured but only when there was
94 -- an interesting change. The returned IO action will be triggered soon
95 -- afterward.
96 --
97 -- It is not necessary to do anything interesting here. The following
98 -- trivial implementation is fine:
99 --
100 -- > tblTransition = const $ return $ return ()
101 , tblTransition :: RoutingTransition ni -> STM (IO ())
102 }
103
104vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
105vanillaIO var ping = TableStateIO
106 { tblRead = readTVar var
107 , tblWrite = writeTVar var
108 , tblPing = ping
109 , tblTransition = const $ return $ return ()
110 }
111
112-- | Everything neccessary to maintain a routing table of /ni/ (node
113-- information) entries.
114data Kademlia nid ni = Kademlia (InsertionReporter ni)
115 (KademliaSpace nid ni)
116 (TableStateIO ni)
117
118
119-- Helper to 'insertNode'.
120--
121-- Adapt return value from 'updateForPingResult' into a
122-- more easily groked list of transitions.
123transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
124transition (x,m) =
125 -- | Just _ <- m = Node transition: Accepted --> Stranger
126 -- | Nothing <- m = Node transition: Applicant --> Stranger
127 RoutingTransition x Stranger
128 : maybeToList (accepted <$> m)
129
130-- Helper to 'transition'
131--
132-- Node transition: Applicant --> Accepted
133accepted :: (t,ni) -> RoutingTransition ni
134accepted (_,y) = RoutingTransition y Accepted
135
136
137insertNode :: Kademlia nid ni -> ni -> IO ()
138insertNode (Kademlia reporter space io) node = do
139
140 tm <- utcTimeToPOSIXSeconds <$> getCurrentTime
141
142 (ps,reaction) <- atomically $ do
143 tbl <- tblRead io
144 let (inserted, ps,t') = R.updateForInbound space tm node tbl
145 tblWrite io t'
146 reaction <- case ps of
147 _ | inserted -> -- Node transition: Stranger --> Accepted
148 tblTransition io $ RoutingTransition node Accepted
149 (_:_) -> -- Node transition: Stranger --> Applicant
150 tblTransition io $ RoutingTransition node Applicant
151 _ -> return $ return ()
152 return (ps, reaction)
153
154 reportArrival reporter tm node ps
155 reaction
156
157 _ <- fork $ do
158 myThreadId >>= flip labelThread "pingResults"
159 forM_ ps $ \n -> do
160 b <- tblPing io n
161 reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result
162 join $ atomically $ do
163 tbl <- tblRead io
164 let (replacements, t') = R.updateForPingResult space n b tbl
165 tblWrite io t'
166 ios <- sequence $ concatMap
167 (map (tblTransition io) . transition)
168 replacements
169 return $ sequence_ ios
170
171 return ()
172
173
174-- TODO: Bootstrap/Refresh
175--
176-- From BEP 05:
177--
178-- Each bucket should maintain a "last changed" property to indicate how
179-- "fresh" the contents are.
180--
181-- Note: We will use a "time to next refresh" property instead and store it in
182-- a priority search queue.
183--
184-- When...
185--
186-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
187-- >>> bucketEvents =
188-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
189-- >>>
190-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
191-- >>>
192-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
193-- >>> , Applicant :--> Accepted -- with another node,
194-- >>> ]
195--
196-- the bucket's last changed property should be updated. Buckets
197-- that have not been changed in 15 minutes should be "refreshed." This is done
198-- by picking a random ID in the range of the bucket and performing a
199-- find_nodes search on it.
200--
201-- The only other possible BucketTouchEvents are as follows:
202--
203-- >>> not_handled =
204-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
205-- >>> -- (Applicant :--> Stranger)
206-- >>> -- (Applicant :--> Accepted)
207-- >>> , Accepted :--> Applicant -- Never happens
208-- >>> ]
209--
210
211-- XXX: This will be redundantly triggered twice upon every node replacement
212-- because we do not currently distinguish between standalone
213-- insertion/deletion events and an insertion/deletion pair constituting
214-- replacement.
215--
216-- It might also be better to pass the timestamp of the transition here and
217-- keep the refresh queue in better sync with the routing table by updating it
218-- within the STM monad.
219touchBucket :: KademliaSpace nid ni
220 -> POSIXTime
221 -> TVar (BucketList ni)
222 -> TVar (Int.PSQ POSIXTime)
223 -> RoutingTransition ni
224 -> STM (IO ())
225touchBucket space interval bkts psq tr
226 | (transitionedTo tr == Applicant)
227 = return $ return ()
228 | otherwise = return $ do
229 now <- getPOSIXTime
230 atomically $ do
231 let nid = kademliaLocation space (transitioningNode tr)
232 num <- R.bucketNumber space nid <$> readTVar bkts
233 modifyTVar' psq $ Int.insert num (now + interval)
234
235-- | > pollForRefresh interval queue refresh
236--
237-- Fork a refresh loop. Kill the returned thread to terminate it. The
238-- arguments are: a staleness threshold (if a bucket goes this long without
239-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
240-- schedule for each bucket, and a refresh action to be forked when a bucket
241-- excedes the staleness threshold.
242--
243-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's
244-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
245-- TVar.
246forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
247forkPollForRefresh interval psq refresh = do
248 fork $ do
249 myThreadId >>= flip labelThread "pollForRefresh"
250 fix $ \again -> do
251 join $ atomically $ do
252 nextup <- Int.findMin <$> readTVar psq
253 maybe retry (return . go again) nextup
254 where
255 go again ( bktnum :-> refresh_time ) = do
256 now <- getPOSIXTime
257 case fromEnum (refresh_time - now) of
258 x | x <= 0 -> do -- Refresh time!
259 -- Move it to the back of the refresh queue.
260 atomically $ modifyTVar' psq
261 $ Int.insert bktnum (now + interval)
262 -- Now fork the refresh operation.
263 -- TODO: We should probably propogate the kill signal to this thread.
264 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
265 _ <- refresh bktnum
266 return ()
267 return ()
268 seconds -> threadDelay ( seconds * 1000000 )
269 again
270
271refreshBucket :: forall nid tok ni addr.
272 ( Show nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
273 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
274refreshBucket sch var n = do
275 tbl <- atomically (readTVar var)
276 let count = bktCount tbl
277 nid = kademliaLocation (searchSpace sch) (thisNode tbl)
278 sample <- if n+1 >= count -- Is this the last bucket?
279 then return nid -- Yes? Search our own id.
280 else kademliaSample (searchSpace sch) -- No? Generate a random id.
281 getEntropy
282 nid
283 (bucketRange n (n + 1 < count))
284 fin <- atomically $ newTVar False
285 resultCounter <- atomically $ newTVar Set.empty
286 let fullcount = R.defaultBucketSize
287 saveit True = writeTVar fin True >> return True
288 saveit _ = return False
289 checkBucketFull :: ni -> STM Bool
290 checkBucketFull found_node = do
291 tbl <- readTVar var
292 let counts = R.shape tbl
293 when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
294 $ modifyTVar' resultCounter (Set.insert found_node)
295 resultCount <- readTVar resultCounter
296 saveit $ case drop (n - 1) counts of
297 (cnt:_) | cnt < fullcount -> True
298 _ | Set.size resultCount < fullcount -> True
299 _ -> False
300
301 hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
302
303 -- Set 15 minute timeout in order to avoid overlapping refreshes.
304 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
305 then const $ return True -- Never short-circuit the last bucket.
306 else checkBucketFull
307 _ <- timeout (15*60*1000000) $ do
308 atomically $ searchIsFinished s >>= check
309 atomically $ searchCancel s
310 hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
311 rcount <- atomically $ do
312 c <- Set.size <$> readTVar resultCounter
313 b <- readTVar fin
314 return $ if b then 1 else c
315 return rcount
316
317bootstrap ::
318 ( Show nid
319 , Serialize nid
320 -- , FiniteBits nid
321 , Hashable ni
322 , Hashable nid
323 , Ord ni
324 , Ord addr
325 , Ord nid
326 , Traversable t1
327 , Traversable t
328 ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
329bootstrap sch var ping ns ns0 = do
330 gotPing <- atomically $ newTVar False
331
332 -- First, ping the given nodes so that they are added to
333 -- our routing table.
334 withTaskGroup "bootstrap.resume" 20 $ \g -> do
335 forM_ ns $ \n -> do
336 let lbl = show $ kademliaLocation (searchSpace sch) n
337 forkTask g lbl $ do
338 b <- ping n
339 when b $ atomically $ writeTVar gotPing True
340
341 -- We resort to the hardcoded fallback nodes only when we got no
342 -- responses. This is to lesson the burden on well-known boostrap
343 -- nodes.
344 fallback <- atomically (readTVar gotPing) >>= return . when . not
345 fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
346 forM_ ns0 $ \n -> do
347 forkTask g (show $ kademliaLocation (searchSpace sch) n)
348 (void $ ping n)
349 hPutStrLn stderr "Finished bootstrap pings."
350
351 -- Now run searches until all the buckets are full. On a small network,
352 -- this may never quit.
353 --
354 -- TODO: For small networks, we should give up on filling a nearby bucket
355 -- at some point and move on to one farther away.
356 flip fix 1 $ \again cnt -> do
357 when (cnt==0) $ do
358 -- Force a delay in case the search returns too quickly
359 hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
360 threadDelay (60 * 1000000)
361 tbl <- atomically $ readTVar var
362 let shp = zip (R.shape tbl) [0 .. ]
363 unfull = filter ( (< R.defaultBucketSize) . fst ) shp
364 case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
365 [] -> do
366 when (length shp < R.defaultBucketCount) $ do
367 -- Not enough buckets, keep trying.
368 hPutStrLn stderr
369 $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
370 cnt <- refreshBucket sch var
371 (R.defaultBucketCount - 1)
372 again cnt
373 (size,num):_ -> do
374 -- If we don't yet have enough buckets, we need to search our own id.
375 -- We indicate that by setting the bucket number to the target.
376 let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1
377 | otherwise = num
378 hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp)
379 cnt <- refreshBucket sch var num'
380 again cnt