From 8c33deac14ca92ef67afc7fbcd3f67bc19317f88 Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 8 Jun 2017 03:07:13 -0400 Subject: WIP: Adapting DHT to Tox network (part 6). --- src/Network/BitTorrent/Address.hs | 242 +----------------------------- src/Network/BitTorrent/DHT/Message.hs | 4 +- src/Network/BitTorrent/DHT/Query.hs | 3 +- src/Network/BitTorrent/DHT/Routing.hs | 268 ++++++++++++++++++---------------- src/Network/BitTorrent/DHT/Search.hs | 22 ++- src/Network/BitTorrent/DHT/Session.hs | 42 +++++- 6 files changed, 201 insertions(+), 380 deletions(-) (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/Address.hs b/src/Network/BitTorrent/Address.hs index 560ac1ef..f364abbe 100644 --- a/src/Network/BitTorrent/Address.hs +++ b/src/Network/BitTorrent/Address.hs @@ -13,6 +13,7 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} @@ -60,13 +61,10 @@ module Network.BitTorrent.Address -- * Node -- ** Id , NodeId - , nodeIdSize , testIdBit , genNodeId , bucketRange , genBucketSample - , bep42 - , bep42s -- ** Info , NodeAddr (..) @@ -129,47 +127,15 @@ import System.Locale (defaultTimeLocale) #endif import System.Entropy import Data.Digest.CRC32C -import qualified Network.RPC as RPC +import Network.RPC as RPC import Network.KRPC.Message (KMessageOf) -import Network.DHT.Mainline +-- import Network.DHT.Mainline -- import Paths_bittorrent (version) -{----------------------------------------------------------------------- --- Address ------------------------------------------------------------------------} - instance Pretty UTCTime where pPrint = PP.text . show -class (Eq a, Serialize a, Typeable a, Hashable a, Pretty a) - => Address a where - toSockAddr :: a -> SockAddr - fromSockAddr :: SockAddr -> Maybe a - -fromAddr :: (Address a, Address b) => a -> Maybe b -fromAddr = fromSockAddr . toSockAddr - --- | Note that port is zeroed. -instance Address IPv4 where - toSockAddr = SockAddrInet 0 . toHostAddress - fromSockAddr (SockAddrInet _ h) = Just (fromHostAddress h) - fromSockAddr _ = Nothing - --- | Note that port is zeroed. -instance Address IPv6 where - toSockAddr h = SockAddrInet6 0 0 (toHostAddress6 h) 0 - fromSockAddr (SockAddrInet6 _ _ h _) = Just (fromHostAddress6 h) - fromSockAddr _ = Nothing - --- | Note that port is zeroed. -instance Address IP where - toSockAddr (IPv4 h) = toSockAddr h - toSockAddr (IPv6 h) = toSockAddr h - fromSockAddr sa = - IPv4 <$> fromSockAddr sa - <|> IPv6 <$> fromSockAddr sa - setPort :: PortNumber -> SockAddr -> SockAddr setPort port (SockAddrInet _ h ) = SockAddrInet port h setPort port (SockAddrInet6 _ f h s) = SockAddrInet6 port f h s @@ -388,21 +354,6 @@ instance BEncode PortNumber where = pure $ fromIntegral n | otherwise = decodingError $ "PortNumber: " ++ show n #endif - -instance Serialize PortNumber where - get = fromIntegral <$> getWord16be - {-# INLINE get #-} - put = putWord16be . fromIntegral - {-# INLINE put #-} - -instance Hashable PortNumber where - hashWithSalt s = hashWithSalt s . fromEnum - {-# INLINE hashWithSalt #-} - -instance Pretty PortNumber where - pPrint = PP.int . fromEnum - {-# INLINE pPrint #-} - {----------------------------------------------------------------------- -- IP addr -----------------------------------------------------------------------} @@ -457,51 +408,6 @@ instance BEncode IPv6 where {-# INLINE fromBEncode #-} #endif --- | When 'get'ing an IP it must be 'isolate'd to the appropriate --- number of bytes since we have no other way of telling which --- address type we are trying to parse -instance Serialize IP where - put (IPv4 ip) = put ip - put (IPv6 ip) = put ip - - get = do - n <- remaining - case n of - 4 -> IPv4 <$> get - 16 -> IPv6 <$> get - _ -> fail (show n ++ " is the wrong number of remaining bytes to parse IP") - -instance Serialize IPv4 where - put = putWord32host . toHostAddress - get = fromHostAddress <$> getWord32host - -instance Serialize IPv6 where - put ip = put $ toHostAddress6 ip - get = fromHostAddress6 <$> get - -instance Pretty IPv4 where - pPrint = PP.text . show - {-# INLINE pPrint #-} - -instance Pretty IPv6 where - pPrint = PP.text . show - {-# INLINE pPrint #-} - -instance Pretty IP where - pPrint = PP.text . show - {-# INLINE pPrint #-} - -instance Hashable IPv4 where - hashWithSalt = hashUsing toHostAddress - {-# INLINE hashWithSalt #-} - -instance Hashable IPv6 where - hashWithSalt s a = hashWithSalt s (toHostAddress6 a) - -instance Hashable IP where - hashWithSalt s (IPv4 h) = hashWithSalt s h - hashWithSalt s (IPv6 h) = hashWithSalt s h - {----------------------------------------------------------------------- -- Peer addr -----------------------------------------------------------------------} @@ -666,13 +572,6 @@ testIdBit :: FiniteBits bs => bs -> Word -> Bool testIdBit bs i = testBit bs (fromIntegral (finiteBitSize bs - fromIntegral i)) {-# INLINE testIdBit #-} --- TODO WARN is the 'system' random suitable for this? --- | Generate random NodeID used for the entire session. --- Distribution of ID's should be as uniform as possible. --- -genNodeId :: IO NodeId -genNodeId = NodeId . either error id . S.decode <$> getEntropy nodeIdSize - ------------------------------------------------------------------------ -- | Accepts a depth/index of a bucket and whether or not it is the last one, @@ -693,54 +592,8 @@ bucketRange depth is_last = (q,m,b) m = 2^(7-r) - 1 b = if is_last then 0 else 2^(7-r) --- | Generate a random 'NodeId' within a range suitable for a bucket. To --- obtain a sample for bucket number /index/ where /is_last/ indicates if this --- is for the current deepest bucket in our routing table: --- --- > sample <- genBucketSample nid (bucketRange index is_last) -genBucketSample :: NodeId -> (Int,Word8,Word8) -> IO NodeId -genBucketSample n qmb = genBucketSample' getEntropy n qmb - --- | Generalizion of 'genBucketSample' that accepts a byte generator --- function to use instead of the system entropy. -genBucketSample' :: Applicative m => - (Int -> m ByteString) -> NodeId -> (Int,Word8,Word8) -> m NodeId -genBucketSample' gen (NodeId self) (q,m,b) - | q <= 0 = NodeId . either error id . S.decode <$> gen nodeIdSize - | q >= nodeIdSize = pure (NodeId self) - | otherwise = NodeId . either error id . S.decode . build <$> gen (nodeIdSize - q + 1) - where - build tl = BS.init hd <> BS.cons (h .|. t) (BS.tail tl) - where - hd = BS.take q $ S.encode self - h = xor b (complement m .&. BS.last hd) - t = m .&. BS.head tl - ------------------------------------------------------------------------ -data NodeAddr a = NodeAddr - { nodeHost :: !a - , nodePort :: {-# UNPACK #-} !PortNumber - } deriving (Eq, Ord, Typeable, Functor, Foldable, Traversable) - -instance Show a => Show (NodeAddr a) where - showsPrec i NodeAddr {..} - = showsPrec i nodeHost <> showString ":" <> showsPrec i nodePort - -instance Read (NodeAddr IPv4) where - readsPrec i x = [ (fromPeerAddr a, s) | (a, s) <- readsPrec i x ] - --- | @127.0.0.1:6882@ -instance Default (NodeAddr IPv4) where - def = "127.0.0.1:6882" - --- | KRPC compatible encoding. -instance Serialize a => Serialize (NodeAddr a) where - get = NodeAddr <$> get <*> get - {-# INLINE get #-} - put NodeAddr {..} = put nodeHost >> put nodePort - {-# INLINE put #-} - #ifdef VERSION_bencoding -- | Torrent file compatible encoding. instance BEncode a => BEncode (NodeAddr a) where @@ -750,20 +603,6 @@ instance BEncode a => BEncode (NodeAddr a) where {-# INLINE fromBEncode #-} #endif -instance Hashable a => Hashable (NodeAddr a) where - hashWithSalt s NodeAddr {..} = hashWithSalt s (nodeHost, nodePort) - {-# INLINE hashWithSalt #-} - -instance Pretty ip => Pretty (NodeAddr ip) where - pPrint NodeAddr {..} = pPrint nodeHost <> ":" <> pPrint nodePort - --- | Example: --- --- @nodePort \"127.0.0.1:6881\" == 6881@ --- -instance IsString (NodeAddr IPv4) where - fromString = fromPeerAddr . fromString - fromPeerAddr :: PeerAddr a -> NodeAddr a fromPeerAddr PeerAddr {..} = NodeAddr { nodeHost = peerHost @@ -772,45 +611,10 @@ fromPeerAddr PeerAddr {..} = NodeAddr ------------------------------------------------------------------------ -data NodeInfo dht addr u = NodeInfo - { nodeId :: !(RPC.NodeId dht) - , nodeAddr :: !(NodeAddr addr) - , nodeAnnotation :: u - } deriving (Functor, Foldable, Traversable) - -deriving instance ( Show (RPC.NodeId dht) - , Show addr - , Show u ) => Show (NodeInfo dht addr u) - -mapAddress :: (addr -> b) -> NodeInfo dht addr u -> NodeInfo dht b u -mapAddress f ni = ni { nodeAddr = fmap f (nodeAddr ni) } - -traverseAddress :: Applicative f => (addr -> f b) -> NodeInfo dht addr u -> f (NodeInfo dht b u) -traverseAddress f ni = fmap (\addr -> ni { nodeAddr = addr }) $ traverse f (nodeAddr ni) - --- Warning: Eq and Ord only look at the nodeId field. -instance Eq (RPC.NodeId dht) => Eq (NodeInfo dht a u) where - a == b = (nodeId a == nodeId b) - -instance Ord (RPC.NodeId dht) => Ord (NodeInfo dht a u) where - compare = comparing nodeId - --- | KRPC 'compact list' compatible encoding: contact information for --- nodes is encoded as a 26-byte string. Also known as "Compact node --- info" the 20-byte Node ID in network byte order has the compact --- IP-address/port info concatenated to the end. -instance Serialize a => Serialize (NodeInfo KMessageOf a ()) where - get = (\a b -> NodeInfo a b ()) <$> get <*> get - put NodeInfo {..} = put nodeId >> put nodeAddr - -instance Pretty ip => Pretty (NodeInfo KMessageOf ip ()) where - pPrint NodeInfo {..} = pPrint nodeId <> "@(" <> pPrint nodeAddr <> ")" - -instance Pretty ip => Pretty [NodeInfo KMessageOf ip ()] where - pPrint = PP.vcat . PP.punctuate "," . L.map pPrint - -- | Order by closeness: nearest nodes first. -rank :: (x -> NodeId) -> NodeId -> [x] -> [x] +rank :: ( Ord (NodeId dht) + , Bits (NodeId dht) + ) => (x -> NodeId dht) -> NodeId dht -> [x] -> [x] rank f nid = L.sortBy (comparing (RPC.distance nid . f)) {----------------------------------------------------------------------- @@ -1219,40 +1023,6 @@ fingerprint pid = either (const def) id $ runGet getCI (getPeerId pid) return $ Version (catMaybes $ L.map decodeShadowVerNr str) [] --- | Yields all 8 DHT neighborhoods available to you given a particular ip --- address. -bep42s :: Address a => a -> NodeId -> [NodeId] -bep42s addr (NodeId r) = mapMaybe (bep42 addr) rs - where - rs = L.map (NodeId . change3bits r) [0..7] - --- change3bits :: ByteString -> Word8 -> ByteString --- change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n) - -change3bits :: (Num b, Bits b) => b -> b -> b -change3bits bs n = (bs .&. complement 7) .|. n - --- | Modifies a purely random 'NodeId' to one that is related to a given --- routable address in accordance with BEP 42. -bep42 :: Address a => a -> NodeId -> Maybe NodeId -bep42 addr (NodeId r) - | Just ip <- fmap S.encode (fromAddr addr :: Maybe IPv4) - <|> fmap S.encode (fromAddr addr :: Maybe IPv6) - = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) - | otherwise - = Nothing - where - ip4mask = "\x03\x0f\x3f\xff" :: ByteString - ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString - nbhood_select = (fromIntegral r :: Word8) .&. 7 - retr n = pure $ BS.drop (nodeIdSize - n) $ S.encode r - crc = flip shiftL (finiteBitSize (NodeId undefined) - 32) . fromIntegral . crc32c . BS.pack - applyMask ip = case BS.zipWith (.&.) msk ip of - (b:bs) -> (b .|. shiftL nbhood_select 5) : bs - bs -> bs - where msk | BS.length ip == 4 = ip4mask - | otherwise = ip6mask - -- | Given a string specifying a port (numeric or service name) -- and a flag indicating whether you want to support IPv6, this diff --git a/src/Network/BitTorrent/DHT/Message.hs b/src/Network/BitTorrent/DHT/Message.hs index c3df683a..c99c72bb 100644 --- a/src/Network/BitTorrent/DHT/Message.hs +++ b/src/Network/BitTorrent/DHT/Message.hs @@ -113,8 +113,10 @@ import Data.Maybe import Data.Torrent (InfoHash) import Network.BitTorrent.DHT.Token +#ifdef VERSION_bencoding import Network.KRPC () import Network.DHT.Mainline () +#endif import Network.RPC hiding (Query,Response) {----------------------------------------------------------------------- @@ -237,7 +239,7 @@ instance KRPC (Query Ping) (Response Ping) where #ifdef VERSION_bencoding newtype FindNode ip = FindNode (NodeId KMessageOf) #else -data FindNode ip = FindNode NodeId Tox.Nonce8 -- Tox: Get Nodes +data FindNode ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes #endif deriving (Show, Eq, Typeable) diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 4b386cdc..56ea262a 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -14,6 +14,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE GADTs #-} module Network.BitTorrent.DHT.Query ( -- * Handler @@ -322,7 +323,7 @@ insertNode info witnessed_ip0 = do let logMsg = "Routing table: " <> pPrint t $(logDebugS) "insertNode" (T.pack (render logMsg)) let arrival0 = TryInsert info - arrival4 = TryInsert (mapAddress fromAddr info) :: Event (Maybe IPv4) + arrival4 = TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ $(logDebugS) "insertNode" $ T.pack (show arrival4) maxbuckets <- asks (optBucketCount . options) fallbackid <- asks tentativeNodeId diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 6cf7f122..42728a53 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs @@ -13,12 +13,14 @@ -- For more info see: -- -- +{-# LANGUAGE CPP #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE BangPatterns #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module Network.BitTorrent.DHT.Routing ( -- * Table @@ -59,8 +61,6 @@ module Network.BitTorrent.DHT.Routing -- * Routing , Timestamp - , Routing - , runRouting ) where import Control.Applicative as A @@ -83,10 +83,16 @@ import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) import qualified Data.ByteString as BS import Data.Bits -import Network.KRPC.Message (KMessageOf) import Data.Torrent import Network.BitTorrent.Address -import Network.DHT.Mainline +#ifdef VERSION_bencoding +import Network.DHT.Mainline () +import Network.KRPC.Message (KMessageOf) +#else +import Data.Tox as Tox +type KMessageOf = Tox.Message +#endif + {----------------------------------------------------------------------- -- Routing monad @@ -109,66 +115,6 @@ import Network.DHT.Mainline -- type Timestamp = POSIXTime --- | Some routing operations might need to perform additional IO. -data Routing ip result - = Full - | Done result - | GetTime ( Timestamp -> Routing ip result) - | NeedPing (NodeAddr ip) ( Bool -> Routing ip result) - | Refresh NodeId (Routing ip result) - -instance Functor (Routing ip) where - fmap _ Full = Full - fmap f (Done r) = Done ( f r) - fmap f (GetTime g) = GetTime (fmap f . g) - fmap f (NeedPing addr g) = NeedPing addr (fmap f . g) - fmap f (Refresh nid g) = Refresh nid (fmap f g) - -instance Monad (Routing ip) where - return = Done - - Full >>= _ = Full - Done r >>= m = m r - GetTime f >>= m = GetTime $ \ t -> f t >>= m - NeedPing a f >>= m = NeedPing a $ \ p -> f p >>= m - Refresh n f >>= m = Refresh n $ f >>= m - -instance Applicative (Routing ip) where - pure = return - (<*>) = ap - -instance Alternative (Routing ip) where - empty = Full - - Full <|> m = m - Done a <|> _ = Done a - GetTime f <|> m = GetTime $ \ t -> f t <|> m - NeedPing a f <|> m = NeedPing a $ \ p -> f p <|> m - Refresh n f <|> m = Refresh n (f <|> m) - --- | Run routing table operation. -runRouting :: Monad m - => (NodeAddr ip -> m Bool) -- ^ ping the specific node; - -> (NodeId -> m ()) -- ^ refresh nodes; - -> m Timestamp -- ^ get current time; - -> Routing ip f -- ^ operation to run; - -> m (Maybe f) -- ^ operation result; -runRouting ping_node find_nodes timestamper = go - where - go Full = return (Nothing) - go (Done r) = return (Just r) - go (GetTime f) = do - t <- timestamper - go (f t) - - go (NeedPing addr f) = do - pong <- ping_node addr - go (f pong) - - go (Refresh nid f) = do - find_nodes nid - go f - {----------------------------------------------------------------------- Bucket -----------------------------------------------------------------------} @@ -182,7 +128,7 @@ runRouting ping_node find_nodes timestamper = go -- other words: new nodes are used only when older nodes disappear. -- | Timestamp - last time this node is pinged. -type NodeEntry ip = Binding (NodeInfo KMessageOf ip ()) Timestamp +type NodeEntry dht ip u = Binding (NodeInfo dht ip u) Timestamp -- TODO instance Pretty where @@ -213,7 +159,7 @@ fromQ embed project QueueMethods{..} = } -} -seqQ :: QueueMethods Identity (NodeInfo KMessageOf ip ()) (Seq.Seq (NodeInfo KMessageOf ip ())) +seqQ :: QueueMethods Identity (NodeInfo dht ip u) (Seq.Seq (NodeInfo dht ip u)) seqQ = QueueMethods { pushBack = \e fifo -> pure (fifo Seq.|> e) , popFront = \fifo -> case Seq.viewl fifo of @@ -222,9 +168,9 @@ seqQ = QueueMethods , emptyQueue = pure Seq.empty } -type BucketQueue ip = Seq.Seq (NodeInfo KMessageOf ip ()) +type BucketQueue dht ip u = Seq.Seq (NodeInfo dht ip u) -bucketQ :: QueueMethods Identity (NodeInfo KMessageOf ip ()) (BucketQueue ip) +bucketQ :: QueueMethods Identity (NodeInfo dht ip u) (BucketQueue dht ip u) bucketQ = seqQ -- | Bucket is also limited in its length — thus it's called k-bucket. @@ -234,16 +180,45 @@ bucketQ = seqQ -- very unlikely that all nodes in bucket fail within an hour of -- each other. -- -data Bucket ip = Bucket { bktNodes :: !(PSQ (NodeInfo KMessageOf ip ()) Timestamp) - , bktQ :: !(BucketQueue ip) - } deriving (Show,Generic) - -instance (Eq ip, Serialize ip) => Serialize (Bucket ip) where - get = Bucket . psqFromPairList <$> get <*> pure (runIdentity $ emptyQueue bucketQ) - put = put . psqToPairList . bktNodes - +data Bucket dht ip u = Bucket { bktNodes :: !(PSQ (NodeInfo dht ip u) Timestamp) + , bktQ :: !(BucketQueue dht ip u) + } deriving Generic + +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Bucket dht ip u) + + +getGenericNode :: ( Serialize (NodeId dht) + , Serialize ip + , Serialize u + ) => Get (NodeInfo dht ip u) +getGenericNode = do + nid <- get + naddr <- get + u <- get + return NodeInfo + { nodeId = nid + , nodeAddr = naddr + , nodeAnnotation = u + } + +putGenericNode :: ( Serialize (NodeId dht) + , Serialize ip + , Serialize u + ) => NodeInfo dht ip u -> Put +putGenericNode (NodeInfo nid naddr u) = do + put nid + put naddr + put u + +instance (Eq ip, Ord (NodeId dht), Serialize (NodeId dht), Serialize ip, Serialize u) => Serialize (Bucket dht ip u) where + get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) + put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes + + +psqFromPairList :: (Ord p, Ord k) => [(k, p)] -> OrdPSQ k p () psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs +psqToPairList :: OrdPSQ t t1 () -> [(t, t1)] psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq -- | Update interval, in seconds. @@ -253,8 +228,8 @@ delta = 15 * 60 -- | Should maintain a set of stable long running nodes. -- -- Note: pings are triggerd only when a bucket is full. -insertBucket :: (Eq ip, Alternative f) => Timestamp -> Event ip -> Bucket ip - -> f ([CheckPing ip], Bucket ip) +insertBucket :: (Eq ip, Alternative f, Ord (NodeId dht)) => Timestamp -> Event dht ip u -> Bucket dht ip u + -> f ([CheckPing dht ip u], Bucket dht ip u) insertBucket curTime (TryInsert info) bucket -- just update timestamp if a node is already in bucket | already_have @@ -305,7 +280,9 @@ insertBucket curTime (PingResult bad_node got_response) bucket pure $ PSQ.insert info curTime nodes' | otherwise = id -updateStamps :: Eq ip => Timestamp -> [NodeInfo KMessageOf ip ()] -> PSQ (NodeInfo KMessageOf ip ()) Timestamp -> PSQ (NodeInfo KMessageOf ip ()) Timestamp +updateStamps :: ( Eq ip + , Ord (NodeId dht) + ) => Timestamp -> [NodeInfo dht ip u] -> PSQ (NodeInfo dht ip u) Timestamp -> PSQ (NodeInfo dht ip u) Timestamp updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales @@ -327,7 +304,11 @@ partitionQ imp test q0 = do select f = if test e then \(a,b) -> flip (,) b <$> f a else \(a,b) -> (,) a <$> f b -split :: Eq ip => BitIx -> Bucket ip -> (Bucket ip, Bucket ip) +split :: forall dht ip u. + ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => BitIx -> Bucket dht ip u -> (Bucket dht ip u, Bucket dht ip u) split i b = (Bucket ns qs, Bucket ms rs) where (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . key) . PSQ.toList $ bktNodes b @@ -337,7 +318,7 @@ split i b = (Bucket ns qs, Bucket ms rs) FiniteBits (Network.RPC.NodeId dht) => NodeInfo dht addr u -> Bool -} - spanBit :: NodeInfo KMessageOf addr () -> Bool + spanBit :: NodeInfo dht addr u -> Bool spanBit entry = testIdBit (nodeId entry) i {----------------------------------------------------------------------- @@ -350,12 +331,15 @@ type BucketCount = Int defaultBucketCount :: BucketCount defaultBucketCount = 20 -data Info ip = Info - { myBuckets :: Table ip - , myNodeId :: NodeId +data Info dht ip u = Info + { myBuckets :: Table dht ip u + , myNodeId :: NodeId dht , myAddress :: SockAddr } - deriving (Eq, Show, Generic) + deriving Generic + +deriving instance (Eq ip, Eq u, Eq (NodeId dht)) => Eq (Info dht ip u) +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Info dht ip u) -- instance (Eq ip, Serialize ip) => Serialize (Info ip) @@ -375,31 +359,33 @@ data Info ip = Info -- is always split into two new buckets covering the ranges @0..2 ^ -- 159@ and @2 ^ 159..2 ^ 160@. -- -data Table ip +data Table dht ip u -- most nearest bucket - = Tip NodeId BucketCount (Bucket ip) + = Tip (NodeId dht) BucketCount (Bucket dht ip u) -- left biased tree branch - | Zero (Table ip) (Bucket ip) + | Zero (Table dht ip u) (Bucket dht ip u) -- right biased tree branch - | One (Bucket ip) (Table ip) - deriving (Show, Generic) + | One (Bucket dht ip u) (Table dht ip u) + deriving Generic -instance Eq ip => Eq (Table ip) where +instance (Eq ip, Eq (NodeId dht)) => Eq (Table dht ip u) where (==) = (==) `on` Network.BitTorrent.DHT.Routing.toList instance Serialize NominalDiffTime where put = putWord32be . fromIntegral . fromEnum get = (toEnum . fromIntegral) <$> getWord32be +deriving instance (Show ip, Show u, Show (NodeId dht)) => Show (Table dht ip u) + -- | Normally, routing table should be saved between invocations of -- the client software. Note that you don't need to store /this/ -- 'NodeId' since it is already included in routing table. -instance (Eq ip, Serialize ip) => Serialize (Table ip) +instance (Eq ip, Serialize ip, Ord (NodeId dht), Serialize (NodeId dht), Serialize u) => Serialize (Table dht ip u) -- | Shape of the table. -instance Pretty (Table ip) where +instance Pretty (Table dht ip u) where pPrint t | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss | otherwise = brackets $ @@ -410,26 +396,26 @@ instance Pretty (Table ip) where ss = shape t -- | Empty table with specified /spine/ node id. -nullTable :: Eq ip => NodeId -> BucketCount -> Table ip +nullTable :: Eq ip => NodeId dht -> BucketCount -> Table dht ip u nullTable nid n = Tip nid (bucketCount (pred n)) (Bucket PSQ.empty (runIdentity $ emptyQueue bucketQ)) where bucketCount x = max 0 (min 159 x) -- | Test if table is empty. In this case DHT should start -- bootstrapping process until table becomes 'full'. -null :: Table ip -> Bool +null :: Table dht ip u -> Bool null (Tip _ _ b) = PSQ.null $ bktNodes b null _ = False -- | Test if table have maximum number of nodes. No more nodes can be -- 'insert'ed, except old ones becomes bad. -full :: Table ip -> Bool +full :: Table dht ip u -> Bool full (Tip _ n _) = n == 0 full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t -- | Get the /spine/ node id. -thisId :: Table ip -> NodeId +thisId :: Table dht ip u -> NodeId dht thisId (Tip nid _ _) = nid thisId (Zero table _) = thisId table thisId (One _ table) = thisId table @@ -439,18 +425,19 @@ type NodeCount = Int -- | Internally, routing table is similar to list of buckets or a -- /matrix/ of nodes. This function returns the shape of the matrix. -shape :: Table ip -> [BucketSize] +shape :: Table dht ip u -> [BucketSize] shape = map (PSQ.size . bktNodes) . toBucketList -- | Get number of nodes in the table. -size :: Table ip -> NodeCount +size :: Table dht ip u -> NodeCount size = L.sum . shape -- | Get number of buckets in the table. -depth :: Table ip -> BucketCount +depth :: Table dht ip u -> BucketCount depth = L.length . shape -lookupBucket :: NodeId -> Table ip -> [Bucket ip] +lookupBucket :: ( FiniteBits (NodeId dht) + ) => NodeId dht -> Table dht ip u -> [Bucket dht ip u] lookupBucket nid = go 0 [] where go i bs (Zero table bucket) @@ -461,14 +448,18 @@ lookupBucket nid = go 0 [] | otherwise = bucket : toBucketList table ++ bs go _ bs (Tip _ _ bucket) = bucket : bs -compatibleNodeId :: Table ip -> IO NodeId +compatibleNodeId :: forall dht ip u. + ( Serialize (NodeId dht) + , FiniteBits (NodeId dht) + ) => Table dht ip u -> IO (NodeId dht) compatibleNodeId tbl = genBucketSample prefix br where br = bucketRange (L.length (shape tbl) - 1) True + nodeIdSize = finiteBitSize (undefined :: NodeId dht) `div` 8 bs = BS.pack $ take nodeIdSize $ tablePrefix tbl ++ repeat 0 prefix = either error id $ S.decode bs -tablePrefix :: Table ip -> [Word8] +tablePrefix :: Table dht ip u -> [Word8] tablePrefix = map (packByte . take 8 . (++repeat False)) . chunksOf 8 . tableBits @@ -477,7 +468,7 @@ tablePrefix = map (packByte . take 8 . (++repeat False)) bitmask ix True = bit ix bitmask _ _ = 0 -tableBits :: Table ip -> [Bool] +tableBits :: Table dht ip u -> [Bool] tableBits (One _ tbl) = True : tableBits tbl tableBits (Zero tbl _) = False : tableBits tbl tableBits (Tip _ _ _) = [] @@ -498,20 +489,23 @@ type K = Int defaultK :: K defaultK = 8 -class TableKey k where - toNodeId :: k -> NodeId +class TableKey dht k where + toNodeId :: k -> NodeId dht -instance TableKey NodeId where +instance TableKey dht (NodeId dht) where toNodeId = id -instance TableKey InfoHash where +instance TableKey KMessageOf InfoHash where toNodeId = either (error msg) id . S.decode . S.encode where -- TODO unsafe coerse? msg = "tableKey: impossible" -- | Get a list of /K/ closest nodes using XOR metric. Used in -- 'find_node' and 'get_peers' queries. -kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo KMessageOf ip ()] +kclosest :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => TableKey dht a => K -> a -> Table dht ip u -> [NodeInfo dht ip u] kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) ++ rank nodeId nid (L.concat everyone) where @@ -525,7 +519,10 @@ kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) -- Routing -----------------------------------------------------------------------} -splitTip :: Eq ip => NodeId -> BucketCount -> BitIx -> Bucket ip -> Table ip +splitTip :: ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => NodeId dht -> BucketCount -> BitIx -> Bucket dht ip u -> Table dht ip u splitTip nid n i bucket | testIdBit nid i = (One zeros (Tip nid (pred n) ones)) | otherwise = (Zero (Tip nid (pred n) zeros) ones) @@ -538,11 +535,15 @@ splitTip nid n i bucket -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia -- paper. The rule requiring additional splits is in section 2.4. modifyBucket - :: forall ip xs. (Eq ip) => - NodeId -> (Bucket ip -> Maybe (xs, Bucket ip)) -> Table ip -> Maybe (xs,Table ip) + :: forall xs dht ip u. + ( Eq ip + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => + NodeId dht -> (Bucket dht ip u -> Maybe (xs, Bucket dht ip u)) -> Table dht ip u -> Maybe (xs,Table dht ip u) modifyBucket nodeId f = go (0 :: BitIx) where - go :: BitIx -> Table ip -> Maybe (xs, Table ip) + go :: BitIx -> Table dht ip u -> Maybe (xs, Table dht ip u) go !i (Zero table bucket) | testIdBit nodeId i = second (Zero table) <$> f bucket | otherwise = second (`Zero` bucket) <$> go (succ i) table @@ -555,23 +556,36 @@ modifyBucket nodeId f = go (0 :: BitIx) <|> go i (splitTip nid n i bucket) -- | Triggering event for atomic table update -data Event ip = TryInsert { foreignNode :: NodeInfo KMessageOf ip () } - | PingResult { foreignNode :: NodeInfo KMessageOf ip () - , ponged :: Bool - } - deriving (Eq,Show) -- Ord - -eventId :: Event ip -> NodeId +data Event dht ip u = TryInsert { foreignNode :: NodeInfo dht ip u } + | PingResult { foreignNode :: NodeInfo dht ip u + , ponged :: Bool + } +deriving instance Eq (NodeId dht) => Eq (Event dht ip u) +deriving instance ( Show ip + , Show (NodeId dht) + , Show u + ) => Show (Event dht ip u) + +eventId :: Event dht ip u -> NodeId dht eventId (TryInsert NodeInfo{..}) = nodeId eventId (PingResult NodeInfo{..} _) = nodeId -- | Actions requested by atomic table update -data CheckPing ip = CheckPing [NodeInfo KMessageOf ip ()] - deriving (Eq,Show) -- Ord +data CheckPing dht ip u = CheckPing [NodeInfo dht ip u] + +deriving instance Eq (NodeId dht) => Eq (CheckPing dht ip u) +deriving instance ( Show ip + , Show (NodeId dht) + , Show u + ) => Show (CheckPing dht ip u) -- | Atomic 'Table' update -insert :: (Eq ip, Applicative m) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) +insert :: ( Eq ip + , Applicative m + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + ) => Timestamp -> Event dht ip u -> Table dht ip u -> m ([CheckPing dht ip u], Table dht ip u) insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) (insertBucket tm event) tbl @@ -579,16 +593,16 @@ insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) ( -- Conversion -----------------------------------------------------------------------} -type TableEntry ip = (NodeInfo KMessageOf ip (), Timestamp) +type TableEntry dht ip u = (NodeInfo dht ip u, Timestamp) -tableEntry :: NodeEntry ip -> TableEntry ip +tableEntry :: NodeEntry dht ip u -> TableEntry dht ip u tableEntry (a :-> b) = (a, b) -- | Non-empty list of buckets. -toBucketList :: Table ip -> [Bucket ip] +toBucketList :: Table dht ip u -> [Bucket dht ip u] toBucketList (Tip _ _ b) = [b] toBucketList (Zero t b) = b : toBucketList t toBucketList (One b t) = b : toBucketList t -toList :: Eq ip => Table ip -> [[TableEntry ip]] +toList :: Eq ip => Table dht ip u -> [[TableEntry dht ip u]] toList = L.map (L.map tableEntry . PSQ.toList . bktNodes) . toBucketList diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index 854f26c7..844b4575 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -24,21 +25,28 @@ import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) import Network.BitTorrent.Address hiding (NodeId) import Network.RPC -import Network.KRPC.Message (KMessageOf) +#ifdef VERSION_bencoding import Network.DHT.Mainline () +import Network.KRPC.Message (KMessageOf) +type Ann = () +#else +import Data.Tox as Tox +type KMessageOf = Tox.Message +type Ann = Bool +#endif data IterativeSearch ip r = IterativeSearch { searchTarget :: NodeId KMessageOf - , searchQuery :: NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]) + , searchQuery :: NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r]) , searchPendingCount :: TVar Int - , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip ()) (NodeDistance (NodeId KMessageOf))) - , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip ()) (NodeDistance (NodeId KMessageOf))) + , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) + , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) , searchVisited :: TVar (Set (NodeAddr ip)) , searchResults :: TVar (Set r) } -newSearch :: Eq ip => (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r])) - -> NodeId KMessageOf -> [NodeInfo KMessageOf ip ()] -> IO (IterativeSearch ip r) +newSearch :: Eq ip => (NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r])) + -> NodeId KMessageOf -> [NodeInfo KMessageOf ip Ann] -> IO (IterativeSearch ip r) newSearch qry target ns = atomically $ do c <- newTVar 0 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns @@ -55,7 +63,7 @@ searchK = 8 sendQuery :: forall a ip. (Ord a, Ord ip) => IterativeSearch ip a - -> Binding (NodeInfo KMessageOf ip ()) (NodeDistance (NodeId KMessageOf)) + -> Binding (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf)) -> IO () sendQuery IterativeSearch{..} (ni :-> d) = do (ns,rs) <- handle (\(SomeException e) -> return ([],[])) diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index aa6ee396..bec2dabc 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -106,9 +106,11 @@ import Data.Serialize as S import Data.Torrent as Torrent import Network.KRPC as KRPC hiding (Options, def) import qualified Network.KRPC as KRPC (def) -import Network.KRPC.Message (KMessageOf) #ifdef VERSION_bencoding import Data.BEncode (BValue) +import Network.KRPC.Message (KMessageOf) +#else +import Data.Tox as Tox #endif import Network.BitTorrent.Address import Network.BitTorrent.DHT.ContactInfo (PeerStore) @@ -257,11 +259,19 @@ data Node ip = Node -- | Pseudo-unique self-assigned session identifier. This value is -- constant during DHT session and (optionally) between sessions. - , tentativeNodeId :: !NodeId +#ifdef VERSION_bencoding + , tentativeNodeId :: !(NodeId KMessageOf) +#else + , tentativeNodeId :: !(NodeId Tox.Message) +#endif , resources :: !InternalState , manager :: !(Manager (DHT ip )) -- ^ RPC manager; - , routingInfo :: !(TVar (Maybe (R.Info ip))) -- ^ search table; +#ifdef VERSION_bencoding + , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table; +#else + , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table; +#endif , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. @@ -319,7 +329,7 @@ instance MonadLogger (DHT ip) where #ifdef VERSION_bencoding type NodeHandler ip = Handler (DHT ip) KMessageOf BValue #else -type NodeHandler ip = Handler (DHT ip) KMessageOf ByteString +type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString #endif -- | Run DHT session. You /must/ properly close session using @@ -330,7 +340,11 @@ newNode :: Address ip -> Options -- ^ various dht options; -> NodeAddr ip -- ^ node address to bind; -> LogFun -- ^ - -> Maybe NodeId -- ^ use this NodeId, if not given a new one is generated. +#ifdef VERSION_bencoding + -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. +#else + -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated. +#endif -> IO (Node ip) -- ^ a new DHT node running at given address. newNode hs opts naddr logger mbid = do s <- createInternalState @@ -406,7 +420,11 @@ routableAddress = do return $ myAddress <$> info -- | The current NodeId that the given remote node should know us by. -myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId +#ifdef VERSION_bencoding +myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf) +#else +myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message) +#endif myNodeIdAccordingTo _ = do info <- asks routingInfo >>= liftIO . atomically . readTVar maybe (asks tentativeNodeId) @@ -415,7 +433,11 @@ myNodeIdAccordingTo _ = do -- | Get current routing table. Normally you don't need to use this -- function, but it can be usefull for debugging and profiling purposes. -getTable :: Eq ip => DHT ip (Table ip) +#ifdef VERSION_bencoding +getTable :: Eq ip => DHT ip (Table KMessageOf ip ()) +#else +getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool) +#endif getTable = do Node { tentativeNodeId = myId , routingInfo = var @@ -452,7 +474,11 @@ allPeers ih = do -- -- This operation used for 'find_nodes' query. -- -getClosest :: Eq ip => TableKey k => k -> DHT ip [NodeInfo KMessageOf ip ()] +#ifdef VERSION_bencoding +getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] +#else +getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] +#endif getClosest node = do k <- asks (optK . options) kclosest k node <$> getTable -- cgit v1.2.3