summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Kademlia.hs110
1 files changed, 102 insertions, 8 deletions
diff --git a/Kademlia.hs b/Kademlia.hs
index c8eb2b93..9316f135 100644
--- a/Kademlia.hs
+++ b/Kademlia.hs
@@ -3,12 +3,14 @@
3{-# LANGUAGE DeriveFunctor, DeriveTraversable #-} 3{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
4-- {-# LANGUAGE TypeFamilies #-} 4-- {-# LANGUAGE TypeFamilies #-}
5{-# LANGUAGE GADTs #-} 5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE PatternSynonyms #-}
6module Kademlia where 7module Kademlia where
7 8
9import Data.Function
8import Data.Maybe 10import Data.Maybe
9import Network.DHT.Routing as R 11import Network.DHT.Routing as R
10import Data.Time.Clock (getCurrentTime) 12import Data.Time.Clock (getCurrentTime)
11import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) 13import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, getPOSIXTime)
12#ifdef THREAD_DEBUG 14#ifdef THREAD_DEBUG
13import Control.Concurrent.Lifted.Instrument 15import Control.Concurrent.Lifted.Instrument
14#else 16#else
@@ -24,7 +26,8 @@ import Control.Concurrent.STM
24import Control.Monad 26import Control.Monad
25import Data.Monoid 27import Data.Monoid
26import Data.Time.Clock.POSIX (POSIXTime) 28import Data.Time.Clock.POSIX (POSIXTime)
27 29import Data.Wrapper.PSQInt ( pattern (:->) )
30import qualified Data.Wrapper.PSQInt as Int
28 31
29-- | The status of a given node with respect to a given routint table. 32-- | The status of a given node with respect to a given routint table.
30data RoutingStatus 33data RoutingStatus
@@ -64,7 +67,7 @@ contramapIR f ir = InsertionReporter
64 } 67 }
65 68
66-- | All the IO operations neccessary to maintain a Kademlia routing table. 69-- | All the IO operations neccessary to maintain a Kademlia routing table.
67data TableStateIO nid ni = TableStateIO 70data TableStateIO ni = TableStateIO
68 { -- | Write the routing table. Typically 'writeTVar'. 71 { -- | Write the routing table. Typically 'writeTVar'.
69 tblWrite :: R.BucketList ni -> STM () 72 tblWrite :: R.BucketList ni -> STM ()
70 73
@@ -88,7 +91,7 @@ data TableStateIO nid ni = TableStateIO
88 , tblTransition :: RoutingTransition ni -> STM (IO ()) 91 , tblTransition :: RoutingTransition ni -> STM (IO ())
89 } 92 }
90 93
91vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO nid ni 94vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
92vanillaIO var ping = TableStateIO 95vanillaIO var ping = TableStateIO
93 { tblRead = readTVar var 96 { tblRead = readTVar var
94 , tblWrite = writeTVar var 97 , tblWrite = writeTVar var
@@ -100,7 +103,7 @@ vanillaIO var ping = TableStateIO
100-- information) entries. 103-- information) entries.
101data Kademlia nid ni = Kademlia (InsertionReporter ni) 104data Kademlia nid ni = Kademlia (InsertionReporter ni)
102 (KademliaSpace nid ni) 105 (KademliaSpace nid ni)
103 (TableStateIO nid ni) 106 (TableStateIO ni)
104 107
105 108
106-- Helper to 'insertNode'. 109-- Helper to 'insertNode'.
@@ -132,8 +135,8 @@ insertNode (Kademlia reporter space io) node = do
132 tblWrite io t' 135 tblWrite io t'
133 reaction <- case ps of 136 reaction <- case ps of
134 _ | inserted -> -- Node transition: Stranger --> Accepted 137 _ | inserted -> -- Node transition: Stranger --> Accepted
135 tblTransition io $ RoutingTransition node Accepted 138 tblTransition io $ RoutingTransition node Accepted
136 (_:_) -> -- Node transition: Stranger --> Applicant 139 (_:_) -> -- Node transition: Stranger --> Applicant
137 tblTransition io $ RoutingTransition node Applicant 140 tblTransition io $ RoutingTransition node Applicant
138 _ -> return $ return () 141 _ -> return $ return ()
139 return (ps, reaction) 142 return (ps, reaction)
@@ -145,7 +148,7 @@ insertNode (Kademlia reporter space io) node = do
145 myThreadId >>= flip labelThread "pingResults" 148 myThreadId >>= flip labelThread "pingResults"
146 forM_ ps $ \n -> do 149 forM_ ps $ \n -> do
147 b <- tblPing io n 150 b <- tblPing io n
148 reportPingResult reporter tm n b 151 reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result
149 join $ atomically $ do 152 join $ atomically $ do
150 tbl <- tblRead io 153 tbl <- tblRead io
151 let (replacements, t') = R.updateForPingResult space n b tbl 154 let (replacements, t') = R.updateForPingResult space n b tbl
@@ -156,3 +159,94 @@ insertNode (Kademlia reporter space io) node = do
156 return $ sequence_ ios 159 return $ sequence_ ios
157 160
158 return () 161 return ()
162
163
164-- TODO: Bootstrap/Refresh
165--
166-- From BEP 05:
167--
168-- Each bucket should maintain a "last changed" property to indicate how
169-- "fresh" the contents are.
170--
171-- Note: We will use a "time to next refresh" property instead and store it in
172-- a priority search queue.
173--
174-- When...
175--
176-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
177-- >>> bucketEvents =
178-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
179-- >>>
180-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
181-- >>>
182-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
183-- >>> , Applicant :--> Accepted -- with another node,
184-- >>> ]
185--
186-- the bucket's last changed property should be updated. Buckets
187-- that have not been changed in 15 minutes should be "refreshed." This is done
188-- by picking a random ID in the range of the bucket and performing a
189-- find_nodes search on it.
190--
191-- The only other possible BucketTouchEvents are as follows:
192--
193-- >>> not_handled =
194-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
195-- >>> -- (Applicant :--> Stranger)
196-- >>> -- (Applicant :--> Accepted)
197-- >>> , Accepted :--> Applicant -- Never happens
198-- >>> ]
199--
200
201-- XXX: This will be redundantly triggered twice upon every node replacement
202-- because we do not currently distinguish between standalone
203-- insertion/deletion events and an insertion/deletion pair constituting
204-- replacement.
205--
206-- It might also be better to pass the timestamp of the transition here and
207-- keep the refresh queue in better sync with the routing table by updating it
208-- within the STM monad.
209touchBucket :: POSIXTime -> (ni -> Int) -> TVar (Int.PSQ POSIXTime) -> RoutingTransition ni -> STM (IO ())
210touchBucket interval bktnum psq tr
211 | (transitionedTo tr == Applicant)
212 = return $ return ()
213 | otherwise = return $ do
214 now <- getPOSIXTime
215 atomically $ modifyTVar' psq
216 $ Int.insert (bktnum $ transitioningNode tr)
217 (now + interval)
218
219-- | > pollForRefresh interval queue refresh
220--
221-- Fork a refresh loop. Kill the returned thread to terminate it. The
222-- arguments are: a staleness threshold (if a bucket goes this long without
223-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
224-- schedule for each bucket, and a refresh action to be forked when a bucket
225-- excedes the staleness threshold.
226--
227-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's
228-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
229-- TVar.
230pollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO ()) -> IO ThreadId
231pollForRefresh interval psq refresh = do
232 fork $ do
233 myThreadId >>= flip labelThread "pollForRefresh"
234 fix $ \again -> do
235 join $ atomically $ do
236 nextup <- Int.findMin <$> readTVar psq
237 maybe retry (return . go again) nextup
238 where
239 go again ( bktnum :-> refresh_time ) = do
240 now <- getPOSIXTime
241 case fromEnum (refresh_time - now) of
242 x | x <= 0 -> do -- Refresh time!
243 -- Move it to the back of the refresh queue.
244 atomically $ modifyTVar' psq
245 $ Int.insert bktnum (now + interval)
246 -- Now fork the refresh operation.
247 -- TODO: We should probably propogate the kill signal to this thread.
248 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
249 refresh bktnum
250 return ()
251 seconds -> threadDelay ( seconds * 1000000 )
252 again