diff options
-rw-r--r-- | Kademlia.hs | 110 |
1 files changed, 102 insertions, 8 deletions
diff --git a/Kademlia.hs b/Kademlia.hs index c8eb2b93..9316f135 100644 --- a/Kademlia.hs +++ b/Kademlia.hs | |||
@@ -3,12 +3,14 @@ | |||
3 | {-# LANGUAGE DeriveFunctor, DeriveTraversable #-} | 3 | {-# LANGUAGE DeriveFunctor, DeriveTraversable #-} |
4 | -- {-# LANGUAGE TypeFamilies #-} | 4 | -- {-# LANGUAGE TypeFamilies #-} |
5 | {-# LANGUAGE GADTs #-} | 5 | {-# LANGUAGE GADTs #-} |
6 | {-# LANGUAGE PatternSynonyms #-} | ||
6 | module Kademlia where | 7 | module Kademlia where |
7 | 8 | ||
9 | import Data.Function | ||
8 | import Data.Maybe | 10 | import Data.Maybe |
9 | import Network.DHT.Routing as R | 11 | import Network.DHT.Routing as R |
10 | import Data.Time.Clock (getCurrentTime) | 12 | import Data.Time.Clock (getCurrentTime) |
11 | import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) | 13 | import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, getPOSIXTime) |
12 | #ifdef THREAD_DEBUG | 14 | #ifdef THREAD_DEBUG |
13 | import Control.Concurrent.Lifted.Instrument | 15 | import Control.Concurrent.Lifted.Instrument |
14 | #else | 16 | #else |
@@ -24,7 +26,8 @@ import Control.Concurrent.STM | |||
24 | import Control.Monad | 26 | import Control.Monad |
25 | import Data.Monoid | 27 | import Data.Monoid |
26 | import Data.Time.Clock.POSIX (POSIXTime) | 28 | import Data.Time.Clock.POSIX (POSIXTime) |
27 | 29 | import Data.Wrapper.PSQInt ( pattern (:->) ) | |
30 | import qualified Data.Wrapper.PSQInt as Int | ||
28 | 31 | ||
29 | -- | The status of a given node with respect to a given routint table. | 32 | -- | The status of a given node with respect to a given routint table. |
30 | data RoutingStatus | 33 | data RoutingStatus |
@@ -64,7 +67,7 @@ contramapIR f ir = InsertionReporter | |||
64 | } | 67 | } |
65 | 68 | ||
66 | -- | All the IO operations neccessary to maintain a Kademlia routing table. | 69 | -- | All the IO operations neccessary to maintain a Kademlia routing table. |
67 | data TableStateIO nid ni = TableStateIO | 70 | data TableStateIO ni = TableStateIO |
68 | { -- | Write the routing table. Typically 'writeTVar'. | 71 | { -- | Write the routing table. Typically 'writeTVar'. |
69 | tblWrite :: R.BucketList ni -> STM () | 72 | tblWrite :: R.BucketList ni -> STM () |
70 | 73 | ||
@@ -88,7 +91,7 @@ data TableStateIO nid ni = TableStateIO | |||
88 | , tblTransition :: RoutingTransition ni -> STM (IO ()) | 91 | , tblTransition :: RoutingTransition ni -> STM (IO ()) |
89 | } | 92 | } |
90 | 93 | ||
91 | vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO nid ni | 94 | vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni |
92 | vanillaIO var ping = TableStateIO | 95 | vanillaIO var ping = TableStateIO |
93 | { tblRead = readTVar var | 96 | { tblRead = readTVar var |
94 | , tblWrite = writeTVar var | 97 | , tblWrite = writeTVar var |
@@ -100,7 +103,7 @@ vanillaIO var ping = TableStateIO | |||
100 | -- information) entries. | 103 | -- information) entries. |
101 | data Kademlia nid ni = Kademlia (InsertionReporter ni) | 104 | data Kademlia nid ni = Kademlia (InsertionReporter ni) |
102 | (KademliaSpace nid ni) | 105 | (KademliaSpace nid ni) |
103 | (TableStateIO nid ni) | 106 | (TableStateIO ni) |
104 | 107 | ||
105 | 108 | ||
106 | -- Helper to 'insertNode'. | 109 | -- Helper to 'insertNode'. |
@@ -132,8 +135,8 @@ insertNode (Kademlia reporter space io) node = do | |||
132 | tblWrite io t' | 135 | tblWrite io t' |
133 | reaction <- case ps of | 136 | reaction <- case ps of |
134 | _ | inserted -> -- Node transition: Stranger --> Accepted | 137 | _ | inserted -> -- Node transition: Stranger --> Accepted |
135 | tblTransition io $ RoutingTransition node Accepted | 138 | tblTransition io $ RoutingTransition node Accepted |
136 | (_:_) -> -- Node transition: Stranger --> Applicant | 139 | (_:_) -> -- Node transition: Stranger --> Applicant |
137 | tblTransition io $ RoutingTransition node Applicant | 140 | tblTransition io $ RoutingTransition node Applicant |
138 | _ -> return $ return () | 141 | _ -> return $ return () |
139 | return (ps, reaction) | 142 | return (ps, reaction) |
@@ -145,7 +148,7 @@ insertNode (Kademlia reporter space io) node = do | |||
145 | myThreadId >>= flip labelThread "pingResults" | 148 | myThreadId >>= flip labelThread "pingResults" |
146 | forM_ ps $ \n -> do | 149 | forM_ ps $ \n -> do |
147 | b <- tblPing io n | 150 | b <- tblPing io n |
148 | reportPingResult reporter tm n b | 151 | reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result |
149 | join $ atomically $ do | 152 | join $ atomically $ do |
150 | tbl <- tblRead io | 153 | tbl <- tblRead io |
151 | let (replacements, t') = R.updateForPingResult space n b tbl | 154 | let (replacements, t') = R.updateForPingResult space n b tbl |
@@ -156,3 +159,94 @@ insertNode (Kademlia reporter space io) node = do | |||
156 | return $ sequence_ ios | 159 | return $ sequence_ ios |
157 | 160 | ||
158 | return () | 161 | return () |
162 | |||
163 | |||
164 | -- TODO: Bootstrap/Refresh | ||
165 | -- | ||
166 | -- From BEP 05: | ||
167 | -- | ||
168 | -- Each bucket should maintain a "last changed" property to indicate how | ||
169 | -- "fresh" the contents are. | ||
170 | -- | ||
171 | -- Note: We will use a "time to next refresh" property instead and store it in | ||
172 | -- a priority search queue. | ||
173 | -- | ||
174 | -- When... | ||
175 | -- | ||
176 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
177 | -- >>> bucketEvents = | ||
178 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
179 | -- >>> | ||
180 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
181 | -- >>> | ||
182 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
183 | -- >>> , Applicant :--> Accepted -- with another node, | ||
184 | -- >>> ] | ||
185 | -- | ||
186 | -- the bucket's last changed property should be updated. Buckets | ||
187 | -- that have not been changed in 15 minutes should be "refreshed." This is done | ||
188 | -- by picking a random ID in the range of the bucket and performing a | ||
189 | -- find_nodes search on it. | ||
190 | -- | ||
191 | -- The only other possible BucketTouchEvents are as follows: | ||
192 | -- | ||
193 | -- >>> not_handled = | ||
194 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
195 | -- >>> -- (Applicant :--> Stranger) | ||
196 | -- >>> -- (Applicant :--> Accepted) | ||
197 | -- >>> , Accepted :--> Applicant -- Never happens | ||
198 | -- >>> ] | ||
199 | -- | ||
200 | |||
201 | -- XXX: This will be redundantly triggered twice upon every node replacement | ||
202 | -- because we do not currently distinguish between standalone | ||
203 | -- insertion/deletion events and an insertion/deletion pair constituting | ||
204 | -- replacement. | ||
205 | -- | ||
206 | -- It might also be better to pass the timestamp of the transition here and | ||
207 | -- keep the refresh queue in better sync with the routing table by updating it | ||
208 | -- within the STM monad. | ||
209 | touchBucket :: POSIXTime -> (ni -> Int) -> TVar (Int.PSQ POSIXTime) -> RoutingTransition ni -> STM (IO ()) | ||
210 | touchBucket interval bktnum psq tr | ||
211 | | (transitionedTo tr == Applicant) | ||
212 | = return $ return () | ||
213 | | otherwise = return $ do | ||
214 | now <- getPOSIXTime | ||
215 | atomically $ modifyTVar' psq | ||
216 | $ Int.insert (bktnum $ transitioningNode tr) | ||
217 | (now + interval) | ||
218 | |||
219 | -- | > pollForRefresh interval queue refresh | ||
220 | -- | ||
221 | -- Fork a refresh loop. Kill the returned thread to terminate it. The | ||
222 | -- arguments are: a staleness threshold (if a bucket goes this long without | ||
223 | -- being touched, a refresh will be triggered), a TVar with the time-to-refresh | ||
224 | -- schedule for each bucket, and a refresh action to be forked when a bucket | ||
225 | -- excedes the staleness threshold. | ||
226 | -- | ||
227 | -- TO "touch" a bucket and prevent it from being refreshed, reschedule it's | ||
228 | -- refresh time to some time into the future by modifying the 'Int.PSQ' in the | ||
229 | -- TVar. | ||
230 | pollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO ()) -> IO ThreadId | ||
231 | pollForRefresh interval psq refresh = do | ||
232 | fork $ do | ||
233 | myThreadId >>= flip labelThread "pollForRefresh" | ||
234 | fix $ \again -> do | ||
235 | join $ atomically $ do | ||
236 | nextup <- Int.findMin <$> readTVar psq | ||
237 | maybe retry (return . go again) nextup | ||
238 | where | ||
239 | go again ( bktnum :-> refresh_time ) = do | ||
240 | now <- getPOSIXTime | ||
241 | case fromEnum (refresh_time - now) of | ||
242 | x | x <= 0 -> do -- Refresh time! | ||
243 | -- Move it to the back of the refresh queue. | ||
244 | atomically $ modifyTVar' psq | ||
245 | $ Int.insert bktnum (now + interval) | ||
246 | -- Now fork the refresh operation. | ||
247 | -- TODO: We should probably propogate the kill signal to this thread. | ||
248 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | ||
249 | refresh bktnum | ||
250 | return () | ||
251 | seconds -> threadDelay ( seconds * 1000000 ) | ||
252 | again | ||