From 13b2eb08cb4651a913849d96f516ed97bad53003 Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 9 Jun 2017 01:10:19 -0400 Subject: Rename Network.BitTorrent.DHT.Routing -> Network.DHT.Routing --- src/Network/DHT/Routing.hs | 595 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 src/Network/DHT/Routing.hs (limited to 'src/Network/DHT') diff --git a/src/Network/DHT/Routing.hs b/src/Network/DHT/Routing.hs new file mode 100644 index 00000000..c0a431fa --- /dev/null +++ b/src/Network/DHT/Routing.hs @@ -0,0 +1,595 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Every node maintains a routing table of known good nodes. The +-- nodes in the routing table are used as starting points for +-- queries in the DHT. Nodes from the routing table are returned in +-- response to queries from other nodes. +-- +-- For more info see: +-- +-- +{-# LANGUAGE CPP #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} +module Network.DHT.Routing + ( -- * Table + Table + , Info(..) + + -- * Attributes + , BucketCount + , defaultBucketCount + , BucketSize + , defaultBucketSize + , NodeCount + + -- * Query + , Network.DHT.Routing.null + , Network.DHT.Routing.full + , thisId + , shape + , Network.DHT.Routing.size + , Network.DHT.Routing.depth + , compatibleNodeId + + -- * Lookup + , K + , defaultK + , TableKey (..) + , kclosest + + -- * Construction + , Network.DHT.Routing.nullTable + , Event(..) + , CheckPing(..) + , Network.DHT.Routing.insert + + -- * Conversion + , Network.DHT.Routing.TableEntry + , Network.DHT.Routing.toList + + -- * Routing + , Timestamp + ) where + +import Control.Applicative as A +import Control.Arrow +import Control.Monad +import Data.Function +import Data.Functor.Identity +import Data.List as L hiding (insert) +import Data.Maybe +import Data.Monoid +import Data.Wrapper.PSQ as PSQ +import Data.Serialize as S hiding (Result, Done) +import qualified Data.Sequence as Seq +import Data.Time +import Data.Time.Clock.POSIX +import Data.Word +import GHC.Generics +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) +import qualified Data.ByteString as BS +import Data.Bits + +import Network.BitTorrent.Address + + +{----------------------------------------------------------------------- +-- Routing monad +-----------------------------------------------------------------------} + +-- | Last time the node was responding to our queries. +-- +-- Not all nodes that we learn about are equal. Some are \"good\" and +-- some are not. Many nodes using the DHT are able to send queries +-- and receive responses, but are not able to respond to queries +-- from other nodes. It is important that each node's routing table +-- must contain only known good nodes. A good node is a node has +-- responded to one of our queries within the last 15 minutes. A +-- node is also good if it has ever responded to one of our queries +-- and has sent us a query within the last 15 minutes. After 15 +-- minutes of inactivity, a node becomes questionable. Nodes become +-- bad when they fail to respond to multiple queries in a row. 
Nodes +-- that we know are good are given priority over nodes with unknown +-- status. +-- +type Timestamp = POSIXTime + +{----------------------------------------------------------------------- + Bucket +-----------------------------------------------------------------------} +-- TODO: add replacement cache to the bucket +-- +-- When a k-bucket is full and a new node is discovered for that +-- k-bucket, the least recently seen node in the k-bucket is +-- PINGed. If the node is found to be still alive, the new node is +-- place in a secondary list, a replacement cache. The replacement +-- cache is used only if a node in the k-bucket stops responding. In +-- other words: new nodes are used only when older nodes disappear. + +-- | Timestamp - last time this node is pinged. +type NodeEntry dht ip u = Binding (NodeInfo dht ip u) Timestamp + +-- TODO instance Pretty where + +-- | Number of nodes in a bucket. +type BucketSize = Int + +-- | Maximum number of 'NodeInfo's stored in a bucket. Most clients +-- use this value. +defaultBucketSize :: BucketSize +defaultBucketSize = 8 + +data QueueMethods m elem fifo = QueueMethods + { pushBack :: elem -> fifo -> m fifo + , popFront :: fifo -> m (Maybe elem, fifo) + , emptyQueue :: m fifo + } + +{- +fromQ :: Functor m => + ( a -> b ) + -> ( b -> a ) + -> QueueMethods m elem a + -> QueueMethods m elem b +fromQ embed project QueueMethods{..} = + QueueMethods { pushBack = \e -> fmap embed . pushBack e . project + , popFront = fmap (second embed) . popFront . project + , emptyQueue = fmap embed emptyQueue + } +-} + +seqQ :: QueueMethods Identity (NodeInfo dht ip u) (Seq.Seq (NodeInfo dht ip u)) +seqQ = QueueMethods + { pushBack = \e fifo -> pure (fifo Seq.|> e) + , popFront = \fifo -> case Seq.viewl fifo of + e Seq.:< fifo' -> pure (Just e, fifo') + Seq.EmptyL -> pure (Nothing, Seq.empty) + , emptyQueue = pure Seq.empty + } + +type BucketQueue dht ip u = Seq.Seq (NodeInfo dht ip u) + +bucketQ :: QueueMethods Identity (NodeInfo dht ip u) (BucketQueue dht ip u) +bucketQ = seqQ + +-- | Bucket is also limited in its length — thus it's called k-bucket. +-- When bucket becomes full, we should split it in two lists by +-- current span bit. Span bit is defined by depth in the routing +-- table tree. Size of the bucket should be choosen such that it's +-- very unlikely that all nodes in bucket fail within an hour of +-- each other. +-- +data Bucket dht ip u = Bucket { bktNodes :: !(PSQ (NodeInfo dht ip u) Timestamp) + , bktQ :: !(BucketQueue dht ip u) + } deriving Generic + +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Bucket dht ip u) + + +getGenericNode :: ( Serialize (NodeId dht) + , Serialize ip + , Serialize u + ) => Get (NodeInfo dht ip u) +getGenericNode = do + nid <- get + naddr <- get + u <- get + return NodeInfo + { nodeId = nid + , nodeAddr = naddr + , nodeAnnotation = u + } + +putGenericNode :: ( Serialize (NodeId dht) + , Serialize ip + , Serialize u + ) => NodeInfo dht ip u -> Put +putGenericNode (NodeInfo nid naddr u) = do + put nid + put naddr + put u + +instance (Eq ip, Ord (NodeId dht), Serialize (NodeId dht), Serialize ip, Serialize u) => Serialize (Bucket dht ip u) where + get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) + put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . 
bktNodes
+
+
+psqFromPairList :: (Ord p, Ord k) => [(k, p)] -> OrdPSQ k p ()
+psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs
+
+psqToPairList :: OrdPSQ t t1 () -> [(t, t1)]
+psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq
+
+-- | Update interval, in seconds.
+delta :: NominalDiffTime
+delta = 15 * 60
+
+-- | Should maintain a set of stable, long-running nodes.
+--
+-- Note: pings are triggered only when a bucket is full.
+insertBucket :: (Eq ip, Alternative f, Ord (NodeId dht)) => Timestamp -> Event dht ip u -> Bucket dht ip u
+             -> f ([CheckPing dht ip u], Bucket dht ip u)
+insertBucket curTime (TryInsert info) bucket
+  -- Just update the timestamp if the node is already in the bucket.
+  | already_have
+    = pure ( [], map_ns $ PSQ.insertWith max info curTime )
+  -- The bucket is good, but not full => we can insert a new node.
+  | PSQ.size (bktNodes bucket) < defaultBucketSize
+    = pure ( [], map_ns $ PSQ.insert info curTime )
+  -- If any questionable nodes in the bucket have not been seen in the
+  -- last 15 minutes, the least recently seen node is pinged. If any
+  -- nodes in the bucket are known to have become bad, then one is
+  -- replaced by the new node in the next insertBucket iteration.
+  | not (L.null stales)
+    = pure ( [CheckPing stales]
+           , bucket { -- Update timestamps so that we don't redundantly ping.
+                      bktNodes = updateStamps curTime stales $ bktNodes bucket
+                      -- Update the queue with the pending NodeInfo in case the ping fails.
+                    , bktQ = runIdentity $ pushBack bucketQ info $ bktQ bucket } )
+  -- When the bucket is full of good nodes, the new node is simply discarded.
+  -- We must return 'A.empty' here to ensure that bucket splitting happens
+  -- inside 'modifyBucket'.
+  | otherwise = A.empty
+ where
+  -- We take only one stale node to keep a 1-to-1 correspondence between
+  -- pending pings and waiting nodes in bktQ.  This way, we don't have to
+  -- worry about what to do with failed pings for which there is no ready
+  -- replacement.
+  stales = -- One stale:
+           do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket)
+              guard (t < curTime - delta)
+              return n
+           -- All stale:
+           -- map key $ PSQ.atMost (curTime - delta) $ bktNodes bucket
+
+  already_have = maybe False (const True) $ PSQ.lookup info (bktNodes bucket)
+
+  map_ns f = bucket { bktNodes = f (bktNodes bucket) }
+  -- map_q f = bucket { bktQ = runIdentity $ f (bktQ bucket) }
+
+insertBucket curTime (PingResult bad_node got_response) bucket
+    = pure ([], Bucket (upd $ bktNodes bucket) popped)
+  where
+    (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket)
+    upd | got_response = id
+        | Just info <- top = \nodes ->
+            fromMaybe nodes $ do
+                _ <- PSQ.lookup bad_node nodes -- Insert only if there's a removal.
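+                -- The failed node is known to the bucket: evict it and promote
+                -- the oldest queued candidate with a fresh timestamp (the
+                -- replacement-cache behaviour sketched in the TODO above).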
+ let nodes' = PSQ.delete bad_node nodes + pure $ PSQ.insert info curTime nodes' + | otherwise = id + +updateStamps :: ( Eq ip + , Ord (NodeId dht) + ) => Timestamp -> [NodeInfo dht ip u] -> PSQ (NodeInfo dht ip u) Timestamp -> PSQ (NodeInfo dht ip u) Timestamp +updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales + + +type BitIx = Word + +partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b) +partitionQ imp test q0 = do + pass0 <- emptyQueue imp + fail0 <- emptyQueue imp + let flipfix a b f = fix f a b + flipfix q0 (pass0,fail0) $ \rec q qs -> do + (mb,q') <- popFront imp q + case mb of + Nothing -> return qs + Just e -> do qs' <- select (pushBack imp e) qs + rec q' qs' + where + select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) + select f = if test e then \(a,b) -> flip (,) b <$> f a + else \(a,b) -> (,) a <$> f b + +split :: forall dht ip u. + ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => BitIx -> Bucket dht ip u -> (Bucket dht ip u, Bucket dht ip u) +split i b = (Bucket ns qs, Bucket ms rs) + where + (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . key) . PSQ.toList $ bktNodes b + (qs,rs) = runIdentity $ partitionQ bucketQ spanBit $ bktQ b + {- + spanBit :: forall (dht :: * -> *) addr u. + FiniteBits (Network.DatagramServer.Types.NodeId dht) => + NodeInfo dht addr u -> Bool + -} + spanBit :: NodeInfo dht addr u -> Bool + spanBit entry = testIdBit (nodeId entry) i + +{----------------------------------------------------------------------- +-- Table +-----------------------------------------------------------------------} + +-- | Number of buckets in a routing table. +type BucketCount = Int + +defaultBucketCount :: BucketCount +defaultBucketCount = 20 + +data Info dht ip u = Info + { myBuckets :: Table dht ip u + , myNodeId :: NodeId dht + , myAddress :: SockAddr + } + deriving Generic + +deriving instance (Eq ip, Eq u, Eq (NodeId dht)) => Eq (Info dht ip u) +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Info dht ip u) + +-- instance (Eq ip, Serialize ip) => Serialize (Info ip) + +-- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ +-- 160. The routing table is subdivided into 'Bucket's that each cover +-- a portion of the space. An empty table has one bucket with an ID +-- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" +-- is inserted into the table, it is placed within the bucket that has +-- @min <= N < max@. An empty table has only one bucket so any node +-- must fit within it. Each bucket can only hold 'K' nodes, currently +-- eight, before becoming 'Full'. When a bucket is full of known good +-- nodes, no more nodes may be added unless our own 'NodeId' falls +-- within the range of the 'Bucket'. In that case, the bucket is +-- replaced by two new buckets each with half the range of the old +-- bucket and the nodes from the old bucket are distributed among the +-- two new ones. For a new table with only one bucket, the full bucket +-- is always split into two new buckets covering the ranges @0..2 ^ +-- 159@ and @2 ^ 159..2 ^ 160@. 
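+--
+-- A rough usage sketch (not part of this module): building a table and
+-- feeding it a freshly discovered node with 'insert'.  The names @myId@,
+-- @someNode@ and @sendPingsTo@ below are hypothetical, caller-supplied
+-- values:
+--
+-- > do now <- getPOSIXTime
+-- >    let table0 = nullTable myId defaultBucketCount
+-- >    (pings, table1) <- insert now (TryInsert someNode) table0
+-- >    mapM_ (\(CheckPing ns) -> sendPingsTo ns) pings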
+-- +data Table dht ip u + -- most nearest bucket + = Tip (NodeId dht) BucketCount (Bucket dht ip u) + + -- left biased tree branch + | Zero (Table dht ip u) (Bucket dht ip u) + + -- right biased tree branch + | One (Bucket dht ip u) (Table dht ip u) + deriving Generic + +instance (Eq ip, Eq (NodeId dht)) => Eq (Table dht ip u) where + (==) = (==) `on` Network.DHT.Routing.toList + +instance Serialize NominalDiffTime where + put = putWord32be . fromIntegral . fromEnum + get = (toEnum . fromIntegral) <$> getWord32be + +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Table dht ip u) + +-- | Normally, routing table should be saved between invocations of +-- the client software. Note that you don't need to store /this/ +-- 'NodeId' since it is already included in routing table. +instance (Eq ip, Serialize ip, Ord (NodeId dht), Serialize (NodeId dht), Serialize u) => Serialize (Table dht ip u) + +-- | Shape of the table. +instance Pretty (Table dht ip u) where + pPrint t + | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss + | otherwise = brackets $ + PP.int (L.sum ss) <> " nodes, " <> + PP.int bucketCount <> " buckets" + where + bucketCount = L.length ss + ss = shape t + +-- | Empty table with specified /spine/ node id. +nullTable :: Eq ip => NodeId dht -> BucketCount -> Table dht ip u +nullTable nid n = Tip nid (bucketCount (pred n)) (Bucket PSQ.empty (runIdentity $ emptyQueue bucketQ)) + where + bucketCount x = max 0 (min 159 x) + +-- | Test if table is empty. In this case DHT should start +-- bootstrapping process until table becomes 'full'. +null :: Table dht ip u -> Bool +null (Tip _ _ b) = PSQ.null $ bktNodes b +null _ = False + +-- | Test if table have maximum number of nodes. No more nodes can be +-- 'insert'ed, except old ones becomes bad. +full :: Table dht ip u -> Bool +full (Tip _ n _) = n == 0 +full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t +full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t + +-- | Get the /spine/ node id. +thisId :: Table dht ip u -> NodeId dht +thisId (Tip nid _ _) = nid +thisId (Zero table _) = thisId table +thisId (One _ table) = thisId table + +-- | Number of nodes in a bucket or a table. +type NodeCount = Int + +-- | Internally, routing table is similar to list of buckets or a +-- /matrix/ of nodes. This function returns the shape of the matrix. +shape :: Table dht ip u -> [BucketSize] +shape = map (PSQ.size . bktNodes) . toBucketList + +-- | Get number of nodes in the table. +size :: Table dht ip u -> NodeCount +size = L.sum . shape + +-- | Get number of buckets in the table. +depth :: Table dht ip u -> BucketCount +depth = L.length . shape + +lookupBucket :: ( FiniteBits (NodeId dht) + ) => NodeId dht -> Table dht ip u -> [Bucket dht ip u] +lookupBucket nid = go 0 [] + where + go i bs (Zero table bucket) + | testIdBit nid i = bucket : toBucketList table ++ bs + | otherwise = go (succ i) (bucket:bs) table + go i bs (One bucket table) + | testIdBit nid i = go (succ i) (bucket:bs) table + | otherwise = bucket : toBucketList table ++ bs + go _ bs (Tip _ _ bucket) = bucket : bs + +compatibleNodeId :: forall dht ip u. 
+ ( Serialize (NodeId dht) + , FiniteBits (NodeId dht) + ) => Table dht ip u -> IO (NodeId dht) +compatibleNodeId tbl = genBucketSample prefix br + where + br = bucketRange (L.length (shape tbl) - 1) True + nodeIdSize = finiteBitSize (undefined :: NodeId dht) `div` 8 + bs = BS.pack $ take nodeIdSize $ tablePrefix tbl ++ repeat 0 + prefix = either error id $ S.decode bs + +tablePrefix :: Table dht ip u -> [Word8] +tablePrefix = map (packByte . take 8 . (++repeat False)) + . chunksOf 8 + . tableBits + where + packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0] + bitmask ix True = bit ix + bitmask _ _ = 0 + +tableBits :: Table dht ip u -> [Bool] +tableBits (One _ tbl) = True : tableBits tbl +tableBits (Zero tbl _) = False : tableBits tbl +tableBits (Tip _ _ _) = [] + +chunksOf :: Int -> [e] -> [[e]] +chunksOf i ls = map (take i) (build (splitter ls)) where + splitter :: [e] -> ([e] -> a -> a) -> a -> a + splitter [] _ n = n + splitter l c n = l `c` splitter (drop i l) c n + +build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a] +build g = g (:) [] + +-- | Count of closest nodes in find_node request. +type K = Int + +-- | Default 'K' is equal to 'defaultBucketSize'. +defaultK :: K +defaultK = 8 + +class TableKey dht k where + toNodeId :: k -> NodeId dht + +instance TableKey dht (NodeId dht) where + toNodeId = id + +-- | Get a list of /K/ closest nodes using XOR metric. Used in +-- 'find_node' and 'get_peers' queries. +kclosest :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => TableKey dht a => K -> a -> Table dht ip u -> [NodeInfo dht ip u] +kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) + ++ rank nodeId nid (L.concat everyone) + where + (bucket,everyone) = + L.splitAt 1 + . L.map (L.map PSQ.key . PSQ.toList . bktNodes) + . lookupBucket nid + $ tbl + +{----------------------------------------------------------------------- +-- Routing +-----------------------------------------------------------------------} + +splitTip :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => NodeId dht -> BucketCount -> BitIx -> Bucket dht ip u -> Table dht ip u +splitTip nid n i bucket + | testIdBit nid i = (One zeros (Tip nid (pred n) ones)) + | otherwise = (Zero (Tip nid (pred n) zeros) ones) + where + (ones, zeros) = split i bucket + +-- | Used in each query. +-- +-- TODO: Kademlia non-empty subtrees should should split if they have less than +-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia +-- paper. The rule requiring additional splits is in section 2.4. +modifyBucket + :: forall xs dht ip u. 
+ ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => + NodeId dht -> (Bucket dht ip u -> Maybe (xs, Bucket dht ip u)) -> Table dht ip u -> Maybe (xs,Table dht ip u) +modifyBucket nodeId f = go (0 :: BitIx) + where + go :: BitIx -> Table dht ip u -> Maybe (xs, Table dht ip u) + go !i (Zero table bucket) + | testIdBit nodeId i = second (Zero table) <$> f bucket + | otherwise = second (`Zero` bucket) <$> go (succ i) table + go !i (One bucket table ) + | testIdBit nodeId i = second (One bucket) <$> go (succ i) table + | otherwise = second (`One` table) <$> f bucket + go !i (Tip nid n bucket) + | n == 0 = second (Tip nid n) <$> f bucket + | otherwise = second (Tip nid n) <$> f bucket + <|> go i (splitTip nid n i bucket) + +-- | Triggering event for atomic table update +data Event dht ip u = TryInsert { foreignNode :: NodeInfo dht ip u } + | PingResult { foreignNode :: NodeInfo dht ip u + , ponged :: Bool + } +deriving instance Eq (NodeId dht) => Eq (Event dht ip u) +deriving instance ( Show ip + , Show (NodeId dht) + , Show u + ) => Show (Event dht ip u) + +eventId :: Event dht ip u -> NodeId dht +eventId (TryInsert NodeInfo{..}) = nodeId +eventId (PingResult NodeInfo{..} _) = nodeId + +-- | Actions requested by atomic table update +data CheckPing dht ip u = CheckPing [NodeInfo dht ip u] + +deriving instance Eq (NodeId dht) => Eq (CheckPing dht ip u) +deriving instance ( Show ip + , Show (NodeId dht) + , Show u + ) => Show (CheckPing dht ip u) + + +-- | Atomic 'Table' update +insert :: ( Eq ip + , Applicative m + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => Timestamp -> Event dht ip u -> Table dht ip u -> m ([CheckPing dht ip u], Table dht ip u) +insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) (insertBucket tm event) tbl + + +{----------------------------------------------------------------------- +-- Conversion +-----------------------------------------------------------------------} + +type TableEntry dht ip u = (NodeInfo dht ip u, Timestamp) + +tableEntry :: NodeEntry dht ip u -> TableEntry dht ip u +tableEntry (a :-> b) = (a, b) + +-- | Non-empty list of buckets. +toBucketList :: Table dht ip u -> [Bucket dht ip u] +toBucketList (Tip _ _ b) = [b] +toBucketList (Zero t b) = b : toBucketList t +toBucketList (One b t) = b : toBucketList t + +toList :: Eq ip => Table dht ip u -> [[TableEntry dht ip u]] +toList = L.map (L.map tableEntry . PSQ.toList . bktNodes) . toBucketList -- cgit v1.2.3
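+
+-- A lookup sketch (not part of the original patch): once the table is
+-- populated, the K closest known nodes to a target key are obtained with
+-- 'kclosest'.  Here @table@, @target@ and @sendFindNode@ are hypothetical
+-- caller-supplied values, with @target@ having a 'TableKey' instance:
+--
+-- > mapM_ sendFindNode (kclosest defaultK target table)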