From 65f9152b7be0dc86a09870114c9e33ff4642f918 Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 15 Sep 2017 17:45:57 -0400 Subject: Moved Network.DHT.Routing -> Network.Kademlia.Routing --- bittorrent/bittorrent.cabal | 4 +- dht-client.cabal | 4 +- examples/dhtd.hs | 2 +- src/Data/Torrent.hs | 2 +- src/Network/BitTorrent/MainlineDHT.hs | 4 +- src/Network/DHT/Routing.hs | 798 ---------------------------------- src/Network/Kademlia.hs | 2 +- src/Network/Kademlia/Routing.hs | 798 ++++++++++++++++++++++++++++++++++ src/Network/Kademlia/Search.hs | 2 +- src/Network/Tox.hs | 2 +- src/Network/Tox/DHT/Handlers.hs | 2 +- src/Network/Tox/Onion/Handlers.hs | 2 +- 12 files changed, 811 insertions(+), 811 deletions(-) delete mode 100644 src/Network/DHT/Routing.hs create mode 100644 src/Network/Kademlia/Routing.hs diff --git a/bittorrent/bittorrent.cabal b/bittorrent/bittorrent.cabal index 555143f7..bb34e0ec 100644 --- a/bittorrent/bittorrent.cabal +++ b/bittorrent/bittorrent.cabal @@ -81,7 +81,7 @@ library Data.Wrapper.PSQInt Data.MinMaxPSQ Network.Address - Network.DHT.Routing + Network.Kademlia.Routing Data.Torrent Network.BitTorrent.DHT.ContactInfo Network.BitTorrent.DHT.Token @@ -243,7 +243,7 @@ test-suite spec Network.BitTorrent.DHT.TestData Network.BitTorrent.DHT.MessageSpec Network.BitTorrent.DHT.QuerySpec - Network.DHT.RoutingSpec + Network.Kademlia.RoutingSpec Network.BitTorrent.DHT.SessionSpec Network.BitTorrent.DHT.TokenSpec Network.BitTorrent.Internal.CacheSpec diff --git a/dht-client.cabal b/dht-client.cabal index 12f5cddd..afd3f8da 100644 --- a/dht-client.cabal +++ b/dht-client.cabal @@ -65,7 +65,7 @@ library Data.Wrapper.PSQInt Data.MinMaxPSQ Network.Address - Network.DHT.Routing + Network.Kademlia.Routing Data.Torrent Network.BitTorrent.DHT.ContactInfo Network.BitTorrent.DHT.Token @@ -138,7 +138,7 @@ library Build-depends: network >= 2.4 && < 2.6 - other-modules: Paths_bittorrent + other-modules: Paths_dht_client Crypto.Cipher.Salsa Crypto.Cipher.XSalsa Crypto.ECC.Class diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 5c002363..6c655458 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -52,7 +52,7 @@ import Network.StreamServer import Network.Kademlia import qualified Network.BitTorrent.MainlineDHT as Mainline import qualified Network.Tox as Tox -import Network.DHT.Routing as R +import Network.Kademlia.Routing as R import Data.Aeson as J (ToJSON, FromJSON) import qualified Data.Aeson as J import qualified Data.ByteString.Lazy as L diff --git a/src/Data/Torrent.hs b/src/Data/Torrent.hs index 55b34f98..4af583ed 100644 --- a/src/Data/Torrent.hs +++ b/src/Data/Torrent.hs @@ -195,7 +195,7 @@ import System.FilePath import System.Posix.Types import Network.Address -import Network.DHT.Routing +import Network.Kademlia.Routing {----------------------------------------------------------------------- diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index e7d702c3..4566471a 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs @@ -60,8 +60,8 @@ import Network.Address (Address, fromAddr, fromSockAddr, import Network.BitTorrent.DHT.ContactInfo as Peers import Network.Kademlia.Search (Search (..)) import Network.BitTorrent.DHT.Token as Token -import qualified Network.DHT.Routing as R - ;import Network.DHT.Routing (Timestamp, getTimestamp) +import qualified Network.Kademlia.Routing as R + ;import Network.Kademlia.Routing (Timestamp, getTimestamp) import Network.QueryResponse import Network.Socket import System.IO diff --git a/src/Network/DHT/Routing.hs b/src/Network/DHT/Routing.hs deleted file mode 100644 index 11dbd11e..00000000 --- a/src/Network/DHT/Routing.hs +++ /dev/null @@ -1,798 +0,0 @@ --- | --- Copyright : (c) Sam Truzjan 2013 --- License : BSD3 --- Maintainer : pxqr.sta@gmail.com --- Stability : experimental --- Portability : portable --- --- Every node maintains a routing table of known good nodes. The --- nodes in the routing table are used as starting points for --- queries in the DHT. Nodes from the routing table are returned in --- response to queries from other nodes. --- --- For more info see: --- --- -{-# LANGUAGE CPP #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE ViewPatterns #-} -{-# LANGUAGE TypeOperators #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE DeriveFunctor #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TupleSections #-} -{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} -{-# OPTIONS_GHC -fno-warn-orphans #-} -module Network.DHT.Routing - {- - ( -- * BucketList - BucketList - , Info(..) - - -- * Attributes - , BucketCount - , defaultBucketCount - , BucketSize - , defaultBucketSize - , NodeCount - - -- * Query - , Network.DHT.Routing.null - , Network.DHT.Routing.full - , thisId - , shape - , Network.DHT.Routing.size - , Network.DHT.Routing.depth - , compatibleNodeId - - -- * Lookup - , K - , defaultK - , TableKey (..) - , kclosest - - -- * Construction - , Network.DHT.Routing.nullTable - , Event(..) - , CheckPing(..) - , Network.DHT.Routing.insert - - -- * Conversion - , Network.DHT.Routing.TableEntry - , Network.DHT.Routing.toList - - -- * Routing - , Timestamp - , getTimestamp - ) -} where - -import Control.Applicative as A -import Control.Arrow -import Control.Monad -import Data.Function -import Data.Functor.Identity -import Data.List as L hiding (insert) -import Data.Maybe -import Data.Monoid -import Data.Wrapper.PSQ as PSQ -import Data.Serialize as S hiding (Result, Done) -import qualified Data.Sequence as Seq -import Data.Time -import Data.Time.Clock.POSIX -import Data.Word -import GHC.Generics -import Text.PrettyPrint as PP hiding ((<>)) -import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) -import qualified Data.ByteString as BS -import Data.Bits -import Data.Ord -import Data.Reflection -import Network.Address -import Data.Typeable -import Data.Coerce -import Data.Hashable - --- | Last time the node was responding to our queries. --- --- Not all nodes that we learn about are equal. Some are \"good\" and --- some are not. Many nodes using the DHT are able to send queries --- and receive responses, but are not able to respond to queries --- from other nodes. It is important that each node's routing table --- must contain only known good nodes. A good node is a node has --- responded to one of our queries within the last 15 minutes. A --- node is also good if it has ever responded to one of our queries --- and has sent us a query within the last 15 minutes. After 15 --- minutes of inactivity, a node becomes questionable. Nodes become --- bad when they fail to respond to multiple queries in a row. Nodes --- that we know are good are given priority over nodes with unknown --- status. --- -type Timestamp = POSIXTime - -getTimestamp :: IO Timestamp -getTimestamp = do - utcTime <- getCurrentTime - return $ utcTimeToPOSIXSeconds utcTime - - - -{----------------------------------------------------------------------- - Bucket ------------------------------------------------------------------------} --- --- When a k-bucket is full and a new node is discovered for that --- k-bucket, the least recently seen node in the k-bucket is --- PINGed. If the node is found to be still alive, the new node is --- place in a secondary list, a replacement cache. The replacement --- cache is used only if a node in the k-bucket stops responding. In --- other words: new nodes are used only when older nodes disappear. - --- | Timestamp - last time this node is pinged. -type NodeEntry ni = Binding ni Timestamp - - --- | Maximum number of 'NodeInfo's stored in a bucket. Most clients --- use this value. -defaultBucketSize :: Int -defaultBucketSize = 8 - -data QueueMethods m elem fifo = QueueMethods - { pushBack :: elem -> fifo -> m fifo - , popFront :: fifo -> m (Maybe elem, fifo) - , emptyQueue :: m fifo - } - -{- -fromQ :: Functor m => - ( a -> b ) - -> ( b -> a ) - -> QueueMethods m elem a - -> QueueMethods m elem b -fromQ embed project QueueMethods{..} = - QueueMethods { pushBack = \e -> fmap embed . pushBack e . project - , popFront = fmap (second embed) . popFront . project - , emptyQueue = fmap embed emptyQueue - } --} - -seqQ :: QueueMethods Identity ni (Seq.Seq ni) -seqQ = QueueMethods - { pushBack = \e fifo -> pure (fifo Seq.|> e) - , popFront = \fifo -> case Seq.viewl fifo of - e Seq.:< fifo' -> pure (Just e, fifo') - Seq.EmptyL -> pure (Nothing, Seq.empty) - , emptyQueue = pure Seq.empty - } - -type BucketQueue ni = Seq.Seq ni - -bucketQ :: QueueMethods Identity ni (BucketQueue ni) -bucketQ = seqQ - - -data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int) - -contramapC :: (b -> a) -> Compare a -> Compare b -contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b)) - (\s x -> hsh s (f x)) - -newtype Ordered' s a = Ordered a - deriving (Show) - --- | Hack to avoid UndecidableInstances -newtype Shrink a = Shrink a - deriving (Show) - -type Ordered s a = Ordered' s (Shrink a) - -instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where - a == b = (compare a b == EQ) - -instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where - compare a b = cmp (coerce a) (coerce b) - where Compare cmp _ = reflect (Proxy :: Proxy s) - -instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where - hashWithSalt salt x = hash salt (coerce x) - where Compare _ hash = reflect (Proxy :: Proxy s) - --- | Bucket is also limited in its length — thus it's called k-bucket. --- When bucket becomes full, we should split it in two lists by --- current span bit. Span bit is defined by depth in the routing --- table tree. Size of the bucket should be choosen such that it's --- very unlikely that all nodes in bucket fail within an hour of --- each other. -data Bucket s ni = Bucket - { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes - , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs - } deriving (Generic) - -#define CAN_SHOW_BUCKET 0 - -#if CAN_SHOW_BUCKET -deriving instance Show ni => Show (Bucket s ni) -#endif - -bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni -bucketCompare _ = reflect (Proxy :: Proxy s) - -mapBucket :: ( Reifies s (Compare a) - , Reifies t (Compare ni) - ) => (a -> ni) -> Bucket s a -> Bucket t ni -mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns) - (fmap (second f) q) - where f' = coerce . f . coerce - - -#if 0 - -{- -getGenericNode :: ( Serialize (NodeId) - , Serialize ip - , Serialize u - ) => Get (NodeInfo) -getGenericNode = do - nid <- get - naddr <- get - u <- get - return NodeInfo - { nodeId = nid - , nodeAddr = naddr - , nodeAnnotation = u - } - -putGenericNode :: ( Serialize (NodeId) - , Serialize ip - , Serialize u - ) => NodeInfo -> Put -putGenericNode (NodeInfo nid naddr u) = do - put nid - put naddr - put u - -instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where - get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) - put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes --} - -#endif - -psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p -psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs - -psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)] -psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq - --- | Update interval, in seconds. -delta :: NominalDiffTime -delta = 15 * 60 - --- | Should maintain a set of stable long running nodes. --- --- Note: pings are triggerd only when a bucket is full. -updateBucketForInbound curTime info bucket - -- Just update timestamp if a node is already in bucket. - -- - -- Note PingResult events should only occur for nodes we requested a ping for, - -- and those will always already be in the routing queue and will get their - -- timestamp updated here, since 'TryInsert' is called on every inbound packet, - -- including ping results. - | already_have - = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime ) - -- bucket is good, but not full => we can insert a new node - | PSQ.size (bktNodes bucket) < defaultBucketSize - = pure ( [], map_ns $ PSQ.insert (coerce info) curTime ) - -- If there are any questionable nodes in the bucket have not been - -- seen in the last 15 minutes, the least recently seen node is - -- pinged. If any nodes in the bucket are known to have become bad, - -- then one is replaced by the new node in the next insertBucket - -- iteration. - | not (L.null stales) - = pure ( stales - , bucket { -- Update timestamps so that we don't redundantly ping. - bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket - -- Update queue with the pending NodeInfo in case of ping fail. - , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } ) - -- When the bucket is full of good nodes, the new node is simply discarded. - -- We must return 'A.empty' here to ensure that bucket splitting happens - -- inside 'modifyBucket'. - | otherwise = A.empty - where - -- We (take 1) to keep a 1-to-1 correspondence between pending pings and - -- waiting nodes in the bktQ. This way, we don't have to worry about what - -- to do with failed pings for which there is no ready replacements. - stales = -- One stale: - do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket) - guard (t < curTime - delta) - return $ coerce n - -- All stale: - -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket - - already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket) - - map_ns f = bucket { bktNodes = f (bktNodes bucket) } - -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) } - -updateBucketForPingResult bad_node got_response bucket - = pure ( map (,Nothing) forgotten - ++ map (second Just) replacements - , Bucket (foldr replace - (bktNodes bucket) - replacements) - popped - ) - where - (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) - - -- Dropped from accepted, replaced by pending. - replacements | got_response = [] -- Timestamp was already updated by TryInsert. - | Just info <- top = do - -- Insert only if there's a removal. - _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket) - return (bad_node, info) - | otherwise = [] - - -- Dropped from the pending queue without replacing. - forgotten | got_response = maybeToList $ fmap snd top - | otherwise = [] - - - replace (bad_node, (tm, info)) = - PSQ.insert (coerce info) tm - . PSQ.delete (coerce bad_node) - - -updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp -updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales - -type BitIx = Word - -partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b) -partitionQ imp test q0 = do - pass0 <- emptyQueue imp - fail0 <- emptyQueue imp - let flipfix a b f = fix f a b - flipfix q0 (pass0,fail0) $ \rec q qs -> do - (mb,q') <- popFront imp q - case mb of - Nothing -> return qs - Just e -> do qs' <- select (pushBack imp e) qs - rec q' qs' - where - select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) - select f = if test e then \(a,b) -> flip (,) b <$> f a - else \(a,b) -> (,) a <$> f b - - - -split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => - forall ni s. ( Reifies s (Compare ni) ) => - (ni -> Word -> Bool) - -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni) -split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs) - where - (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b - (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b - - spanBit :: ni -> Bool - spanBit entry = testNodeIdBit entry i - - -{----------------------------------------------------------------------- --- BucketList ------------------------------------------------------------------------} - -defaultBucketCount :: Int -defaultBucketCount = 20 - -defaultMaxBucketCount :: Word -defaultMaxBucketCount = 24 - -data Info ni nid = Info - { myBuckets :: BucketList ni - , myNodeId :: nid - , myAddress :: SockAddr - } - deriving Generic - -deriving instance (Eq ni, Eq nid) => Eq (Info ni nid) -deriving instance (Show ni, Show nid) => Show (Info ni nid) - --- instance (Eq ip, Serialize ip) => Serialize (Info ip) - --- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ --- 160. The routing table is subdivided into 'Bucket's that each cover --- a portion of the space. An empty table has one bucket with an ID --- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" --- is inserted into the table, it is placed within the bucket that has --- @min <= N < max@. An empty table has only one bucket so any node --- must fit within it. Each bucket can only hold 'K' nodes, currently --- eight, before becoming 'Full'. When a bucket is full of known good --- nodes, no more nodes may be added unless our own 'NodeId' falls --- within the range of the 'Bucket'. In that case, the bucket is --- replaced by two new buckets each with half the range of the old --- bucket and the nodes from the old bucket are distributed among the --- two new ones. For a new table with only one bucket, the full bucket --- is always split into two new buckets covering the ranges @0..2 ^ --- 159@ and @2 ^ 159..2 ^ 160@. --- -data BucketList ni = forall s. Reifies s (Compare ni) => - BucketList { thisNode :: !ni - -- | Non-empty list of buckets. - , buckets :: [Bucket s ni] - } - -mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b -mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts) - $ \p -> BucketList - { thisNode = f self - , buckets = map (resolve p . mapBucket f) bkts - } - where - resolve :: Proxy s -> Bucket s ni -> Bucket s ni - resolve = const id - -instance (Eq ni) => Eq (BucketList ni) where - (==) = (==) `on` Network.DHT.Routing.toList - -#if 0 - -instance Serialize NominalDiffTime where - put = putWord32be . fromIntegral . fromEnum - get = (toEnum . fromIntegral) <$> getWord32be - -#endif - -#if CAN_SHOW_BUCKET -deriving instance (Show ni) => Show (BucketList ni) -#else -instance Show ni => Show (BucketList ni) where - showsPrec d (BucketList self bkts) = - mappend "BucketList " - . showsPrec (d+1) self - . mappend " (fromList " - . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts) - . mappend ") " -#endif - -#if 0 - --- | Normally, routing table should be saved between invocations of --- the client software. Note that you don't need to store /this/ --- 'NodeId' since it is already included in routing table. -instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList) - -#endif - --- | Shape of the table. -instance Pretty (BucketList ni) where - pPrint t - | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss - | otherwise = brackets $ - PP.int (L.sum ss) <> " nodes, " <> - PP.int bucketCount <> " buckets" - where - bucketCount = L.length ss - ss = shape t - --- | Empty table with specified /spine/ node id. --- --- XXX: The comparison function argument is awkward here. -nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni -nullTable cmp hsh ni n = - reify (Compare cmp hsh) - $ \p -> BucketList - ni - [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)] - where - empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp - empty = const $ PSQ.empty - -#if 0 - --- | Test if table is empty. In this case DHT should start --- bootstrapping process until table becomes 'full'. -null :: BucketList -> Bool -null (Tip _ _ b) = PSQ.null $ bktNodes b -null _ = False - --- | Test if table have maximum number of nodes. No more nodes can be --- 'insert'ed, except old ones becomes bad. -full :: BucketList -> Bool -full (Tip _ n _) = n == 0 -full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t -full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t - --- | Get the /spine/ node id. -thisId :: BucketList -> NodeId -thisId (Tip nid _ _) = nid -thisId (Zero table _) = thisId table -thisId (One _ table) = thisId table - --- | Number of nodes in a bucket or a table. -type NodeCount = Int - -#endif - --- | Internally, routing table is similar to list of buckets or a --- /matrix/ of nodes. This function returns the shape of the matrix. -shape :: BucketList ni -> [Int] -shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl - -#if 0 - --- | Get number of nodes in the table. -size :: BucketList -> NodeCount -size = L.sum . shape - --- | Get number of buckets in the table. -depth :: BucketList -> BucketCount -depth = L.length . shape - -#endif - -lookupBucket :: forall ni nid x. - ( -- FiniteBits nid - Ord nid - ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x -lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts - where - d = kademliaXor space nid (kademliaLocation space self) - - go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] - go i bs (bucket : buckets) - | kademliaTestBit space d i = bucket : buckets ++ bs - | otherwise = go (succ i) (bucket:bs) buckets - go _ bs [] = bs - -bucketNumber :: forall ni nid. - KademliaSpace nid ni -> nid -> BucketList ni -> Int -bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts - where - d = kademliaXor space nid (kademliaLocation space self) - - go :: Word -> [Bucket s ni] -> Word - go i (bucket : buckets) - | kademliaTestBit space d i = i - | otherwise = go (succ i) buckets - go i [] = i - - -compatibleNodeId :: forall ni nid. - ( Serialize nid, FiniteBits nid) => - (ni -> nid) -> BucketList ni -> IO nid -compatibleNodeId nodeId tbl = genBucketSample prefix br - where - br = bucketRange (L.length (shape tbl) - 1) True - nodeIdSize = finiteBitSize (undefined :: nid) `div` 8 - bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0 - prefix = either error id $ S.decode bs - -tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8] -tablePrefix testbit = map (packByte . take 8 . (++repeat False)) - . chunksOf 8 - . tableBits testbit - where - packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0] - bitmask ix True = bit ix - bitmask _ _ = 0 - -tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool] -tableBits testbit (BucketList self bkts) = - zipWith const (map (testbit self) [0..]) - bkts - -selfNode :: BucketList ni -> ni -selfNode (BucketList self _) = self - -chunksOf :: Int -> [e] -> [[e]] -chunksOf i ls = map (take i) (build (splitter ls)) where - splitter :: [e] -> ([e] -> a -> a) -> a -> a - splitter [] _ n = n - splitter l c n = l `c` splitter (drop i l) c n - -build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a] -build g = g (:) [] - - - --- | Count of closest nodes in find_node reply. -type K = Int - --- | Default 'K' is equal to 'defaultBucketSize'. -defaultK :: K -defaultK = 8 - -#if 0 -class TableKey dht k where - toNodeId :: k -> NodeId - -instance TableKey dht (NodeId) where - toNodeId = id - -#endif - --- | In Kademlia, the distance metric is XOR and the result is --- interpreted as an unsigned integer. -newtype NodeDistance nodeid = NodeDistance nodeid - deriving (Eq, Ord) - --- | distance(A,B) = |A xor B| Smaller values are closer. -distance :: Bits nid => nid -> nid -> NodeDistance nid -distance a b = NodeDistance $ xor a b - --- | Order by closeness: nearest nodes first. -rank :: ( Ord nid - ) => KademliaSpace nid ni -> nid -> [ni] -> [ni] -rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space)) - - --- | Get a list of /K/ closest nodes using XOR metric. Used in --- 'find_node' and 'get_peers' queries. -kclosest :: ( -- FiniteBits nid - Ord nid - ) => - KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni] -kclosest space k nid tbl = take k $ rank space nid (L.concat bucket) - ++ rank space nid (L.concat everyone) - where - (bucket,everyone) = - L.splitAt 1 - . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) - $ tbl - - - -{----------------------------------------------------------------------- --- Routing ------------------------------------------------------------------------} - -splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => - ( Reifies s (Compare ni) ) => - (ni -> Word -> Bool) - -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ] -splitTip testNodeBit ni i bucket - | testNodeBit ni i = [zeros , ones ] - | otherwise = [ones , zeros ] - where - (ones, zeros) = split testNodeBit i bucket - --- | Used in each query. --- --- TODO: Kademlia non-empty subtrees should should split if they have less than --- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia --- paper. The rule requiring additional splits is in section 2.4. -modifyBucket - :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => - forall ni nid xs. - KademliaSpace nid ni - -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni) -modifyBucket space nid f (BucketList self bkts) - = second (BucketList self) <$> go (0 :: BitIx) bkts - where - d = kademliaXor space nid (kademliaLocation space self) - - -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni]) - - go !i (bucket : buckets@(_:_)) - | kademliaTestBit space d i = second (: buckets) <$> f bucket - | otherwise = second (bucket :) <$> go (succ i) buckets - - go !i [bucket] = second (: []) <$> f bucket <|> gosplit - where - gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space - . kademliaLocation space ) - self - i - bucket) - | otherwise = Nothing -- Limit the number of buckets. - - -bktCount :: BucketList ni -> Int -bktCount (BucketList _ bkts) = L.length bkts - --- | Triggering event for atomic table update -data Event ni = TryInsert { foreignNode :: ni } - | PingResult { foreignNode :: ni , ponged :: Bool } - -#if 0 -deriving instance Eq (NodeId) => Eq (Event) -deriving instance ( Show ip - , Show (NodeId) - , Show u - ) => Show (Event) - -#endif - -eventId :: (ni -> nid) -> Event ni -> nid -eventId nodeId (TryInsert ni) = nodeId ni -eventId nodeId (PingResult ni _) = nodeId ni - - --- | Actions requested by atomic table update -data CheckPing ni = CheckPing [ni] - -#if 0 - -deriving instance Eq (NodeId) => Eq (CheckPing) -deriving instance ( Show ip - , Show (NodeId) - , Show u - ) => Show (CheckPing) - -#endif - - --- | Call on every inbound packet (including requested ping results). --- Returns a triple (was_inserted, to_ping, tbl') where --- --- [ /was_inserted/ ] True if the node was added to the routing table. --- --- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'. --- This will be empty if /was_inserted/, but a non-inserted node --- may be added to a replacement queue and will be inserted if --- one of the items in this list time out. --- --- [ /tbl'/ ] The updated routing 'BucketList'. --- -updateForInbound :: - KademliaSpace nid ni - -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni) -updateForInbound space tm ni tbl@(BucketList _ bkts) = - maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl')) - $ modifyBucket space - (kademliaLocation space ni) - (updateBucketForInbound tm ni) - tbl - --- | Update the routing table with the results of a ping. --- --- Each (a,(tm,b)) in the returned list indicates that the node /a/ was deleted from the --- routing table and the node /b/, with timestamp /tm/, has taken its place. -updateForPingResult :: - KademliaSpace nid ni - -> ni -- ^ The pinged node. - -> Bool -- ^ True if we got a reply, False if it timed out. - -> BucketList ni -- ^ The routing table. - -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni ) -updateForPingResult space ni got_reply tbl = - fromMaybe ([],tbl) - $ modifyBucket space - (kademliaLocation space ni) - (updateBucketForPingResult ni got_reply) - tbl - - -{----------------------------------------------------------------------- --- Conversion ------------------------------------------------------------------------} - -type TableEntry ni = (ni, Timestamp) - -tableEntry :: NodeEntry ni -> TableEntry ni -tableEntry (a :-> b) = (a, b) - -toList :: BucketList ni -> [[TableEntry ni]] -toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts - -data KademliaSpace nid ni = KademliaSpace - { -- | Given a node record (probably including IP address), yields a - -- kademlia xor-metric location. - kademliaLocation :: ni -> nid - -- | Used when comparing locations. This is similar to - -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so - -- that 0 is the most significant bit. - , kademliaTestBit :: nid -> Word -> Bool - -- | The Kademlia xor-metric. - , kademliaXor :: nid -> nid -> nid - - , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid - } - -contramapKS f ks = ks - { kademliaLocation = kademliaLocation ks . f - } - diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs index 5fb1e334..873fc8c0 100644 --- a/src/Network/Kademlia.hs +++ b/src/Network/Kademlia.hs @@ -11,7 +11,7 @@ import Data.Maybe import qualified Data.Set as Set import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds) -import Network.DHT.Routing as R +import Network.Kademlia.Routing as R #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else diff --git a/src/Network/Kademlia/Routing.hs b/src/Network/Kademlia/Routing.hs new file mode 100644 index 00000000..7f76ac77 --- /dev/null +++ b/src/Network/Kademlia/Routing.hs @@ -0,0 +1,798 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Every node maintains a routing table of known good nodes. The +-- nodes in the routing table are used as starting points for +-- queries in the DHT. Nodes from the routing table are returned in +-- response to queries from other nodes. +-- +-- For more info see: +-- +-- +{-# LANGUAGE CPP #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} +module Network.Kademlia.Routing + {- + ( -- * BucketList + BucketList + , Info(..) + + -- * Attributes + , BucketCount + , defaultBucketCount + , BucketSize + , defaultBucketSize + , NodeCount + + -- * Query + , Network.Kademlia.Routing.null + , Network.Kademlia.Routing.full + , thisId + , shape + , Network.Kademlia.Routing.size + , Network.Kademlia.Routing.depth + , compatibleNodeId + + -- * Lookup + , K + , defaultK + , TableKey (..) + , kclosest + + -- * Construction + , Network.Kademlia.Routing.nullTable + , Event(..) + , CheckPing(..) + , Network.Kademlia.Routing.insert + + -- * Conversion + , Network.Kademlia.Routing.TableEntry + , Network.Kademlia.Routing.toList + + -- * Routing + , Timestamp + , getTimestamp + ) -} where + +import Control.Applicative as A +import Control.Arrow +import Control.Monad +import Data.Function +import Data.Functor.Identity +import Data.List as L hiding (insert) +import Data.Maybe +import Data.Monoid +import Data.Wrapper.PSQ as PSQ +import Data.Serialize as S hiding (Result, Done) +import qualified Data.Sequence as Seq +import Data.Time +import Data.Time.Clock.POSIX +import Data.Word +import GHC.Generics +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) +import qualified Data.ByteString as BS +import Data.Bits +import Data.Ord +import Data.Reflection +import Network.Address +import Data.Typeable +import Data.Coerce +import Data.Hashable + +-- | Last time the node was responding to our queries. +-- +-- Not all nodes that we learn about are equal. Some are \"good\" and +-- some are not. Many nodes using the DHT are able to send queries +-- and receive responses, but are not able to respond to queries +-- from other nodes. It is important that each node's routing table +-- must contain only known good nodes. A good node is a node has +-- responded to one of our queries within the last 15 minutes. A +-- node is also good if it has ever responded to one of our queries +-- and has sent us a query within the last 15 minutes. After 15 +-- minutes of inactivity, a node becomes questionable. Nodes become +-- bad when they fail to respond to multiple queries in a row. Nodes +-- that we know are good are given priority over nodes with unknown +-- status. +-- +type Timestamp = POSIXTime + +getTimestamp :: IO Timestamp +getTimestamp = do + utcTime <- getCurrentTime + return $ utcTimeToPOSIXSeconds utcTime + + + +{----------------------------------------------------------------------- + Bucket +-----------------------------------------------------------------------} +-- +-- When a k-bucket is full and a new node is discovered for that +-- k-bucket, the least recently seen node in the k-bucket is +-- PINGed. If the node is found to be still alive, the new node is +-- place in a secondary list, a replacement cache. The replacement +-- cache is used only if a node in the k-bucket stops responding. In +-- other words: new nodes are used only when older nodes disappear. + +-- | Timestamp - last time this node is pinged. +type NodeEntry ni = Binding ni Timestamp + + +-- | Maximum number of 'NodeInfo's stored in a bucket. Most clients +-- use this value. +defaultBucketSize :: Int +defaultBucketSize = 8 + +data QueueMethods m elem fifo = QueueMethods + { pushBack :: elem -> fifo -> m fifo + , popFront :: fifo -> m (Maybe elem, fifo) + , emptyQueue :: m fifo + } + +{- +fromQ :: Functor m => + ( a -> b ) + -> ( b -> a ) + -> QueueMethods m elem a + -> QueueMethods m elem b +fromQ embed project QueueMethods{..} = + QueueMethods { pushBack = \e -> fmap embed . pushBack e . project + , popFront = fmap (second embed) . popFront . project + , emptyQueue = fmap embed emptyQueue + } +-} + +seqQ :: QueueMethods Identity ni (Seq.Seq ni) +seqQ = QueueMethods + { pushBack = \e fifo -> pure (fifo Seq.|> e) + , popFront = \fifo -> case Seq.viewl fifo of + e Seq.:< fifo' -> pure (Just e, fifo') + Seq.EmptyL -> pure (Nothing, Seq.empty) + , emptyQueue = pure Seq.empty + } + +type BucketQueue ni = Seq.Seq ni + +bucketQ :: QueueMethods Identity ni (BucketQueue ni) +bucketQ = seqQ + + +data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int) + +contramapC :: (b -> a) -> Compare a -> Compare b +contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b)) + (\s x -> hsh s (f x)) + +newtype Ordered' s a = Ordered a + deriving (Show) + +-- | Hack to avoid UndecidableInstances +newtype Shrink a = Shrink a + deriving (Show) + +type Ordered s a = Ordered' s (Shrink a) + +instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where + a == b = (compare a b == EQ) + +instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where + compare a b = cmp (coerce a) (coerce b) + where Compare cmp _ = reflect (Proxy :: Proxy s) + +instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where + hashWithSalt salt x = hash salt (coerce x) + where Compare _ hash = reflect (Proxy :: Proxy s) + +-- | Bucket is also limited in its length — thus it's called k-bucket. +-- When bucket becomes full, we should split it in two lists by +-- current span bit. Span bit is defined by depth in the routing +-- table tree. Size of the bucket should be choosen such that it's +-- very unlikely that all nodes in bucket fail within an hour of +-- each other. +data Bucket s ni = Bucket + { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes + , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs + } deriving (Generic) + +#define CAN_SHOW_BUCKET 0 + +#if CAN_SHOW_BUCKET +deriving instance Show ni => Show (Bucket s ni) +#endif + +bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni +bucketCompare _ = reflect (Proxy :: Proxy s) + +mapBucket :: ( Reifies s (Compare a) + , Reifies t (Compare ni) + ) => (a -> ni) -> Bucket s a -> Bucket t ni +mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns) + (fmap (second f) q) + where f' = coerce . f . coerce + + +#if 0 + +{- +getGenericNode :: ( Serialize (NodeId) + , Serialize ip + , Serialize u + ) => Get (NodeInfo) +getGenericNode = do + nid <- get + naddr <- get + u <- get + return NodeInfo + { nodeId = nid + , nodeAddr = naddr + , nodeAnnotation = u + } + +putGenericNode :: ( Serialize (NodeId) + , Serialize ip + , Serialize u + ) => NodeInfo -> Put +putGenericNode (NodeInfo nid naddr u) = do + put nid + put naddr + put u + +instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where + get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) + put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes +-} + +#endif + +psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p +psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs + +psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)] +psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq + +-- | Update interval, in seconds. +delta :: NominalDiffTime +delta = 15 * 60 + +-- | Should maintain a set of stable long running nodes. +-- +-- Note: pings are triggerd only when a bucket is full. +updateBucketForInbound curTime info bucket + -- Just update timestamp if a node is already in bucket. + -- + -- Note PingResult events should only occur for nodes we requested a ping for, + -- and those will always already be in the routing queue and will get their + -- timestamp updated here, since 'TryInsert' is called on every inbound packet, + -- including ping results. + | already_have + = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime ) + -- bucket is good, but not full => we can insert a new node + | PSQ.size (bktNodes bucket) < defaultBucketSize + = pure ( [], map_ns $ PSQ.insert (coerce info) curTime ) + -- If there are any questionable nodes in the bucket have not been + -- seen in the last 15 minutes, the least recently seen node is + -- pinged. If any nodes in the bucket are known to have become bad, + -- then one is replaced by the new node in the next insertBucket + -- iteration. + | not (L.null stales) + = pure ( stales + , bucket { -- Update timestamps so that we don't redundantly ping. + bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket + -- Update queue with the pending NodeInfo in case of ping fail. + , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } ) + -- When the bucket is full of good nodes, the new node is simply discarded. + -- We must return 'A.empty' here to ensure that bucket splitting happens + -- inside 'modifyBucket'. + | otherwise = A.empty + where + -- We (take 1) to keep a 1-to-1 correspondence between pending pings and + -- waiting nodes in the bktQ. This way, we don't have to worry about what + -- to do with failed pings for which there is no ready replacements. + stales = -- One stale: + do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket) + guard (t < curTime - delta) + return $ coerce n + -- All stale: + -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket + + already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket) + + map_ns f = bucket { bktNodes = f (bktNodes bucket) } + -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) } + +updateBucketForPingResult bad_node got_response bucket + = pure ( map (,Nothing) forgotten + ++ map (second Just) replacements + , Bucket (foldr replace + (bktNodes bucket) + replacements) + popped + ) + where + (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) + + -- Dropped from accepted, replaced by pending. + replacements | got_response = [] -- Timestamp was already updated by TryInsert. + | Just info <- top = do + -- Insert only if there's a removal. + _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket) + return (bad_node, info) + | otherwise = [] + + -- Dropped from the pending queue without replacing. + forgotten | got_response = maybeToList $ fmap snd top + | otherwise = [] + + + replace (bad_node, (tm, info)) = + PSQ.insert (coerce info) tm + . PSQ.delete (coerce bad_node) + + +updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp +updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales + +type BitIx = Word + +partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b) +partitionQ imp test q0 = do + pass0 <- emptyQueue imp + fail0 <- emptyQueue imp + let flipfix a b f = fix f a b + flipfix q0 (pass0,fail0) $ \rec q qs -> do + (mb,q') <- popFront imp q + case mb of + Nothing -> return qs + Just e -> do qs' <- select (pushBack imp e) qs + rec q' qs' + where + select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) + select f = if test e then \(a,b) -> flip (,) b <$> f a + else \(a,b) -> (,) a <$> f b + + + +split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => + forall ni s. ( Reifies s (Compare ni) ) => + (ni -> Word -> Bool) + -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni) +split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs) + where + (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b + (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b + + spanBit :: ni -> Bool + spanBit entry = testNodeIdBit entry i + + +{----------------------------------------------------------------------- +-- BucketList +-----------------------------------------------------------------------} + +defaultBucketCount :: Int +defaultBucketCount = 20 + +defaultMaxBucketCount :: Word +defaultMaxBucketCount = 24 + +data Info ni nid = Info + { myBuckets :: BucketList ni + , myNodeId :: nid + , myAddress :: SockAddr + } + deriving Generic + +deriving instance (Eq ni, Eq nid) => Eq (Info ni nid) +deriving instance (Show ni, Show nid) => Show (Info ni nid) + +-- instance (Eq ip, Serialize ip) => Serialize (Info ip) + +-- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ +-- 160. The routing table is subdivided into 'Bucket's that each cover +-- a portion of the space. An empty table has one bucket with an ID +-- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" +-- is inserted into the table, it is placed within the bucket that has +-- @min <= N < max@. An empty table has only one bucket so any node +-- must fit within it. Each bucket can only hold 'K' nodes, currently +-- eight, before becoming 'Full'. When a bucket is full of known good +-- nodes, no more nodes may be added unless our own 'NodeId' falls +-- within the range of the 'Bucket'. In that case, the bucket is +-- replaced by two new buckets each with half the range of the old +-- bucket and the nodes from the old bucket are distributed among the +-- two new ones. For a new table with only one bucket, the full bucket +-- is always split into two new buckets covering the ranges @0..2 ^ +-- 159@ and @2 ^ 159..2 ^ 160@. +-- +data BucketList ni = forall s. Reifies s (Compare ni) => + BucketList { thisNode :: !ni + -- | Non-empty list of buckets. + , buckets :: [Bucket s ni] + } + +mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b +mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts) + $ \p -> BucketList + { thisNode = f self + , buckets = map (resolve p . mapBucket f) bkts + } + where + resolve :: Proxy s -> Bucket s ni -> Bucket s ni + resolve = const id + +instance (Eq ni) => Eq (BucketList ni) where + (==) = (==) `on` Network.Kademlia.Routing.toList + +#if 0 + +instance Serialize NominalDiffTime where + put = putWord32be . fromIntegral . fromEnum + get = (toEnum . fromIntegral) <$> getWord32be + +#endif + +#if CAN_SHOW_BUCKET +deriving instance (Show ni) => Show (BucketList ni) +#else +instance Show ni => Show (BucketList ni) where + showsPrec d (BucketList self bkts) = + mappend "BucketList " + . showsPrec (d+1) self + . mappend " (fromList " + . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts) + . mappend ") " +#endif + +#if 0 + +-- | Normally, routing table should be saved between invocations of +-- the client software. Note that you don't need to store /this/ +-- 'NodeId' since it is already included in routing table. +instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList) + +#endif + +-- | Shape of the table. +instance Pretty (BucketList ni) where + pPrint t + | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss + | otherwise = brackets $ + PP.int (L.sum ss) <> " nodes, " <> + PP.int bucketCount <> " buckets" + where + bucketCount = L.length ss + ss = shape t + +-- | Empty table with specified /spine/ node id. +-- +-- XXX: The comparison function argument is awkward here. +nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni +nullTable cmp hsh ni n = + reify (Compare cmp hsh) + $ \p -> BucketList + ni + [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)] + where + empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp + empty = const $ PSQ.empty + +#if 0 + +-- | Test if table is empty. In this case DHT should start +-- bootstrapping process until table becomes 'full'. +null :: BucketList -> Bool +null (Tip _ _ b) = PSQ.null $ bktNodes b +null _ = False + +-- | Test if table have maximum number of nodes. No more nodes can be +-- 'insert'ed, except old ones becomes bad. +full :: BucketList -> Bool +full (Tip _ n _) = n == 0 +full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t +full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t + +-- | Get the /spine/ node id. +thisId :: BucketList -> NodeId +thisId (Tip nid _ _) = nid +thisId (Zero table _) = thisId table +thisId (One _ table) = thisId table + +-- | Number of nodes in a bucket or a table. +type NodeCount = Int + +#endif + +-- | Internally, routing table is similar to list of buckets or a +-- /matrix/ of nodes. This function returns the shape of the matrix. +shape :: BucketList ni -> [Int] +shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl + +#if 0 + +-- | Get number of nodes in the table. +size :: BucketList -> NodeCount +size = L.sum . shape + +-- | Get number of buckets in the table. +depth :: BucketList -> BucketCount +depth = L.length . shape + +#endif + +lookupBucket :: forall ni nid x. + ( -- FiniteBits nid + Ord nid + ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x +lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts + where + d = kademliaXor space nid (kademliaLocation space self) + + go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] + go i bs (bucket : buckets) + | kademliaTestBit space d i = bucket : buckets ++ bs + | otherwise = go (succ i) (bucket:bs) buckets + go _ bs [] = bs + +bucketNumber :: forall ni nid. + KademliaSpace nid ni -> nid -> BucketList ni -> Int +bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts + where + d = kademliaXor space nid (kademliaLocation space self) + + go :: Word -> [Bucket s ni] -> Word + go i (bucket : buckets) + | kademliaTestBit space d i = i + | otherwise = go (succ i) buckets + go i [] = i + + +compatibleNodeId :: forall ni nid. + ( Serialize nid, FiniteBits nid) => + (ni -> nid) -> BucketList ni -> IO nid +compatibleNodeId nodeId tbl = genBucketSample prefix br + where + br = bucketRange (L.length (shape tbl) - 1) True + nodeIdSize = finiteBitSize (undefined :: nid) `div` 8 + bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0 + prefix = either error id $ S.decode bs + +tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8] +tablePrefix testbit = map (packByte . take 8 . (++repeat False)) + . chunksOf 8 + . tableBits testbit + where + packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0] + bitmask ix True = bit ix + bitmask _ _ = 0 + +tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool] +tableBits testbit (BucketList self bkts) = + zipWith const (map (testbit self) [0..]) + bkts + +selfNode :: BucketList ni -> ni +selfNode (BucketList self _) = self + +chunksOf :: Int -> [e] -> [[e]] +chunksOf i ls = map (take i) (build (splitter ls)) where + splitter :: [e] -> ([e] -> a -> a) -> a -> a + splitter [] _ n = n + splitter l c n = l `c` splitter (drop i l) c n + +build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a] +build g = g (:) [] + + + +-- | Count of closest nodes in find_node reply. +type K = Int + +-- | Default 'K' is equal to 'defaultBucketSize'. +defaultK :: K +defaultK = 8 + +#if 0 +class TableKey dht k where + toNodeId :: k -> NodeId + +instance TableKey dht (NodeId) where + toNodeId = id + +#endif + +-- | In Kademlia, the distance metric is XOR and the result is +-- interpreted as an unsigned integer. +newtype NodeDistance nodeid = NodeDistance nodeid + deriving (Eq, Ord) + +-- | distance(A,B) = |A xor B| Smaller values are closer. +distance :: Bits nid => nid -> nid -> NodeDistance nid +distance a b = NodeDistance $ xor a b + +-- | Order by closeness: nearest nodes first. +rank :: ( Ord nid + ) => KademliaSpace nid ni -> nid -> [ni] -> [ni] +rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space)) + + +-- | Get a list of /K/ closest nodes using XOR metric. Used in +-- 'find_node' and 'get_peers' queries. +kclosest :: ( -- FiniteBits nid + Ord nid + ) => + KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni] +kclosest space k nid tbl = take k $ rank space nid (L.concat bucket) + ++ rank space nid (L.concat everyone) + where + (bucket,everyone) = + L.splitAt 1 + . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) + $ tbl + + + +{----------------------------------------------------------------------- +-- Routing +-----------------------------------------------------------------------} + +splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => + ( Reifies s (Compare ni) ) => + (ni -> Word -> Bool) + -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ] +splitTip testNodeBit ni i bucket + | testNodeBit ni i = [zeros , ones ] + | otherwise = [ones , zeros ] + where + (ones, zeros) = split testNodeBit i bucket + +-- | Used in each query. +-- +-- TODO: Kademlia non-empty subtrees should should split if they have less than +-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia +-- paper. The rule requiring additional splits is in section 2.4. +modifyBucket + :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => + forall ni nid xs. + KademliaSpace nid ni + -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni) +modifyBucket space nid f (BucketList self bkts) + = second (BucketList self) <$> go (0 :: BitIx) bkts + where + d = kademliaXor space nid (kademliaLocation space self) + + -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni]) + + go !i (bucket : buckets@(_:_)) + | kademliaTestBit space d i = second (: buckets) <$> f bucket + | otherwise = second (bucket :) <$> go (succ i) buckets + + go !i [bucket] = second (: []) <$> f bucket <|> gosplit + where + gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space + . kademliaLocation space ) + self + i + bucket) + | otherwise = Nothing -- Limit the number of buckets. + + +bktCount :: BucketList ni -> Int +bktCount (BucketList _ bkts) = L.length bkts + +-- | Triggering event for atomic table update +data Event ni = TryInsert { foreignNode :: ni } + | PingResult { foreignNode :: ni , ponged :: Bool } + +#if 0 +deriving instance Eq (NodeId) => Eq (Event) +deriving instance ( Show ip + , Show (NodeId) + , Show u + ) => Show (Event) + +#endif + +eventId :: (ni -> nid) -> Event ni -> nid +eventId nodeId (TryInsert ni) = nodeId ni +eventId nodeId (PingResult ni _) = nodeId ni + + +-- | Actions requested by atomic table update +data CheckPing ni = CheckPing [ni] + +#if 0 + +deriving instance Eq (NodeId) => Eq (CheckPing) +deriving instance ( Show ip + , Show (NodeId) + , Show u + ) => Show (CheckPing) + +#endif + + +-- | Call on every inbound packet (including requested ping results). +-- Returns a triple (was_inserted, to_ping, tbl') where +-- +-- [ /was_inserted/ ] True if the node was added to the routing table. +-- +-- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'. +-- This will be empty if /was_inserted/, but a non-inserted node +-- may be added to a replacement queue and will be inserted if +-- one of the items in this list time out. +-- +-- [ /tbl'/ ] The updated routing 'BucketList'. +-- +updateForInbound :: + KademliaSpace nid ni + -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni) +updateForInbound space tm ni tbl@(BucketList _ bkts) = + maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl')) + $ modifyBucket space + (kademliaLocation space ni) + (updateBucketForInbound tm ni) + tbl + +-- | Update the routing table with the results of a ping. +-- +-- Each (a,(tm,b)) in the returned list indicates that the node /a/ was deleted from the +-- routing table and the node /b/, with timestamp /tm/, has taken its place. +updateForPingResult :: + KademliaSpace nid ni + -> ni -- ^ The pinged node. + -> Bool -- ^ True if we got a reply, False if it timed out. + -> BucketList ni -- ^ The routing table. + -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni ) +updateForPingResult space ni got_reply tbl = + fromMaybe ([],tbl) + $ modifyBucket space + (kademliaLocation space ni) + (updateBucketForPingResult ni got_reply) + tbl + + +{----------------------------------------------------------------------- +-- Conversion +-----------------------------------------------------------------------} + +type TableEntry ni = (ni, Timestamp) + +tableEntry :: NodeEntry ni -> TableEntry ni +tableEntry (a :-> b) = (a, b) + +toList :: BucketList ni -> [[TableEntry ni]] +toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts + +data KademliaSpace nid ni = KademliaSpace + { -- | Given a node record (probably including IP address), yields a + -- kademlia xor-metric location. + kademliaLocation :: ni -> nid + -- | Used when comparing locations. This is similar to + -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so + -- that 0 is the most significant bit. + , kademliaTestBit :: nid -> Word -> Bool + -- | The Kademlia xor-metric. + , kademliaXor :: nid -> nid -> nid + + , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid + } + +contramapKS f ks = ks + { kademliaLocation = kademliaLocation ks . f + } + diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs index 195bed14..71107fbd 100644 --- a/src/Network/Kademlia/Search.hs +++ b/src/Network/Kademlia/Search.hs @@ -26,7 +26,7 @@ import qualified Data.MinMaxPSQ as MM import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) import Network.Address hiding (NodeId) -import Network.DHT.Routing as R +import Network.Kademlia.Routing as R #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index 7279c2e3..c49cfe90 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs @@ -71,7 +71,7 @@ import Network.Address (Address, WantIP (..), either4or6, sockAddrPort, testIdBit, toSockAddr, un4map) import Network.Kademlia.Search (Search (..)) -import qualified Network.DHT.Routing as R +import qualified Network.Kademlia.Routing as R import Network.QueryResponse import Network.Socket import System.Endian diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs index 2dc183cd..4e43c4a7 100644 --- a/src/Network/Tox/DHT/Handlers.hs +++ b/src/Network/Tox/DHT/Handlers.hs @@ -11,7 +11,7 @@ import Network.Kademlia.Search import qualified Data.Wrapper.PSQInt as Int import Network.Kademlia import Network.Address (WantIP (..), ipFamily, testIdBit,fromSockAddr, sockAddrPort) -import qualified Network.DHT.Routing as R +import qualified Network.Kademlia.Routing as R import Control.TriadCommittee import System.Global6 diff --git a/src/Network/Tox/Onion/Handlers.hs b/src/Network/Tox/Onion/Handlers.hs index f02bac98..9dc6177c 100644 --- a/src/Network/Tox/Onion/Handlers.hs +++ b/src/Network/Tox/Onion/Handlers.hs @@ -18,7 +18,7 @@ import Data.Serialize as S import qualified Data.Wrapper.PSQInt as Int import Network.Kademlia import Network.Address (WantIP (..), ipFamily, testIdBit) -import qualified Network.DHT.Routing as R +import qualified Network.Kademlia.Routing as R import Control.TriadCommittee import qualified Data.MinMaxPSQ as MinMaxPSQ ;import Data.MinMaxPSQ (MinMaxPSQ') -- cgit v1.2.3