diff options
Diffstat (limited to 'kad')
-rw-r--r-- | kad/src/Network/Kademlia/Bootstrap.hs | 44 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/CommonAPI.hs | 8 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Persistence.hs | 2 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 9 |
4 files changed, 52 insertions, 11 deletions
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs index 08ba3318..c07b3c6c 100644 --- a/kad/src/Network/Kademlia/Bootstrap.hs +++ b/kad/src/Network/Kademlia/Bootstrap.hs | |||
@@ -16,6 +16,8 @@ | |||
16 | module Network.Kademlia.Bootstrap where | 16 | module Network.Kademlia.Bootstrap where |
17 | 17 | ||
18 | import Data.Function | 18 | import Data.Function |
19 | import qualified Data.IntMap.Strict as IntMap | ||
20 | ;import Data.IntMap.Strict (IntMap) | ||
19 | import Data.Maybe | 21 | import Data.Maybe |
20 | import qualified Data.Set as Set | 22 | import qualified Data.Set as Set |
21 | import Data.Time.Clock.POSIX (getPOSIXTime) | 23 | import Data.Time.Clock.POSIX (getPOSIXTime) |
@@ -71,10 +73,12 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | |||
71 | , -- | Timestamp of last bucket event. | 73 | , -- | Timestamp of last bucket event. |
72 | refreshLastTouch :: TVar POSIXTime | 74 | refreshLastTouch :: TVar POSIXTime |
73 | , -- | This variable indicates whether or not we are in bootstrapping mode. | 75 | , -- | This variable indicates whether or not we are in bootstrapping mode. |
74 | bootstrapMode :: TVar Bool | 76 | bootstrapMode :: TVar Bool |
75 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on | 77 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on |
76 | -- every finished refresh. | 78 | -- every finished refresh. |
77 | bootstrapCountdown :: TVar (Maybe Int) | 79 | bootstrapCountdown :: TVar (Maybe Int) |
80 | -- | Internal state of background searches. Exposed for debugging purposes. | ||
81 | , refreshState :: TVar (IntMap [BucketSearch nid ni]) | ||
78 | } | 82 | } |
79 | 83 | ||
80 | newBucketRefresher :: ( Ord addr, Hashable addr | 84 | newBucketRefresher :: ( Ord addr, Hashable addr |
@@ -91,6 +95,7 @@ newBucketRefresher bkts sch ping = do | |||
91 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... | 95 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... |
92 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. | 96 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. |
93 | bootstrapCnt <- newTVar Nothing | 97 | bootstrapCnt <- newTVar Nothing |
98 | st <- newTVar IntMap.empty | ||
94 | return BucketRefresher | 99 | return BucketRefresher |
95 | { refreshInterval = 15 * 60 | 100 | { refreshInterval = 15 * 60 |
96 | , refreshQueue = sched | 101 | , refreshQueue = sched |
@@ -100,6 +105,7 @@ newBucketRefresher bkts sch ping = do | |||
100 | , refreshLastTouch = lasttouch | 105 | , refreshLastTouch = lasttouch |
101 | , bootstrapMode = bootstrapVar | 106 | , bootstrapMode = bootstrapVar |
102 | , bootstrapCountdown = bootstrapCnt | 107 | , bootstrapCountdown = bootstrapCnt |
108 | , refreshState = st | ||
103 | } | 109 | } |
104 | 110 | ||
105 | -- | This was added to avoid the compile error "Record update for | 111 | -- | This was added to avoid the compile error "Record update for |
@@ -118,6 +124,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | |||
118 | , refreshLastTouch = refreshLastTouch | 124 | , refreshLastTouch = refreshLastTouch |
119 | , bootstrapMode = bootstrapMode | 125 | , bootstrapMode = bootstrapMode |
120 | , bootstrapCountdown = bootstrapCountdown | 126 | , bootstrapCountdown = bootstrapCountdown |
127 | , refreshState = refreshState | ||
121 | } | 128 | } |
122 | 129 | ||
123 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | 130 | -- | Fork a refresh loop. Kill the returned thread to terminate it. |
@@ -228,10 +235,29 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown | |||
228 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." | 235 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." |
229 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) | 236 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) |
230 | 237 | ||
238 | data BucketSearch nid ni = forall addr tok. BucketSearch | ||
239 | { bucketSample :: nid | ||
240 | , bucketResults :: TVar (Set.Set ni) | ||
241 | , bucketFinFlag :: TVar Bool | ||
242 | , bucketState :: SearchState nid addr tok ni ni | ||
243 | , bucketThread :: ThreadId | ||
244 | } | ||
245 | |||
246 | removeBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] | ||
247 | removeBucketState bst Nothing = Nothing | ||
248 | removeBucketState bst (Just xs) = case filter (\b -> bucketThread b /= bucketThread bst) xs of | ||
249 | [] -> Nothing | ||
250 | ys -> Just ys | ||
251 | |||
252 | insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] | ||
253 | insertBucketState bst Nothing = Just [bst] | ||
254 | insertBucketState bst (Just xs) = Just (bst : xs) | ||
255 | |||
231 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => | 256 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => |
232 | BucketRefresher nid ni -> Int -> IO Int | 257 | BucketRefresher nid ni -> Int -> IO Int |
233 | refreshBucket r@BucketRefresher{ refreshSearch = sch | 258 | refreshBucket r@BucketRefresher{ refreshSearch = sch |
234 | , refreshBuckets = var } | 259 | , refreshBuckets = var |
260 | , refreshState = rstate } | ||
235 | n = do | 261 | n = do |
236 | tbl <- atomically (readTVar var) | 262 | tbl <- atomically (readTVar var) |
237 | let count = bktCount tbl | 263 | let count = bktCount tbl |
@@ -248,13 +274,21 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch | |||
248 | dput XRefresh $ "Start refresh " ++ show (n,sample) | 274 | dput XRefresh $ "Start refresh " ++ show (n,sample) |
249 | 275 | ||
250 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | 276 | -- Set 15 minute timeout in order to avoid overlapping refreshes. |
251 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | 277 | (s,thread) <- search sch tbl sample $ if n+1 == R.defaultBucketCount |
252 | then const $ return True -- Never short-circuit the last bucket. | 278 | then const $ return True -- Never short-circuit the last bucket. |
253 | else checkBucketFull (searchSpace sch) var resultCounter fin n | 279 | else checkBucketFull (searchSpace sch) var resultCounter fin n |
280 | let bstate = BucketSearch sample resultCounter fin s thread | ||
281 | atomically $ modifyTVar' rstate $ IntMap.alter (insertBucketState bstate) n | ||
254 | _ <- timeout (15*60*1000000) $ do | 282 | _ <- timeout (15*60*1000000) $ do |
255 | atomically $ searchIsFinished s >>= check | 283 | atomically $ searchIsFinished s >>= check |
256 | atomically $ searchCancel s | 284 | atomically $ searchCancel s |
257 | dput XDHT $ "Finish refresh " ++ show (n,sample) | 285 | dput XDHT $ "Finish refresh " ++ show (n,sample) |
286 | bg <- forkIO $ do | ||
287 | atomically $ do | ||
288 | searchIsFinished s >>= check | ||
289 | modifyTVar' rstate $ IntMap.alter (removeBucketState bstate) n | ||
290 | |||
291 | labelThread bg ("backgrounded." ++ show n ++ "." ++ show sample) | ||
258 | now <- getPOSIXTime | 292 | now <- getPOSIXTime |
259 | join $ atomically $ onFinishedRefresh r n now | 293 | join $ atomically $ onFinishedRefresh r n now |
260 | rcount <- atomically $ do | 294 | rcount <- atomically $ do |
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs index 4714cecc..6d3fd16c 100644 --- a/kad/src/Network/Kademlia/CommonAPI.hs +++ b/kad/src/Network/Kademlia/CommonAPI.hs | |||
@@ -1,5 +1,8 @@ | |||
1 | {-# LANGUAGE ExistentialQuantification #-} | 1 | {-# LANGUAGE ExistentialQuantification #-} |
2 | module Network.Kademlia.CommonAPI where | 2 | module Network.Kademlia.CommonAPI |
3 | ( module Network.Kademlia.CommonAPI | ||
4 | , refreshBuckets | ||
5 | ) where | ||
3 | 6 | ||
4 | 7 | ||
5 | import Control.Concurrent | 8 | import Control.Concurrent |
@@ -12,6 +15,7 @@ import qualified Data.Set as Set | |||
12 | import Data.Time.Clock.POSIX | 15 | import Data.Time.Clock.POSIX |
13 | import Data.Typeable | 16 | import Data.Typeable |
14 | 17 | ||
18 | import Network.Kademlia.Bootstrap | ||
15 | import Network.Kademlia.Search | 19 | import Network.Kademlia.Search |
16 | import Network.Kademlia.Routing as R | 20 | import Network.Kademlia.Routing as R |
17 | import Crypto.Tox (SecretKey,PublicKey) | 21 | import Crypto.Tox (SecretKey,PublicKey) |
@@ -29,7 +33,7 @@ data DHT = forall nid ni. ( Show ni | |||
29 | , S.Serialize nid | 33 | , S.Serialize nid |
30 | ) => | 34 | ) => |
31 | DHT | 35 | DHT |
32 | { dhtBuckets :: TVar (BucketList ni) | 36 | { dhtBuckets :: BucketRefresher nid ni |
33 | , dhtSecretKey :: STM (Maybe SecretKey) | 37 | , dhtSecretKey :: STM (Maybe SecretKey) |
34 | , dhtPing :: Map.Map String (DHTPing ni) | 38 | , dhtPing :: Map.Map String (DHTPing ni) |
35 | , dhtQuery :: Map.Map String (DHTQuery nid ni) | 39 | , dhtQuery :: Map.Map String (DHTQuery nid ni) |
diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs index 32ec169d..f89287fe 100644 --- a/kad/src/Network/Kademlia/Persistence.hs +++ b/kad/src/Network/Kademlia/Persistence.hs | |||
@@ -16,7 +16,7 @@ import System.IO.Error | |||
16 | 16 | ||
17 | saveNodes :: String -> DHT -> IO () | 17 | saveNodes :: String -> DHT -> IO () |
18 | saveNodes netname DHT{dhtBuckets} = do | 18 | saveNodes netname DHT{dhtBuckets} = do |
19 | bkts <- atomically $ readTVar dhtBuckets | 19 | bkts <- atomically $ readTVar (refreshBuckets dhtBuckets) |
20 | let ns = map fst $ concat $ R.toList bkts | 20 | let ns = map fst $ concat $ R.toList bkts |
21 | bs = J.encode ns | 21 | bs = J.encode ns |
22 | fname = nodesFileName netname | 22 | fname = nodesFileName netname |
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 5b60c303..856a7cfc 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -194,12 +194,15 @@ search :: | |||
194 | , PSQKey nid | 194 | , PSQKey nid |
195 | , PSQKey ni | 195 | , PSQKey ni |
196 | , Show nid | 196 | , Show nid |
197 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) | 197 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) |
198 | search sch buckets target result = do | 198 | search sch buckets target result = do |
199 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets | 199 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets |
200 | st <- atomically $ newSearch sch target ns | 200 | st <- atomically $ newSearch sch target ns |
201 | t <- forkIO $ searchLoop sch target result st | 201 | v <- newTVarIO False |
202 | return st | 202 | t <- forkIO $ atomically (check =<< readTVar v) >> searchLoop sch target result st |
203 | labelThread t ("search.pending." ++ show target) | ||
204 | atomically $ writeTVar v True | ||
205 | return (st,t) | ||
203 | 206 | ||
204 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) | 207 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) |
205 | => Search nid addr tok ni r -- ^ Query and distance methods. | 208 | => Search nid addr tok ni r -- ^ Query and distance methods. |