From 5e2f43d967aa2d07368b7d5552f65a69b3979ab5 Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 5 Jan 2017 12:18:43 -0500 Subject: Routing Table : use STM and per-bucket ping queues --- src/Network/BitTorrent/DHT/Query.hs | 54 ++++++---- src/Network/BitTorrent/DHT/Routing.hs | 182 ++++++++++++++++++++++++---------- src/Network/BitTorrent/DHT/Session.hs | 6 +- 3 files changed, 163 insertions(+), 79 deletions(-) (limited to 'src/Network/BitTorrent/DHT') diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 7f20ad6d..99d8cdaf 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -73,6 +73,7 @@ import Network.BitTorrent.Address import Network.BitTorrent.DHT.Message import Network.BitTorrent.DHT.Routing as R import Network.BitTorrent.DHT.Session +import Control.Concurrent.STM {----------------------------------------------------------------------- -- Handlers @@ -104,7 +105,10 @@ findNodeH = nodeHandler $ \ _ (FindNode nid) -> do -- | Default 'GetPeers' handler. getPeersH :: Address ip => NodeHandler ip getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do - GotPeers <$> getPeerList ih <*> grantToken naddr + ps <- getPeerList ih + tok <- grantToken naddr + $(logDebugS) "getPeersH" $ "INFO-HASH " <> T.pack (show (ih,fmap fromAddr naddr :: NodeAddr (Maybe IP))) + return $ GotPeers ps tok -- | Default 'Announce' handler. announceH :: Address ip => NodeHandler ip @@ -236,20 +240,33 @@ refreshNodes nid = do insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId insertNode info = fork $ do var <- asks routingTable - t <- takeMVar var - t' <- do -- modifyMVar_ var $ \ t -> do - result <- routing (R.insert info t) - case result of - Nothing -> do - $(logDebugS) "insertNode" $ "Routing table is full: " - <> T.pack (show (pPrint t)) - return t - Just t' -> do - let logMsg = "Routing table updated: " - <> pPrint t <> " -> " <> pPrint t' - $(logDebugS) "insertNode" (T.pack (render logMsg)) - return t' - putMVar var t' + tm <- getTimestamp + let showTable = do + t <- liftIO $ atomically $ readTVar var + let logMsg = "Routing table: " <> pPrint t + $(logDebugS) "insertNode" (T.pack (render logMsg)) + t <- liftIO $ atomically $ readTVar var + let arrival = TryInsert info + arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) + $(logDebugS) "insertNode" $ T.pack (show arrival4) + ps <- liftIO $ atomically $ do + t <- readTVar var + (ps,t') <- R.insert tm arrival t + writeTVar var t' + return ps + showTable + fork $ forM_ ps $ \(CheckPing ns)-> do + forM_ ns $ \n -> do + alive <- PingResult n <$> probeNode (nodeAddr n) + let PingResult _ b = alive + $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) + tm <- getTimestamp + liftIO $ atomically $ do + t <- readTVar var + (_,t') <- R.insert tm alive t + writeTVar var t' + showTable + return () -- | Throws exception if node is not responding. queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) @@ -268,9 +285,4 @@ q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-} restoreTable :: Address ip => Table ip -> DHT ip () -restoreTable tbl = do - tblvar <- asks routingTable - tbl0 <- liftIO $ takeMVar tblvar - mb <- routing $ merge tbl tbl0 - maybe (return ()) (liftIO . putMVar tblvar) mb - +restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 68edef56..14aec612 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs @@ -17,6 +17,7 @@ {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module Network.BitTorrent.DHT.Routing ( -- * Table @@ -45,8 +46,9 @@ module Network.BitTorrent.DHT.Routing -- * Construction , Network.BitTorrent.DHT.Routing.nullTable + , Event(..) + , CheckPing(..) , Network.BitTorrent.DHT.Routing.insert - , Network.BitTorrent.DHT.Routing.merge -- * Conversion , Network.BitTorrent.DHT.Routing.TableEntry @@ -62,17 +64,19 @@ import Control.Applicative as A import Control.Arrow import Control.Monad import Data.Function +import Data.Functor.Identity import Data.List as L hiding (insert) import Data.Maybe import Data.Monoid import Data.PSQueue as PSQ import Data.Serialize as S hiding (Result, Done) +import qualified Data.Sequence as Seq import Data.Time import Data.Time.Clock.POSIX import Data.Word import GHC.Generics import Text.PrettyPrint as PP hiding ((<>)) -import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) +import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) import Data.Torrent import Network.BitTorrent.Address @@ -199,6 +203,37 @@ type BucketSize = Int defaultBucketSize :: BucketSize defaultBucketSize = 8 +data QueueMethods m elem fifo = QueueMethods + { pushBack :: elem -> fifo -> m fifo + , popFront :: fifo -> m (Maybe elem, fifo) + , emptyQueue :: m fifo + } + +fromQ :: Functor m => + ( a -> b ) + -> ( b -> a ) + -> QueueMethods m elem a + -> QueueMethods m elem b +fromQ embed project QueueMethods{..} = + QueueMethods { pushBack = \e -> fmap embed . pushBack e . project + , popFront = fmap (second embed) . popFront . project + , emptyQueue = fmap embed emptyQueue + } + +seqQ :: QueueMethods Identity (NodeInfo ip) (Seq.Seq (NodeInfo ip)) +seqQ = QueueMethods + { pushBack = \e fifo -> pure (fifo Seq.|> e) + , popFront = \fifo -> case Seq.viewl fifo of + e Seq.:< fifo' -> pure (Just e, fifo') + Seq.EmptyL -> pure (Nothing, Seq.empty) + , emptyQueue = pure Seq.empty + } + +type BucketQueue ip = Seq.Seq (NodeInfo ip) + +bucketQ :: QueueMethods Identity (NodeInfo ip) (BucketQueue ip) +bucketQ = seqQ + -- | Bucket is also limited in its length — thus it's called k-bucket. -- When bucket becomes full, we should split it in two lists by -- current span bit. Span bit is defined by depth in the routing @@ -206,7 +241,11 @@ defaultBucketSize = 8 -- very unlikely that all nodes in bucket fail within an hour of -- each other. -- -type Bucket ip = PSQ (NodeInfo ip) Timestamp +data Bucket ip = Bucket { bktNodes :: PSQ (NodeInfo ip) Timestamp + , bktQ :: BucketQueue ip + } deriving (Show,Generic) + +instance (Eq ip, Serialize ip) => Serialize (Bucket ip) instance (Serialize k, Serialize v, Ord k, Ord v) => Serialize (PSQ k v) where @@ -219,59 +258,77 @@ lastChanged bucket | L.null timestamps = Nothing | otherwise = Just (L.maximumBy (compare `on` prio) timestamps) where - timestamps = PSQ.toList bucket + timestamps = PSQ.toList $ bktNodes bucket leastRecently :: Eq ip => Bucket ip -> Maybe (NodeEntry ip, Bucket ip) -leastRecently = minView +leastRecently b = fmap (\(e,ns) -> (e, b { bktNodes = ns })) $ minView $ bktNodes b -- | Update interval, in seconds. delta :: NominalDiffTime delta = 15 * 60 -- | Should maintain a set of stable long running nodes. -insertBucket :: Eq ip => Timestamp -> NodeInfo ip -> Bucket ip - -> ip `Routing` Bucket ip -insertBucket curTime info bucket +-- +-- Note: pings are triggerd only when a bucket is full. +insertBucket :: (Eq ip, Alternative f) => Timestamp -> Event ip -> Bucket ip + -> f ([CheckPing ip], Bucket ip) +insertBucket curTime (TryInsert info) bucket -- just update timestamp if a node is already in bucket - | Just _ <- PSQ.lookup info bucket = do - return $ PSQ.insertWith max info curTime bucket - - -- Buckets that have not been changed in 15 minutes should be "refreshed." - | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket - , curTime - lastSeen > delta = do - refresh nodeId - insertBucket curTime info bucket - + | already_have + = pure ( [], map_ns $ PSQ.insertWith max info curTime ) + -- bucket is good, but not full => we can insert a new node + | PSQ.size (bktNodes bucket) < defaultBucketSize + = pure ( [], map_ns $ PSQ.insert info curTime ) -- If there are any questionable nodes in the bucket have not been -- seen in the last 15 minutes, the least recently seen node is -- pinged. If any nodes in the bucket are known to have become bad, -- then one is replaced by the new node in the next insertBucket -- iteration. - | Just ((old @ NodeInfo {..} :-> leastSeen), rest) <- leastRecently bucket - , curTime - leastSeen > delta = do - pong <- needPing nodeAddr - pongTime <- getTime - let newBucket = if pong then PSQ.insert old pongTime bucket else rest - insertBucket pongTime info newBucket + | not (L.null stales) + = pure ( [CheckPing stales], map_q $ pushBack bucketQ info ) + -- When the bucket is full of good nodes, the new node is simply discarded. + -- We must return 'A.empty' here to ensure that bucket splitting happens + -- inside 'modifyBucket'. + | otherwise = A.empty + where + stales = map key $ PSQ.atMost (curTime - delta) $ bktNodes bucket - -- bucket is good, but not full => we can insert a new node - | PSQ.size bucket < defaultBucketSize = do - return $ PSQ.insert info curTime bucket + already_have = maybe False (const True) $ PSQ.lookup info (bktNodes bucket) - -- When the bucket is full of good nodes, the new node is simply discarded. - | otherwise = Full + map_ns f = bucket { bktNodes = f (bktNodes bucket) } + map_q f = bucket { bktQ = runIdentity $ f (bktQ bucket) } -insertNode :: Eq ip => NodeInfo ip -> Bucket ip -> ip `Routing` Bucket ip -insertNode info bucket = do - curTime <- getTime - insertBucket curTime info bucket +insertBucket curTime (PingResult bad_node got_response) bucket + = pure ([], Bucket (update $ bktNodes bucket) popped) + where + (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) + update | got_response = id + | Just info <- top = PSQ.insert info curTime . PSQ.delete bad_node + | otherwise = id type BitIx = Word +partitionQ imp pred q = do + pass <- emptyQueue imp + fail <- emptyQueue imp + let flipfix a b f = fix f a b + flipfix q (pass,fail) $ \loop q qs -> do + (mb,q') <- popFront imp q + case mb of + Nothing -> return qs + Just e -> do qs' <- select (pushBack imp e) qs + loop q' qs' + where + select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) + select f = if pred e then \(a,b) -> flip (,) b <$> f a + else \(a,b) -> (,) a <$> f b + split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip) -split i = (PSQ.fromList *** PSQ.fromList) . partition spanBit . PSQ.toList +split i b = (Bucket ns qs, Bucket ms rs) where - spanBit entry = testIdBit (nodeId (key entry)) i + (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . key) . PSQ.toList $ bktNodes b + (qs,rs) = runIdentity $ partitionQ bucketQ spanBit $ bktQ b + spanBit entry = testIdBit (nodeId entry) i {----------------------------------------------------------------------- -- Table @@ -335,22 +392,22 @@ instance Pretty (Table ip) where -- | Empty table with specified /spine/ node id. nullTable :: Eq ip => NodeId -> BucketCount -> Table ip -nullTable nid n = Tip nid (bucketCount (pred n)) PSQ.empty +nullTable nid n = Tip nid (bucketCount (pred n)) (Bucket PSQ.empty (runIdentity $ emptyQueue bucketQ)) where bucketCount x = max 0 (min 159 x) -- | Test if table is empty. In this case DHT should start -- bootstrapping process until table becomes 'full'. null :: Table ip -> Bool -null (Tip _ _ b) = PSQ.null b +null (Tip _ _ b) = PSQ.null $ bktNodes b null _ = False -- | Test if table have maximum number of nodes. No more nodes can be -- 'insert'ed, except old ones becomes bad. full :: Table ip -> Bool full (Tip _ n _) = n == 0 -full (Zero t b) = PSQ.size b == defaultBucketSize && full t -full (One b t) = PSQ.size b == defaultBucketSize && full t +full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t +full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t -- | Get the /spine/ node id. thisId :: Table ip -> NodeId @@ -364,7 +421,7 @@ type NodeCount = Int -- | Internally, routing table is similar to list of buckets or a -- /matrix/ of nodes. This function returns the shape of the matrix. shape :: Table ip -> [BucketSize] -shape = map PSQ.size . toBucketList +shape = map (PSQ.size . bktNodes) . toBucketList -- | Get number of nodes in the table. size :: Table ip -> NodeCount @@ -409,6 +466,7 @@ kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] kclosest k (toNodeId -> nid) = L.take k . rank nid . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty + . fmap bktNodes . lookupBucket nid {----------------------------------------------------------------------- @@ -427,20 +485,41 @@ splitTip nid n i bucket -- TODO: Kademlia non-empty subtrees should should split if they have less than -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia -- paper. The rule requiring additional splits is in section 2.4. -insert :: Eq ip => NodeInfo ip -> Table ip -> ip `Routing` Table ip -insert info @ NodeInfo {..} = go (0 :: BitIx) +modifyBucket + :: forall f ip xs. (Alternative f, Eq ip, Monoid xs) => + NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip) +modifyBucket nodeId f = go (0 :: BitIx) where + go :: BitIx -> Table ip -> f (xs, Table ip) go i (Zero table bucket) - | testIdBit nodeId i = Zero table <$> insertNode info bucket - | otherwise = (`Zero` bucket) <$> go (succ i) table + | testIdBit nodeId i = second (Zero table) <$> f bucket + | otherwise = second (`Zero` bucket) <$> go (succ i) table go i (One bucket table ) - | testIdBit nodeId i = One bucket <$> go (succ i) table - | otherwise = (`One` table) <$> insertNode info bucket + | testIdBit nodeId i = second (One bucket) <$> go (succ i) table + | otherwise = second (`One` table) <$> f bucket go i (Tip nid n bucket) - | n == 0 = Tip nid n <$> insertNode info bucket - | otherwise = Tip nid n <$> insertNode info bucket + | n == 0 = second (Tip nid n) <$> f bucket + | otherwise = second (Tip nid n) <$> f bucket <|> go i (splitTip nid n i bucket) +-- | Triggering event for atomic table update +data Event ip = TryInsert (NodeInfo ip) + | PingResult (NodeInfo ip) Bool + deriving (Eq,Ord,Show) + +eventId (TryInsert NodeInfo{..}) = nodeId +eventId (PingResult NodeInfo{..} _) = nodeId + +-- | Actions requested by atomic table update +data CheckPing ip = CheckPing [NodeInfo ip] + deriving (Eq,Ord,Show) + + +-- | Atomic 'Table' update +insert :: (Alternative m, Eq ip) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) +insert tm event = modifyBucket (eventId event) (insertBucket tm event) + + {----------------------------------------------------------------------- -- Conversion -----------------------------------------------------------------------} @@ -457,11 +536,4 @@ toBucketList (Zero t b) = b : toBucketList t toBucketList (One b t) = b : toBucketList t toList :: Eq ip => Table ip -> [[TableEntry ip]] -toList = L.map (L.map tableEntry . PSQ.toList) . toBucketList - -merge :: Eq ip => Table ip -> Table ip -> Routing ip (Table ip) -merge a b = do - let ns = concatMap PSQ.toList $ toBucketList a - -- TODO: merge timestamps as well and let refresh take care of ping. - as <- filterM (needPing . nodeAddr . PSQ.key) ns - foldM (flip $ Network.BitTorrent.DHT.Routing.insert) b $ map PSQ.key as +toList = L.map (L.map tableEntry . PSQ.toList . bktNodes) . toBucketList diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 2bb3ce85..5a8d64ef 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -243,7 +243,7 @@ data Node ip = Node , resources :: !InternalState , manager :: !(Manager (DHT ip)) -- ^ RPC manager; - , routingTable :: !(MVar (Table ip)) -- ^ search table; + , routingTable :: !(TVar (Table ip)) -- ^ search table; , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs. @@ -323,7 +323,7 @@ newNode hs opts naddr logger mbid = do liftIO $ do myId <- maybe genNodeId return mbid node <- Node opts myId s m - <$> newMVar (nullTable myId (optBucketCount opts)) + <$> atomically (newTVar (nullTable myId (optBucketCount opts))) <*> newTVarIO def <*> newTVarIO S.empty <*> (newTVarIO =<< nullSessionTokens) @@ -381,7 +381,7 @@ checkToken addr questionableToken = do getTable :: DHT ip (Table ip) getTable = do var <- asks routingTable - liftIO (readMVar var) + liftIO (atomically $ readTVar var) -- | Find a set of closest nodes from routing table of this node. (in -- no particular order) -- cgit v1.2.3