From 8c33deac14ca92ef67afc7fbcd3f67bc19317f88 Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 8 Jun 2017 03:07:13 -0400 Subject: WIP: Adapting DHT to Tox network (part 6). --- src/Network/BitTorrent/DHT/Routing.hs | 268 ++++++++++++++++++---------------- 1 file changed, 141 insertions(+), 127 deletions(-) (limited to 'src/Network/BitTorrent/DHT/Routing.hs') diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 6cf7f122..42728a53 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs @@ -13,12 +13,14 @@ -- For more info see: -- -- +{-# LANGUAGE CPP #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE BangPatterns #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module Network.BitTorrent.DHT.Routing ( -- * Table @@ -59,8 +61,6 @@ module Network.BitTorrent.DHT.Routing -- * Routing , Timestamp - , Routing - , runRouting ) where import Control.Applicative as A @@ -83,10 +83,16 @@ import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) import qualified Data.ByteString as BS import Data.Bits -import Network.KRPC.Message (KMessageOf) import Data.Torrent import Network.BitTorrent.Address -import Network.DHT.Mainline +#ifdef VERSION_bencoding +import Network.DHT.Mainline () +import Network.KRPC.Message (KMessageOf) +#else +import Data.Tox as Tox +type KMessageOf = Tox.Message +#endif + {----------------------------------------------------------------------- -- Routing monad @@ -109,66 +115,6 @@ import Network.DHT.Mainline -- type Timestamp = POSIXTime --- | Some routing operations might need to perform additional IO. -data Routing ip result - = Full - | Done result - | GetTime ( Timestamp -> Routing ip result) - | NeedPing (NodeAddr ip) ( Bool -> Routing ip result) - | Refresh NodeId (Routing ip result) - -instance Functor (Routing ip) where - fmap _ Full = Full - fmap f (Done r) = Done ( f r) - fmap f (GetTime g) = GetTime (fmap f . g) - fmap f (NeedPing addr g) = NeedPing addr (fmap f . g) - fmap f (Refresh nid g) = Refresh nid (fmap f g) - -instance Monad (Routing ip) where - return = Done - - Full >>= _ = Full - Done r >>= m = m r - GetTime f >>= m = GetTime $ \ t -> f t >>= m - NeedPing a f >>= m = NeedPing a $ \ p -> f p >>= m - Refresh n f >>= m = Refresh n $ f >>= m - -instance Applicative (Routing ip) where - pure = return - (<*>) = ap - -instance Alternative (Routing ip) where - empty = Full - - Full <|> m = m - Done a <|> _ = Done a - GetTime f <|> m = GetTime $ \ t -> f t <|> m - NeedPing a f <|> m = NeedPing a $ \ p -> f p <|> m - Refresh n f <|> m = Refresh n (f <|> m) - --- | Run routing table operation. -runRouting :: Monad m - => (NodeAddr ip -> m Bool) -- ^ ping the specific node; - -> (NodeId -> m ()) -- ^ refresh nodes; - -> m Timestamp -- ^ get current time; - -> Routing ip f -- ^ operation to run; - -> m (Maybe f) -- ^ operation result; -runRouting ping_node find_nodes timestamper = go - where - go Full = return (Nothing) - go (Done r) = return (Just r) - go (GetTime f) = do - t <- timestamper - go (f t) - - go (NeedPing addr f) = do - pong <- ping_node addr - go (f pong) - - go (Refresh nid f) = do - find_nodes nid - go f - {----------------------------------------------------------------------- Bucket -----------------------------------------------------------------------} @@ -182,7 +128,7 @@ runRouting ping_node find_nodes timestamper = go -- other words: new nodes are used only when older nodes disappear. -- | Timestamp - last time this node is pinged. -type NodeEntry ip = Binding (NodeInfo KMessageOf ip ()) Timestamp +type NodeEntry dht ip u = Binding (NodeInfo dht ip u) Timestamp -- TODO instance Pretty where @@ -213,7 +159,7 @@ fromQ embed project QueueMethods{..} = } -} -seqQ :: QueueMethods Identity (NodeInfo KMessageOf ip ()) (Seq.Seq (NodeInfo KMessageOf ip ())) +seqQ :: QueueMethods Identity (NodeInfo dht ip u) (Seq.Seq (NodeInfo dht ip u)) seqQ = QueueMethods { pushBack = \e fifo -> pure (fifo Seq.|> e) , popFront = \fifo -> case Seq.viewl fifo of @@ -222,9 +168,9 @@ seqQ = QueueMethods , emptyQueue = pure Seq.empty } -type BucketQueue ip = Seq.Seq (NodeInfo KMessageOf ip ()) +type BucketQueue dht ip u = Seq.Seq (NodeInfo dht ip u) -bucketQ :: QueueMethods Identity (NodeInfo KMessageOf ip ()) (BucketQueue ip) +bucketQ :: QueueMethods Identity (NodeInfo dht ip u) (BucketQueue dht ip u) bucketQ = seqQ -- | Bucket is also limited in its length — thus it's called k-bucket. @@ -234,16 +180,45 @@ bucketQ = seqQ -- very unlikely that all nodes in bucket fail within an hour of -- each other. -- -data Bucket ip = Bucket { bktNodes :: !(PSQ (NodeInfo KMessageOf ip ()) Timestamp) - , bktQ :: !(BucketQueue ip) - } deriving (Show,Generic) - -instance (Eq ip, Serialize ip) => Serialize (Bucket ip) where - get = Bucket . psqFromPairList <$> get <*> pure (runIdentity $ emptyQueue bucketQ) - put = put . psqToPairList . bktNodes - +data Bucket dht ip u = Bucket { bktNodes :: !(PSQ (NodeInfo dht ip u) Timestamp) + , bktQ :: !(BucketQueue dht ip u) + } deriving Generic + +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Bucket dht ip u) + + +getGenericNode :: ( Serialize (NodeId dht) + , Serialize ip + , Serialize u + ) => Get (NodeInfo dht ip u) +getGenericNode = do + nid <- get + naddr <- get + u <- get + return NodeInfo + { nodeId = nid + , nodeAddr = naddr + , nodeAnnotation = u + } + +putGenericNode :: ( Serialize (NodeId dht) + , Serialize ip + , Serialize u + ) => NodeInfo dht ip u -> Put +putGenericNode (NodeInfo nid naddr u) = do + put nid + put naddr + put u + +instance (Eq ip, Ord (NodeId dht), Serialize (NodeId dht), Serialize ip, Serialize u) => Serialize (Bucket dht ip u) where + get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) + put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes + + +psqFromPairList :: (Ord p, Ord k) => [(k, p)] -> OrdPSQ k p () psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs +psqToPairList :: OrdPSQ t t1 () -> [(t, t1)] psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq -- | Update interval, in seconds. @@ -253,8 +228,8 @@ delta = 15 * 60 -- | Should maintain a set of stable long running nodes. -- -- Note: pings are triggerd only when a bucket is full. -insertBucket :: (Eq ip, Alternative f) => Timestamp -> Event ip -> Bucket ip - -> f ([CheckPing ip], Bucket ip) +insertBucket :: (Eq ip, Alternative f, Ord (NodeId dht)) => Timestamp -> Event dht ip u -> Bucket dht ip u + -> f ([CheckPing dht ip u], Bucket dht ip u) insertBucket curTime (TryInsert info) bucket -- just update timestamp if a node is already in bucket | already_have @@ -305,7 +280,9 @@ insertBucket curTime (PingResult bad_node got_response) bucket pure $ PSQ.insert info curTime nodes' | otherwise = id -updateStamps :: Eq ip => Timestamp -> [NodeInfo KMessageOf ip ()] -> PSQ (NodeInfo KMessageOf ip ()) Timestamp -> PSQ (NodeInfo KMessageOf ip ()) Timestamp +updateStamps :: ( Eq ip + , Ord (NodeId dht) + ) => Timestamp -> [NodeInfo dht ip u] -> PSQ (NodeInfo dht ip u) Timestamp -> PSQ (NodeInfo dht ip u) Timestamp updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales @@ -327,7 +304,11 @@ partitionQ imp test q0 = do select f = if test e then \(a,b) -> flip (,) b <$> f a else \(a,b) -> (,) a <$> f b -split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip) +split :: forall dht ip u. + ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => BitIx -> Bucket dht ip u -> (Bucket dht ip u, Bucket dht ip u) split i b = (Bucket ns qs, Bucket ms rs) where (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . key) . PSQ.toList $ bktNodes b @@ -337,7 +318,7 @@ split i b = (Bucket ns qs, Bucket ms rs) FiniteBits (Network.RPC.NodeId dht) => NodeInfo dht addr u -> Bool -} - spanBit :: NodeInfo KMessageOf addr () -> Bool + spanBit :: NodeInfo dht addr u -> Bool spanBit entry = testIdBit (nodeId entry) i {----------------------------------------------------------------------- @@ -350,12 +331,15 @@ type BucketCount = Int defaultBucketCount :: BucketCount defaultBucketCount = 20 -data Info ip = Info - { myBuckets :: Table ip - , myNodeId :: NodeId +data Info dht ip u = Info + { myBuckets :: Table dht ip u + , myNodeId :: NodeId dht , myAddress :: SockAddr } - deriving (Eq, Show, Generic) + deriving Generic + +deriving instance (Eq ip, Eq u, Eq (NodeId dht)) => Eq (Info dht ip u) +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Info dht ip u) -- instance (Eq ip, Serialize ip) => Serialize (Info ip) @@ -375,31 +359,33 @@ data Info ip = Info -- is always split into two new buckets covering the ranges @0..2 ^ -- 159@ and @2 ^ 159..2 ^ 160@. -- -data Table ip +data Table dht ip u -- most nearest bucket - = Tip NodeId BucketCount (Bucket ip) + = Tip (NodeId dht) BucketCount (Bucket dht ip u) -- left biased tree branch - | Zero (Table ip) (Bucket ip) + | Zero (Table dht ip u) (Bucket dht ip u) -- right biased tree branch - | One (Bucket ip) (Table ip) - deriving (Show, Generic) + | One (Bucket dht ip u) (Table dht ip u) + deriving Generic -instance Eq ip => Eq (Table ip) where +instance (Eq ip, Eq (NodeId dht)) => Eq (Table dht ip u) where (==) = (==) `on` Network.BitTorrent.DHT.Routing.toList instance Serialize NominalDiffTime where put = putWord32be . fromIntegral . fromEnum get = (toEnum . fromIntegral) <$> getWord32be +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Table dht ip u) + -- | Normally, routing table should be saved between invocations of -- the client software. Note that you don't need to store /this/ -- 'NodeId' since it is already included in routing table. -instance (Eq ip, Serialize ip) => Serialize (Table ip) +instance (Eq ip, Serialize ip, Ord (NodeId dht), Serialize (NodeId dht), Serialize u) => Serialize (Table dht ip u) -- | Shape of the table. -instance Pretty (Table ip) where +instance Pretty (Table dht ip u) where pPrint t | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss | otherwise = brackets $ @@ -410,26 +396,26 @@ instance Pretty (Table ip) where ss = shape t -- | Empty table with specified /spine/ node id. -nullTable :: Eq ip => NodeId -> BucketCount -> Table ip +nullTable :: Eq ip => NodeId dht -> BucketCount -> Table dht ip u nullTable nid n = Tip nid (bucketCount (pred n)) (Bucket PSQ.empty (runIdentity $ emptyQueue bucketQ)) where bucketCount x = max 0 (min 159 x) -- | Test if table is empty. In this case DHT should start -- bootstrapping process until table becomes 'full'. -null :: Table ip -> Bool +null :: Table dht ip u -> Bool null (Tip _ _ b) = PSQ.null $ bktNodes b null _ = False -- | Test if table have maximum number of nodes. No more nodes can be -- 'insert'ed, except old ones becomes bad. -full :: Table ip -> Bool +full :: Table dht ip u -> Bool full (Tip _ n _) = n == 0 full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t -- | Get the /spine/ node id. -thisId :: Table ip -> NodeId +thisId :: Table dht ip u -> NodeId dht thisId (Tip nid _ _) = nid thisId (Zero table _) = thisId table thisId (One _ table) = thisId table @@ -439,18 +425,19 @@ type NodeCount = Int -- | Internally, routing table is similar to list of buckets or a -- /matrix/ of nodes. This function returns the shape of the matrix. -shape :: Table ip -> [BucketSize] +shape :: Table dht ip u -> [BucketSize] shape = map (PSQ.size . bktNodes) . toBucketList -- | Get number of nodes in the table. -size :: Table ip -> NodeCount +size :: Table dht ip u -> NodeCount size = L.sum . shape -- | Get number of buckets in the table. -depth :: Table ip -> BucketCount +depth :: Table dht ip u -> BucketCount depth = L.length . shape -lookupBucket :: NodeId -> Table ip -> [Bucket ip] +lookupBucket :: ( FiniteBits (NodeId dht) + ) => NodeId dht -> Table dht ip u -> [Bucket dht ip u] lookupBucket nid = go 0 [] where go i bs (Zero table bucket) @@ -461,14 +448,18 @@ lookupBucket nid = go 0 [] | otherwise = bucket : toBucketList table ++ bs go _ bs (Tip _ _ bucket) = bucket : bs -compatibleNodeId :: Table ip -> IO NodeId +compatibleNodeId :: forall dht ip u. + ( Serialize (NodeId dht) + , FiniteBits (NodeId dht) + ) => Table dht ip u -> IO (NodeId dht) compatibleNodeId tbl = genBucketSample prefix br where br = bucketRange (L.length (shape tbl) - 1) True + nodeIdSize = finiteBitSize (undefined :: NodeId dht) `div` 8 bs = BS.pack $ take nodeIdSize $ tablePrefix tbl ++ repeat 0 prefix = either error id $ S.decode bs -tablePrefix :: Table ip -> [Word8] +tablePrefix :: Table dht ip u -> [Word8] tablePrefix = map (packByte . take 8 . (++repeat False)) . chunksOf 8 . tableBits @@ -477,7 +468,7 @@ tablePrefix = map (packByte . take 8 . (++repeat False)) bitmask ix True = bit ix bitmask _ _ = 0 -tableBits :: Table ip -> [Bool] +tableBits :: Table dht ip u -> [Bool] tableBits (One _ tbl) = True : tableBits tbl tableBits (Zero tbl _) = False : tableBits tbl tableBits (Tip _ _ _) = [] @@ -498,20 +489,23 @@ type K = Int defaultK :: K defaultK = 8 -class TableKey k where - toNodeId :: k -> NodeId +class TableKey dht k where + toNodeId :: k -> NodeId dht -instance TableKey NodeId where +instance TableKey dht (NodeId dht) where toNodeId = id -instance TableKey InfoHash where +instance TableKey KMessageOf InfoHash where toNodeId = either (error msg) id . S.decode . S.encode where -- TODO unsafe coerse? msg = "tableKey: impossible" -- | Get a list of /K/ closest nodes using XOR metric. Used in -- 'find_node' and 'get_peers' queries. -kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo KMessageOf ip ()] +kclosest :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => TableKey dht a => K -> a -> Table dht ip u -> [NodeInfo dht ip u] kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) ++ rank nodeId nid (L.concat everyone) where @@ -525,7 +519,10 @@ kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) -- Routing -----------------------------------------------------------------------} -splitTip :: Eq ip => NodeId -> BucketCount -> BitIx -> Bucket ip -> Table ip +splitTip :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => NodeId dht -> BucketCount -> BitIx -> Bucket dht ip u -> Table dht ip u splitTip nid n i bucket | testIdBit nid i = (One zeros (Tip nid (pred n) ones)) | otherwise = (Zero (Tip nid (pred n) zeros) ones) @@ -538,11 +535,15 @@ splitTip nid n i bucket -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia -- paper. The rule requiring additional splits is in section 2.4. modifyBucket - :: forall ip xs. (Eq ip) => - NodeId -> (Bucket ip -> Maybe (xs, Bucket ip)) -> Table ip -> Maybe (xs,Table ip) + :: forall xs dht ip u. + ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => + NodeId dht -> (Bucket dht ip u -> Maybe (xs, Bucket dht ip u)) -> Table dht ip u -> Maybe (xs,Table dht ip u) modifyBucket nodeId f = go (0 :: BitIx) where - go :: BitIx -> Table ip -> Maybe (xs, Table ip) + go :: BitIx -> Table dht ip u -> Maybe (xs, Table dht ip u) go !i (Zero table bucket) | testIdBit nodeId i = second (Zero table) <$> f bucket | otherwise = second (`Zero` bucket) <$> go (succ i) table @@ -555,23 +556,36 @@ modifyBucket nodeId f = go (0 :: BitIx) <|> go i (splitTip nid n i bucket) -- | Triggering event for atomic table update -data Event ip = TryInsert { foreignNode :: NodeInfo KMessageOf ip () } - | PingResult { foreignNode :: NodeInfo KMessageOf ip () - , ponged :: Bool - } - deriving (Eq,Show) -- Ord - -eventId :: Event ip -> NodeId +data Event dht ip u = TryInsert { foreignNode :: NodeInfo dht ip u } + | PingResult { foreignNode :: NodeInfo dht ip u + , ponged :: Bool + } +deriving instance Eq (NodeId dht) => Eq (Event dht ip u) +deriving instance ( Show ip + , Show (NodeId dht) + , Show u + ) => Show (Event dht ip u) + +eventId :: Event dht ip u -> NodeId dht eventId (TryInsert NodeInfo{..}) = nodeId eventId (PingResult NodeInfo{..} _) = nodeId -- | Actions requested by atomic table update -data CheckPing ip = CheckPing [NodeInfo KMessageOf ip ()] - deriving (Eq,Show) -- Ord +data CheckPing dht ip u = CheckPing [NodeInfo dht ip u] + +deriving instance Eq (NodeId dht) => Eq (CheckPing dht ip u) +deriving instance ( Show ip + , Show (NodeId dht) + , Show u + ) => Show (CheckPing dht ip u) -- | Atomic 'Table' update -insert :: (Eq ip, Applicative m) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) +insert :: ( Eq ip + , Applicative m + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => Timestamp -> Event dht ip u -> Table dht ip u -> m ([CheckPing dht ip u], Table dht ip u) insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) (insertBucket tm event) tbl @@ -579,16 +593,16 @@ insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) ( -- Conversion -----------------------------------------------------------------------} -type TableEntry ip = (NodeInfo KMessageOf ip (), Timestamp) +type TableEntry dht ip u = (NodeInfo dht ip u, Timestamp) -tableEntry :: NodeEntry ip -> TableEntry ip +tableEntry :: NodeEntry dht ip u -> TableEntry dht ip u tableEntry (a :-> b) = (a, b) -- | Non-empty list of buckets. -toBucketList :: Table ip -> [Bucket ip] +toBucketList :: Table dht ip u -> [Bucket dht ip u] toBucketList (Tip _ _ b) = [b] toBucketList (Zero t b) = b : toBucketList t toBucketList (One b t) = b : toBucketList t -toList :: Eq ip => Table ip -> [[TableEntry ip]] +toList :: Eq ip => Table dht ip u -> [[TableEntry dht ip u]] toList = L.map (L.map tableEntry . PSQ.toList . bktNodes) . toBucketList -- cgit v1.2.3