diff options
author | joe <joe@jerkface.net> | 2017-01-05 12:18:43 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-01-05 12:18:43 -0500 |
commit | 5e2f43d967aa2d07368b7d5552f65a69b3979ab5 (patch) | |
tree | 7c84933442cdcd4b6e52c644842e64e34ae906b0 | |
parent | 990296703f511efe2bc2899d514dbe2a20247c88 (diff) |
Routing Table : use STM and per-bucket ping queues
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 54 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Routing.hs | 182 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 6 |
3 files changed, 163 insertions, 79 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 7f20ad6d..99d8cdaf 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -73,6 +73,7 @@ import Network.BitTorrent.Address | |||
73 | import Network.BitTorrent.DHT.Message | 73 | import Network.BitTorrent.DHT.Message |
74 | import Network.BitTorrent.DHT.Routing as R | 74 | import Network.BitTorrent.DHT.Routing as R |
75 | import Network.BitTorrent.DHT.Session | 75 | import Network.BitTorrent.DHT.Session |
76 | import Control.Concurrent.STM | ||
76 | 77 | ||
77 | {----------------------------------------------------------------------- | 78 | {----------------------------------------------------------------------- |
78 | -- Handlers | 79 | -- Handlers |
@@ -104,7 +105,10 @@ findNodeH = nodeHandler $ \ _ (FindNode nid) -> do | |||
104 | -- | Default 'GetPeers' handler. | 105 | -- | Default 'GetPeers' handler. |
105 | getPeersH :: Address ip => NodeHandler ip | 106 | getPeersH :: Address ip => NodeHandler ip |
106 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do | 107 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do |
107 | GotPeers <$> getPeerList ih <*> grantToken naddr | 108 | ps <- getPeerList ih |
109 | tok <- grantToken naddr | ||
110 | $(logDebugS) "getPeersH" $ "INFO-HASH " <> T.pack (show (ih,fmap fromAddr naddr :: NodeAddr (Maybe IP))) | ||
111 | return $ GotPeers ps tok | ||
108 | 112 | ||
109 | -- | Default 'Announce' handler. | 113 | -- | Default 'Announce' handler. |
110 | announceH :: Address ip => NodeHandler ip | 114 | announceH :: Address ip => NodeHandler ip |
@@ -236,20 +240,33 @@ refreshNodes nid = do | |||
236 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId | 240 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId |
237 | insertNode info = fork $ do | 241 | insertNode info = fork $ do |
238 | var <- asks routingTable | 242 | var <- asks routingTable |
239 | t <- takeMVar var | 243 | tm <- getTimestamp |
240 | t' <- do -- modifyMVar_ var $ \ t -> do | 244 | let showTable = do |
241 | result <- routing (R.insert info t) | 245 | t <- liftIO $ atomically $ readTVar var |
242 | case result of | 246 | let logMsg = "Routing table: " <> pPrint t |
243 | Nothing -> do | 247 | $(logDebugS) "insertNode" (T.pack (render logMsg)) |
244 | $(logDebugS) "insertNode" $ "Routing table is full: " | 248 | t <- liftIO $ atomically $ readTVar var |
245 | <> T.pack (show (pPrint t)) | 249 | let arrival = TryInsert info |
246 | return t | 250 | arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) |
247 | Just t' -> do | 251 | $(logDebugS) "insertNode" $ T.pack (show arrival4) |
248 | let logMsg = "Routing table updated: " | 252 | ps <- liftIO $ atomically $ do |
249 | <> pPrint t <> " -> " <> pPrint t' | 253 | t <- readTVar var |
250 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | 254 | (ps,t') <- R.insert tm arrival t |
251 | return t' | 255 | writeTVar var t' |
252 | putMVar var t' | 256 | return ps |
257 | showTable | ||
258 | fork $ forM_ ps $ \(CheckPing ns)-> do | ||
259 | forM_ ns $ \n -> do | ||
260 | alive <- PingResult n <$> probeNode (nodeAddr n) | ||
261 | let PingResult _ b = alive | ||
262 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) | ||
263 | tm <- getTimestamp | ||
264 | liftIO $ atomically $ do | ||
265 | t <- readTVar var | ||
266 | (_,t') <- R.insert tm alive t | ||
267 | writeTVar var t' | ||
268 | showTable | ||
269 | return () | ||
253 | 270 | ||
254 | -- | Throws exception if node is not responding. | 271 | -- | Throws exception if node is not responding. |
255 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 272 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
@@ -268,9 +285,4 @@ q <@> addr = snd <$> queryNode addr q | |||
268 | {-# INLINE (<@>) #-} | 285 | {-# INLINE (<@>) #-} |
269 | 286 | ||
270 | restoreTable :: Address ip => Table ip -> DHT ip () | 287 | restoreTable :: Address ip => Table ip -> DHT ip () |
271 | restoreTable tbl = do | 288 | restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl |
272 | tblvar <- asks routingTable | ||
273 | tbl0 <- liftIO $ takeMVar tblvar | ||
274 | mb <- routing $ merge tbl tbl0 | ||
275 | maybe (return ()) (liftIO . putMVar tblvar) mb | ||
276 | |||
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 68edef56..14aec612 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs | |||
@@ -17,6 +17,7 @@ | |||
17 | {-# LANGUAGE ViewPatterns #-} | 17 | {-# LANGUAGE ViewPatterns #-} |
18 | {-# LANGUAGE TypeOperators #-} | 18 | {-# LANGUAGE TypeOperators #-} |
19 | {-# LANGUAGE DeriveGeneric #-} | 19 | {-# LANGUAGE DeriveGeneric #-} |
20 | {-# LANGUAGE ScopedTypeVariables #-} | ||
20 | {-# OPTIONS_GHC -fno-warn-orphans #-} | 21 | {-# OPTIONS_GHC -fno-warn-orphans #-} |
21 | module Network.BitTorrent.DHT.Routing | 22 | module Network.BitTorrent.DHT.Routing |
22 | ( -- * Table | 23 | ( -- * Table |
@@ -45,8 +46,9 @@ module Network.BitTorrent.DHT.Routing | |||
45 | 46 | ||
46 | -- * Construction | 47 | -- * Construction |
47 | , Network.BitTorrent.DHT.Routing.nullTable | 48 | , Network.BitTorrent.DHT.Routing.nullTable |
49 | , Event(..) | ||
50 | , CheckPing(..) | ||
48 | , Network.BitTorrent.DHT.Routing.insert | 51 | , Network.BitTorrent.DHT.Routing.insert |
49 | , Network.BitTorrent.DHT.Routing.merge | ||
50 | 52 | ||
51 | -- * Conversion | 53 | -- * Conversion |
52 | , Network.BitTorrent.DHT.Routing.TableEntry | 54 | , Network.BitTorrent.DHT.Routing.TableEntry |
@@ -62,17 +64,19 @@ import Control.Applicative as A | |||
62 | import Control.Arrow | 64 | import Control.Arrow |
63 | import Control.Monad | 65 | import Control.Monad |
64 | import Data.Function | 66 | import Data.Function |
67 | import Data.Functor.Identity | ||
65 | import Data.List as L hiding (insert) | 68 | import Data.List as L hiding (insert) |
66 | import Data.Maybe | 69 | import Data.Maybe |
67 | import Data.Monoid | 70 | import Data.Monoid |
68 | import Data.PSQueue as PSQ | 71 | import Data.PSQueue as PSQ |
69 | import Data.Serialize as S hiding (Result, Done) | 72 | import Data.Serialize as S hiding (Result, Done) |
73 | import qualified Data.Sequence as Seq | ||
70 | import Data.Time | 74 | import Data.Time |
71 | import Data.Time.Clock.POSIX | 75 | import Data.Time.Clock.POSIX |
72 | import Data.Word | 76 | import Data.Word |
73 | import GHC.Generics | 77 | import GHC.Generics |
74 | import Text.PrettyPrint as PP hiding ((<>)) | 78 | import Text.PrettyPrint as PP hiding ((<>)) |
75 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | 79 | import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) |
76 | 80 | ||
77 | import Data.Torrent | 81 | import Data.Torrent |
78 | import Network.BitTorrent.Address | 82 | import Network.BitTorrent.Address |
@@ -199,6 +203,37 @@ type BucketSize = Int | |||
199 | defaultBucketSize :: BucketSize | 203 | defaultBucketSize :: BucketSize |
200 | defaultBucketSize = 8 | 204 | defaultBucketSize = 8 |
201 | 205 | ||
206 | data QueueMethods m elem fifo = QueueMethods | ||
207 | { pushBack :: elem -> fifo -> m fifo | ||
208 | , popFront :: fifo -> m (Maybe elem, fifo) | ||
209 | , emptyQueue :: m fifo | ||
210 | } | ||
211 | |||
212 | fromQ :: Functor m => | ||
213 | ( a -> b ) | ||
214 | -> ( b -> a ) | ||
215 | -> QueueMethods m elem a | ||
216 | -> QueueMethods m elem b | ||
217 | fromQ embed project QueueMethods{..} = | ||
218 | QueueMethods { pushBack = \e -> fmap embed . pushBack e . project | ||
219 | , popFront = fmap (second embed) . popFront . project | ||
220 | , emptyQueue = fmap embed emptyQueue | ||
221 | } | ||
222 | |||
223 | seqQ :: QueueMethods Identity (NodeInfo ip) (Seq.Seq (NodeInfo ip)) | ||
224 | seqQ = QueueMethods | ||
225 | { pushBack = \e fifo -> pure (fifo Seq.|> e) | ||
226 | , popFront = \fifo -> case Seq.viewl fifo of | ||
227 | e Seq.:< fifo' -> pure (Just e, fifo') | ||
228 | Seq.EmptyL -> pure (Nothing, Seq.empty) | ||
229 | , emptyQueue = pure Seq.empty | ||
230 | } | ||
231 | |||
232 | type BucketQueue ip = Seq.Seq (NodeInfo ip) | ||
233 | |||
234 | bucketQ :: QueueMethods Identity (NodeInfo ip) (BucketQueue ip) | ||
235 | bucketQ = seqQ | ||
236 | |||
202 | -- | Bucket is also limited in its length — thus it's called k-bucket. | 237 | -- | Bucket is also limited in its length — thus it's called k-bucket. |
203 | -- When bucket becomes full, we should split it in two lists by | 238 | -- When bucket becomes full, we should split it in two lists by |
204 | -- current span bit. Span bit is defined by depth in the routing | 239 | -- current span bit. Span bit is defined by depth in the routing |
@@ -206,7 +241,11 @@ defaultBucketSize = 8 | |||
206 | -- very unlikely that all nodes in bucket fail within an hour of | 241 | -- very unlikely that all nodes in bucket fail within an hour of |
207 | -- each other. | 242 | -- each other. |
208 | -- | 243 | -- |
209 | type Bucket ip = PSQ (NodeInfo ip) Timestamp | 244 | data Bucket ip = Bucket { bktNodes :: PSQ (NodeInfo ip) Timestamp |
245 | , bktQ :: BucketQueue ip | ||
246 | } deriving (Show,Generic) | ||
247 | |||
248 | instance (Eq ip, Serialize ip) => Serialize (Bucket ip) | ||
210 | 249 | ||
211 | instance (Serialize k, Serialize v, Ord k, Ord v) | 250 | instance (Serialize k, Serialize v, Ord k, Ord v) |
212 | => Serialize (PSQ k v) where | 251 | => Serialize (PSQ k v) where |
@@ -219,59 +258,77 @@ lastChanged bucket | |||
219 | | L.null timestamps = Nothing | 258 | | L.null timestamps = Nothing |
220 | | otherwise = Just (L.maximumBy (compare `on` prio) timestamps) | 259 | | otherwise = Just (L.maximumBy (compare `on` prio) timestamps) |
221 | where | 260 | where |
222 | timestamps = PSQ.toList bucket | 261 | timestamps = PSQ.toList $ bktNodes bucket |
223 | 262 | ||
224 | leastRecently :: Eq ip => Bucket ip -> Maybe (NodeEntry ip, Bucket ip) | 263 | leastRecently :: Eq ip => Bucket ip -> Maybe (NodeEntry ip, Bucket ip) |
225 | leastRecently = minView | 264 | leastRecently b = fmap (\(e,ns) -> (e, b { bktNodes = ns })) $ minView $ bktNodes b |
226 | 265 | ||
227 | -- | Update interval, in seconds. | 266 | -- | Update interval, in seconds. |
228 | delta :: NominalDiffTime | 267 | delta :: NominalDiffTime |
229 | delta = 15 * 60 | 268 | delta = 15 * 60 |
230 | 269 | ||
231 | -- | Should maintain a set of stable long running nodes. | 270 | -- | Should maintain a set of stable long running nodes. |
232 | insertBucket :: Eq ip => Timestamp -> NodeInfo ip -> Bucket ip | 271 | -- |
233 | -> ip `Routing` Bucket ip | 272 | -- Note: pings are triggerd only when a bucket is full. |
234 | insertBucket curTime info bucket | 273 | insertBucket :: (Eq ip, Alternative f) => Timestamp -> Event ip -> Bucket ip |
274 | -> f ([CheckPing ip], Bucket ip) | ||
275 | insertBucket curTime (TryInsert info) bucket | ||
235 | -- just update timestamp if a node is already in bucket | 276 | -- just update timestamp if a node is already in bucket |
236 | | Just _ <- PSQ.lookup info bucket = do | 277 | | already_have |
237 | return $ PSQ.insertWith max info curTime bucket | 278 | = pure ( [], map_ns $ PSQ.insertWith max info curTime ) |
238 | 279 | -- bucket is good, but not full => we can insert a new node | |
239 | -- Buckets that have not been changed in 15 minutes should be "refreshed." | 280 | | PSQ.size (bktNodes bucket) < defaultBucketSize |
240 | | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket | 281 | = pure ( [], map_ns $ PSQ.insert info curTime ) |
241 | , curTime - lastSeen > delta = do | ||
242 | refresh nodeId | ||
243 | insertBucket curTime info bucket | ||
244 | |||
245 | -- If there are any questionable nodes in the bucket have not been | 282 | -- If there are any questionable nodes in the bucket have not been |
246 | -- seen in the last 15 minutes, the least recently seen node is | 283 | -- seen in the last 15 minutes, the least recently seen node is |
247 | -- pinged. If any nodes in the bucket are known to have become bad, | 284 | -- pinged. If any nodes in the bucket are known to have become bad, |
248 | -- then one is replaced by the new node in the next insertBucket | 285 | -- then one is replaced by the new node in the next insertBucket |
249 | -- iteration. | 286 | -- iteration. |
250 | | Just ((old @ NodeInfo {..} :-> leastSeen), rest) <- leastRecently bucket | 287 | | not (L.null stales) |
251 | , curTime - leastSeen > delta = do | 288 | = pure ( [CheckPing stales], map_q $ pushBack bucketQ info ) |
252 | pong <- needPing nodeAddr | 289 | -- When the bucket is full of good nodes, the new node is simply discarded. |
253 | pongTime <- getTime | 290 | -- We must return 'A.empty' here to ensure that bucket splitting happens |
254 | let newBucket = if pong then PSQ.insert old pongTime bucket else rest | 291 | -- inside 'modifyBucket'. |
255 | insertBucket pongTime info newBucket | 292 | | otherwise = A.empty |
293 | where | ||
294 | stales = map key $ PSQ.atMost (curTime - delta) $ bktNodes bucket | ||
256 | 295 | ||
257 | -- bucket is good, but not full => we can insert a new node | 296 | already_have = maybe False (const True) $ PSQ.lookup info (bktNodes bucket) |
258 | | PSQ.size bucket < defaultBucketSize = do | ||
259 | return $ PSQ.insert info curTime bucket | ||
260 | 297 | ||
261 | -- When the bucket is full of good nodes, the new node is simply discarded. | 298 | map_ns f = bucket { bktNodes = f (bktNodes bucket) } |
262 | | otherwise = Full | 299 | map_q f = bucket { bktQ = runIdentity $ f (bktQ bucket) } |
263 | 300 | ||
264 | insertNode :: Eq ip => NodeInfo ip -> Bucket ip -> ip `Routing` Bucket ip | 301 | insertBucket curTime (PingResult bad_node got_response) bucket |
265 | insertNode info bucket = do | 302 | = pure ([], Bucket (update $ bktNodes bucket) popped) |
266 | curTime <- getTime | 303 | where |
267 | insertBucket curTime info bucket | 304 | (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) |
305 | update | got_response = id | ||
306 | | Just info <- top = PSQ.insert info curTime . PSQ.delete bad_node | ||
307 | | otherwise = id | ||
268 | 308 | ||
269 | type BitIx = Word | 309 | type BitIx = Word |
270 | 310 | ||
311 | partitionQ imp pred q = do | ||
312 | pass <- emptyQueue imp | ||
313 | fail <- emptyQueue imp | ||
314 | let flipfix a b f = fix f a b | ||
315 | flipfix q (pass,fail) $ \loop q qs -> do | ||
316 | (mb,q') <- popFront imp q | ||
317 | case mb of | ||
318 | Nothing -> return qs | ||
319 | Just e -> do qs' <- select (pushBack imp e) qs | ||
320 | loop q' qs' | ||
321 | where | ||
322 | select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) | ||
323 | select f = if pred e then \(a,b) -> flip (,) b <$> f a | ||
324 | else \(a,b) -> (,) a <$> f b | ||
325 | |||
271 | split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip) | 326 | split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip) |
272 | split i = (PSQ.fromList *** PSQ.fromList) . partition spanBit . PSQ.toList | 327 | split i b = (Bucket ns qs, Bucket ms rs) |
273 | where | 328 | where |
274 | spanBit entry = testIdBit (nodeId (key entry)) i | 329 | (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . key) . PSQ.toList $ bktNodes b |
330 | (qs,rs) = runIdentity $ partitionQ bucketQ spanBit $ bktQ b | ||
331 | spanBit entry = testIdBit (nodeId entry) i | ||
275 | 332 | ||
276 | {----------------------------------------------------------------------- | 333 | {----------------------------------------------------------------------- |
277 | -- Table | 334 | -- Table |
@@ -335,22 +392,22 @@ instance Pretty (Table ip) where | |||
335 | 392 | ||
336 | -- | Empty table with specified /spine/ node id. | 393 | -- | Empty table with specified /spine/ node id. |
337 | nullTable :: Eq ip => NodeId -> BucketCount -> Table ip | 394 | nullTable :: Eq ip => NodeId -> BucketCount -> Table ip |
338 | nullTable nid n = Tip nid (bucketCount (pred n)) PSQ.empty | 395 | nullTable nid n = Tip nid (bucketCount (pred n)) (Bucket PSQ.empty (runIdentity $ emptyQueue bucketQ)) |
339 | where | 396 | where |
340 | bucketCount x = max 0 (min 159 x) | 397 | bucketCount x = max 0 (min 159 x) |
341 | 398 | ||
342 | -- | Test if table is empty. In this case DHT should start | 399 | -- | Test if table is empty. In this case DHT should start |
343 | -- bootstrapping process until table becomes 'full'. | 400 | -- bootstrapping process until table becomes 'full'. |
344 | null :: Table ip -> Bool | 401 | null :: Table ip -> Bool |
345 | null (Tip _ _ b) = PSQ.null b | 402 | null (Tip _ _ b) = PSQ.null $ bktNodes b |
346 | null _ = False | 403 | null _ = False |
347 | 404 | ||
348 | -- | Test if table have maximum number of nodes. No more nodes can be | 405 | -- | Test if table have maximum number of nodes. No more nodes can be |
349 | -- 'insert'ed, except old ones becomes bad. | 406 | -- 'insert'ed, except old ones becomes bad. |
350 | full :: Table ip -> Bool | 407 | full :: Table ip -> Bool |
351 | full (Tip _ n _) = n == 0 | 408 | full (Tip _ n _) = n == 0 |
352 | full (Zero t b) = PSQ.size b == defaultBucketSize && full t | 409 | full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t |
353 | full (One b t) = PSQ.size b == defaultBucketSize && full t | 410 | full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t |
354 | 411 | ||
355 | -- | Get the /spine/ node id. | 412 | -- | Get the /spine/ node id. |
356 | thisId :: Table ip -> NodeId | 413 | thisId :: Table ip -> NodeId |
@@ -364,7 +421,7 @@ type NodeCount = Int | |||
364 | -- | Internally, routing table is similar to list of buckets or a | 421 | -- | Internally, routing table is similar to list of buckets or a |
365 | -- /matrix/ of nodes. This function returns the shape of the matrix. | 422 | -- /matrix/ of nodes. This function returns the shape of the matrix. |
366 | shape :: Table ip -> [BucketSize] | 423 | shape :: Table ip -> [BucketSize] |
367 | shape = map PSQ.size . toBucketList | 424 | shape = map (PSQ.size . bktNodes) . toBucketList |
368 | 425 | ||
369 | -- | Get number of nodes in the table. | 426 | -- | Get number of nodes in the table. |
370 | size :: Table ip -> NodeCount | 427 | size :: Table ip -> NodeCount |
@@ -409,6 +466,7 @@ kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] | |||
409 | kclosest k (toNodeId -> nid) | 466 | kclosest k (toNodeId -> nid) |
410 | = L.take k . rank nid | 467 | = L.take k . rank nid |
411 | . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty | 468 | . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty |
469 | . fmap bktNodes | ||
412 | . lookupBucket nid | 470 | . lookupBucket nid |
413 | 471 | ||
414 | {----------------------------------------------------------------------- | 472 | {----------------------------------------------------------------------- |
@@ -427,20 +485,41 @@ splitTip nid n i bucket | |||
427 | -- TODO: Kademlia non-empty subtrees should should split if they have less than | 485 | -- TODO: Kademlia non-empty subtrees should should split if they have less than |
428 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia | 486 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia |
429 | -- paper. The rule requiring additional splits is in section 2.4. | 487 | -- paper. The rule requiring additional splits is in section 2.4. |
430 | insert :: Eq ip => NodeInfo ip -> Table ip -> ip `Routing` Table ip | 488 | modifyBucket |
431 | insert info @ NodeInfo {..} = go (0 :: BitIx) | 489 | :: forall f ip xs. (Alternative f, Eq ip, Monoid xs) => |
490 | NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip) | ||
491 | modifyBucket nodeId f = go (0 :: BitIx) | ||
432 | where | 492 | where |
493 | go :: BitIx -> Table ip -> f (xs, Table ip) | ||
433 | go i (Zero table bucket) | 494 | go i (Zero table bucket) |
434 | | testIdBit nodeId i = Zero table <$> insertNode info bucket | 495 | | testIdBit nodeId i = second (Zero table) <$> f bucket |
435 | | otherwise = (`Zero` bucket) <$> go (succ i) table | 496 | | otherwise = second (`Zero` bucket) <$> go (succ i) table |
436 | go i (One bucket table ) | 497 | go i (One bucket table ) |
437 | | testIdBit nodeId i = One bucket <$> go (succ i) table | 498 | | testIdBit nodeId i = second (One bucket) <$> go (succ i) table |
438 | | otherwise = (`One` table) <$> insertNode info bucket | 499 | | otherwise = second (`One` table) <$> f bucket |
439 | go i (Tip nid n bucket) | 500 | go i (Tip nid n bucket) |
440 | | n == 0 = Tip nid n <$> insertNode info bucket | 501 | | n == 0 = second (Tip nid n) <$> f bucket |
441 | | otherwise = Tip nid n <$> insertNode info bucket | 502 | | otherwise = second (Tip nid n) <$> f bucket |
442 | <|> go i (splitTip nid n i bucket) | 503 | <|> go i (splitTip nid n i bucket) |
443 | 504 | ||
505 | -- | Triggering event for atomic table update | ||
506 | data Event ip = TryInsert (NodeInfo ip) | ||
507 | | PingResult (NodeInfo ip) Bool | ||
508 | deriving (Eq,Ord,Show) | ||
509 | |||
510 | eventId (TryInsert NodeInfo{..}) = nodeId | ||
511 | eventId (PingResult NodeInfo{..} _) = nodeId | ||
512 | |||
513 | -- | Actions requested by atomic table update | ||
514 | data CheckPing ip = CheckPing [NodeInfo ip] | ||
515 | deriving (Eq,Ord,Show) | ||
516 | |||
517 | |||
518 | -- | Atomic 'Table' update | ||
519 | insert :: (Alternative m, Eq ip) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) | ||
520 | insert tm event = modifyBucket (eventId event) (insertBucket tm event) | ||
521 | |||
522 | |||
444 | {----------------------------------------------------------------------- | 523 | {----------------------------------------------------------------------- |
445 | -- Conversion | 524 | -- Conversion |
446 | -----------------------------------------------------------------------} | 525 | -----------------------------------------------------------------------} |
@@ -457,11 +536,4 @@ toBucketList (Zero t b) = b : toBucketList t | |||
457 | toBucketList (One b t) = b : toBucketList t | 536 | toBucketList (One b t) = b : toBucketList t |
458 | 537 | ||
459 | toList :: Eq ip => Table ip -> [[TableEntry ip]] | 538 | toList :: Eq ip => Table ip -> [[TableEntry ip]] |
460 | toList = L.map (L.map tableEntry . PSQ.toList) . toBucketList | 539 | toList = L.map (L.map tableEntry . PSQ.toList . bktNodes) . toBucketList |
461 | |||
462 | merge :: Eq ip => Table ip -> Table ip -> Routing ip (Table ip) | ||
463 | merge a b = do | ||
464 | let ns = concatMap PSQ.toList $ toBucketList a | ||
465 | -- TODO: merge timestamps as well and let refresh take care of ping. | ||
466 | as <- filterM (needPing . nodeAddr . PSQ.key) ns | ||
467 | foldM (flip $ Network.BitTorrent.DHT.Routing.insert) b $ map PSQ.key as | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 2bb3ce85..5a8d64ef 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -243,7 +243,7 @@ data Node ip = Node | |||
243 | 243 | ||
244 | , resources :: !InternalState | 244 | , resources :: !InternalState |
245 | , manager :: !(Manager (DHT ip)) -- ^ RPC manager; | 245 | , manager :: !(Manager (DHT ip)) -- ^ RPC manager; |
246 | , routingTable :: !(MVar (Table ip)) -- ^ search table; | 246 | , routingTable :: !(TVar (Table ip)) -- ^ search table; |
247 | , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; | 247 | , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; |
248 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; | 248 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; |
249 | , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs. | 249 | , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs. |
@@ -323,7 +323,7 @@ newNode hs opts naddr logger mbid = do | |||
323 | liftIO $ do | 323 | liftIO $ do |
324 | myId <- maybe genNodeId return mbid | 324 | myId <- maybe genNodeId return mbid |
325 | node <- Node opts myId s m | 325 | node <- Node opts myId s m |
326 | <$> newMVar (nullTable myId (optBucketCount opts)) | 326 | <$> atomically (newTVar (nullTable myId (optBucketCount opts))) |
327 | <*> newTVarIO def | 327 | <*> newTVarIO def |
328 | <*> newTVarIO S.empty | 328 | <*> newTVarIO S.empty |
329 | <*> (newTVarIO =<< nullSessionTokens) | 329 | <*> (newTVarIO =<< nullSessionTokens) |
@@ -381,7 +381,7 @@ checkToken addr questionableToken = do | |||
381 | getTable :: DHT ip (Table ip) | 381 | getTable :: DHT ip (Table ip) |
382 | getTable = do | 382 | getTable = do |
383 | var <- asks routingTable | 383 | var <- asks routingTable |
384 | liftIO (readMVar var) | 384 | liftIO (atomically $ readTVar var) |
385 | 385 | ||
386 | -- | Find a set of closest nodes from routing table of this node. (in | 386 | -- | Find a set of closest nodes from routing table of this node. (in |
387 | -- no particular order) | 387 | -- no particular order) |