summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Network/DHT/Routing.hs90
1 files changed, 48 insertions, 42 deletions
diff --git a/src/Network/DHT/Routing.hs b/src/Network/DHT/Routing.hs
index 631761ac..2521f7b9 100644
--- a/src/Network/DHT/Routing.hs
+++ b/src/Network/DHT/Routing.hs
@@ -227,9 +227,7 @@ delta = 15 * 60
227-- | Should maintain a set of stable long running nodes. 227-- | Should maintain a set of stable long running nodes.
228-- 228--
229-- Note: pings are triggerd only when a bucket is full. 229-- Note: pings are triggerd only when a bucket is full.
230insertBucket :: (Alternative f, Ord ni) => -- (Eq ip, Alternative f, Ord (NodeId)) => 230updateBucketForInbound curTime info bucket
231 Timestamp -> Event ni -> Bucket ni -> f ([CheckPing ni], Bucket ni)
232insertBucket curTime (TryInsert info) bucket
233 -- Just update timestamp if a node is already in bucket. 231 -- Just update timestamp if a node is already in bucket.
234 -- 232 --
235 -- Note PingResult events should only occur for nodes we requested a ping for, 233 -- Note PingResult events should only occur for nodes we requested a ping for,
@@ -247,7 +245,7 @@ insertBucket curTime (TryInsert info) bucket
247 -- then one is replaced by the new node in the next insertBucket 245 -- then one is replaced by the new node in the next insertBucket
248 -- iteration. 246 -- iteration.
249 | not (L.null stales) 247 | not (L.null stales)
250 = pure ( [CheckPing stales] 248 = pure ( stales
251 , bucket { -- Update timestamps so that we don't redundantly ping. 249 , bucket { -- Update timestamps so that we don't redundantly ping.
252 bktNodes = updateStamps curTime stales $ bktNodes bucket 250 bktNodes = updateStamps curTime stales $ bktNodes bucket
253 -- Update queue with the pending NodeInfo in case of ping fail. 251 -- Update queue with the pending NodeInfo in case of ping fail.
@@ -272,15 +270,24 @@ insertBucket curTime (TryInsert info) bucket
272 map_ns f = bucket { bktNodes = f (bktNodes bucket) } 270 map_ns f = bucket { bktNodes = f (bktNodes bucket) }
273 -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) } 271 -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) }
274 272
275insertBucket curTime (PingResult bad_node got_response) bucket 273updateBucketForPingResult curTime bad_node got_response bucket
276 = pure ([], Bucket (upd $ bktNodes bucket) popped) 274 = pure ( replacements
275 , Bucket (foldr replace
276 (bktNodes bucket)
277 replacements)
278 popped
279 )
277 where 280 where
278 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) 281 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket)
279 upd | got_response = id -- Timestamp was already updated by TryInsert. 282 replacements | got_response = [] -- Timestamp was already updated by TryInsert.
280 | Just info <- top = \nodes -> 283 | Just info <- top = do
281 fromMaybe nodes $ do 284 -- Insert only if there's a removal.
282 _ <- PSQ.lookup bad_node nodes -- Insert only if there's a removal. 285 _ <- maybeToList $ PSQ.lookup bad_node (bktNodes bucket)
283 let nodes' = PSQ.delete bad_node nodes 286 return (bad_node, info)
287 | otherwise = []
288
289 replace (bad_node, info) =
290 PSQ.insert info curTime
284 -- XXX: curTime is the time somebody pinged out, not the time 291 -- XXX: curTime is the time somebody pinged out, not the time
285 -- of which the popped node was originally queued. That's not 292 -- of which the popped node was originally queued. That's not
286 -- quite right. 293 -- quite right.
@@ -290,8 +297,7 @@ insertBucket curTime (PingResult bad_node got_response) bucket
290 -- 297 --
291 -- This means that initial time-outs for replacement nodes are up to 298 -- This means that initial time-outs for replacement nodes are up to
292 -- twice as long as for the originals. 299 -- twice as long as for the originals.
293 pure $ PSQ.insert info curTime nodes' 300 . PSQ.delete bad_node
294 | otherwise = id
295 301
296 302
297updateStamps :: Ord ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp 303updateStamps :: Ord ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp
@@ -627,48 +633,48 @@ deriving instance ( Show ip
627 633
628#endif 634#endif
629 635
630-- | Atomic 'Table' update 636
637-- | Call on every inbound packet (including requested ping results).
638-- Returns a triple (was_inserted, to_ping, tbl') where
631-- 639--
632-- Deprecated, use these two functions instead: 640-- [ /was_inserted/ ] True if the node was added to the routing table.
633-- 641--
634-- [ 'updateForInbound' ] 642-- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'.
643-- This will be empty if /was_inserted/, but a non-inserted node
644-- may be added to a replacement queue and will be inserted if
645-- one of the items in this list time out.
635-- 646--
636-- [ 'updateForPingResult' ] 647-- [ /tbl'/ ] The updated routing 'Table'.
637-- 648--
638insert :: -- ( Eq ip , Applicative m , Ord (NodeId) , FiniteBits (NodeId)) =>
639 (Applicative m, Ord ni) =>
640 (nid -> Word -> Bool)
641 -> (ni -> nid)
642 -> Timestamp -> Event ni -> Table ni nid -> m ([CheckPing ni], Table ni nid)
643insert testIdBit nodeId tm event tbl
644 = pure $ fromMaybe ([],tbl)
645 $ modifyBucket testIdBit (\ni -> testIdBit $ nodeId ni)
646 (eventId nodeId event)
647 (insertBucket tm event)
648 tbl
649
650
651-- | Call on every inbound packet (including requested ping results). Returns
652-- a list of nodes to ping and an updated routing table. The caller should
653-- also invoke 'updateForPingResult' on each node in the to-ping list to update
654-- the routing table with that information as it becomes available.
655updateForInbound :: Ord ni => 649updateForInbound :: Ord ni =>
656 (nid -> Word -> Bool) 650 (nid -> Word -> Bool)
657 -> (ni -> nid) 651 -> (ni -> nid)
658 -> Timestamp -> ni -> Table ni nid -> ([ni], Table ni nid) 652 -> Timestamp -> ni -> Table ni nid -> (Bool, [ni], Table ni nid)
659updateForInbound testIdBit nodeId tm ni tbl = 653updateForInbound testIdBit nodeId tm ni tbl =
660 let Identity (ps, tbl') = Network.DHT.Routing.insert testIdBit nodeId tm (TryInsert ni) tbl 654 maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl'))
661 decon (CheckPing ns) = ns 655 $ modifyBucket testIdBit (\ni -> testIdBit $ nodeId ni)
662 in (concatMap decon ps, tbl') 656 (nodeId ni)
657 (updateBucketForInbound tm ni)
658 tbl
663 659
660-- | Update the routing table with the results of a ping.
661--
662-- Each (a,b) in the returned list indicates that the node /a/ was deleted from the
663-- routing table and the node /b/ has taken its place.
664updateForPingResult :: Ord ni => 664updateForPingResult :: Ord ni =>
665 (nid -> Word -> Bool) 665 (nid -> Word -> Bool)
666 -> (ni -> nid) 666 -> (ni -> nid)
667 -> Timestamp -> ni -> Bool -> Table ni nid -> Table ni nid 667 -> Timestamp
668 -> ni -- ^ The pinged node.
669 -> Bool -- ^ True if we got a reply, False if it timed out.
670 -> Table ni nid -- ^ The routing table.
671 -> ( [(ni,ni)], Table ni nid )
668updateForPingResult testIdBit nodeId tm ni got_reply tbl = 672updateForPingResult testIdBit nodeId tm ni got_reply tbl =
669 -- The _ should always be empty in this case. 673 fromMaybe ([],tbl)
670 let Identity (_, tbl') = Network.DHT.Routing.insert testIdBit nodeId tm (PingResult ni got_reply) tbl 674 $ modifyBucket testIdBit (\ni -> testIdBit $ nodeId ni)
671 in tbl' 675 (nodeId ni)
676 (updateBucketForPingResult tm ni got_reply)
677 tbl
672 678
673 679
674{----------------------------------------------------------------------- 680{-----------------------------------------------------------------------