From d6fac9a8df0ce872ede54d6a71ca6d6c750eadc9 Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 8 Jun 2017 00:00:56 -0400 Subject: WIP: Adapting DHT to Tox network (part 5). --- src/Network/BitTorrent/Address.hs | 128 +++++++++++++--------------------- src/Network/BitTorrent/DHT.hs | 7 +- src/Network/BitTorrent/DHT/Message.hs | 20 +++--- src/Network/BitTorrent/DHT/Query.hs | 63 +++++++++-------- src/Network/BitTorrent/DHT/Routing.hs | 36 ++++++---- src/Network/BitTorrent/DHT/Search.hs | 29 +++++--- src/Network/BitTorrent/DHT/Session.hs | 2 +- src/Network/DHT/Mainline.hs | 72 +++++++++++++++++-- src/Network/KRPC/Manager.hs | 44 +++++------- src/Network/RPC.hs | 24 ++++++- 10 files changed, 247 insertions(+), 178 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/Address.hs b/src/Network/BitTorrent/Address.hs index 2132f8f9..560ac1ef 100644 --- a/src/Network/BitTorrent/Address.hs +++ b/src/Network/BitTorrent/Address.hs @@ -11,6 +11,7 @@ -- {-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE ViewPatterns #-} @@ -59,11 +60,8 @@ module Network.BitTorrent.Address -- * Node -- ** Id , NodeId - , asNodeId , nodeIdSize , testIdBit - , NodeDistance - , distance , genNodeId , bucketRange , genBucketSample @@ -73,6 +71,8 @@ module Network.BitTorrent.Address -- ** Info , NodeAddr (..) , NodeInfo (..) + , mapAddress + , traverseAddress , rank -- * Fingerprint @@ -98,7 +98,6 @@ import Data.BEncode.BDict (BKey) import Data.Bits import qualified Data.ByteString as BS import qualified Data.ByteString.Internal as BS -import Data.ByteString.Base16 as Base16 import Data.ByteString.Char8 as BC import Data.ByteString.Char8 as BS8 import qualified Data.ByteString.Lazy as BL @@ -130,6 +129,9 @@ import System.Locale (defaultTimeLocale) #endif import System.Entropy import Data.Digest.CRC32C +import qualified Network.RPC as RPC +import Network.KRPC.Message (KMessageOf) +import Network.DHT.Mainline -- import Paths_bittorrent (version) @@ -646,48 +648,10 @@ peerSocket socketType pa = do -- in the DHT to get the location of peers to download from using -- the BitTorrent protocol. --- TODO more compact representation ('ShortByteString's?) +-- asNodeId :: ByteString -> NodeId +-- asNodeId bs = NodeId $ BS.take nodeIdSize bs --- | Each node has a globally unique identifier known as the \"node --- ID.\" --- --- Normally, /this/ node id should be saved between invocations --- of the client software. -newtype NodeId = NodeId ByteString - deriving (Show, Eq, Ord, Typeable -#ifdef VERSION_bencoding - , BEncode -#endif - ) - - -nodeIdSize :: Int -nodeIdSize = 20 - -asNodeId :: ByteString -> NodeId -asNodeId bs = NodeId $ BS.take nodeIdSize bs - --- | Meaningless node id, for testing purposes only. -instance Default NodeId where - def = NodeId (BS.replicate nodeIdSize 0) - -instance Serialize NodeId where - get = NodeId <$> getByteString nodeIdSize - {-# INLINE get #-} - put (NodeId bs) = putByteString bs - {-# INLINE put #-} - --- | ASCII encoded. -instance IsString NodeId where - fromString str - | L.length str == nodeIdSize = NodeId (fromString str) - | L.length str == 2 * nodeIdSize = NodeId (fst $ Base16.decode $ fromString str) - | otherwise = error "fromString: invalid NodeId length" - {-# INLINE fromString #-} - --- | base16 encoded. -instance Pretty NodeId where - pPrint (NodeId nid) = PP.text $ BC.unpack $ Base16.encode nid +{- -- | Test if the nth bit is set. testIdBit :: NodeId -> Word -> Bool @@ -696,6 +660,10 @@ testIdBit (NodeId bs) i , (q, r) <- quotRem (fromIntegral i) 8 = testBit (BS.index bs q) (7 - r) | otherwise = False +-} + +testIdBit :: FiniteBits bs => bs -> Word -> Bool +testIdBit bs i = testBit bs (fromIntegral (finiteBitSize bs - fromIntegral i)) {-# INLINE testIdBit #-} -- TODO WARN is the 'system' random suitable for this? @@ -703,25 +671,10 @@ testIdBit (NodeId bs) i -- Distribution of ID's should be as uniform as possible. -- genNodeId :: IO NodeId -genNodeId = NodeId <$> getEntropy nodeIdSize +genNodeId = NodeId . either error id . S.decode <$> getEntropy nodeIdSize ------------------------------------------------------------------------ --- | In Kademlia, the distance metric is XOR and the result is --- interpreted as an unsigned integer. -newtype NodeDistance = NodeDistance BS.ByteString - deriving (Eq, Ord) - -instance Pretty NodeDistance where - pPrint (NodeDistance bs) = text $ BC.unpack (Base16.encode bs) - -instance Show NodeDistance where - show (NodeDistance bs) = BC.unpack (Base16.encode bs) - --- | distance(A,B) = |A xor B| Smaller values are closer. -distance :: NodeId -> NodeId -> NodeDistance -distance (NodeId a) (NodeId b) = NodeDistance (BS.pack (BS.zipWith xor a b)) - -- | Accepts a depth/index of a bucket and whether or not it is the last one, -- yields: -- @@ -753,13 +706,13 @@ genBucketSample n qmb = genBucketSample' getEntropy n qmb genBucketSample' :: Applicative m => (Int -> m ByteString) -> NodeId -> (Int,Word8,Word8) -> m NodeId genBucketSample' gen (NodeId self) (q,m,b) - | q <= 0 = NodeId <$> gen nodeIdSize + | q <= 0 = NodeId . either error id . S.decode <$> gen nodeIdSize | q >= nodeIdSize = pure (NodeId self) - | otherwise = NodeId . build <$> gen (nodeIdSize - q + 1) + | otherwise = NodeId . either error id . S.decode . build <$> gen (nodeIdSize - q + 1) where build tl = BS.init hd <> BS.cons (h .|. t) (BS.tail tl) where - hd = BS.take q self + hd = BS.take q $ S.encode self h = xor b (complement m .&. BS.last hd) t = m .&. BS.head tl @@ -819,32 +772,46 @@ fromPeerAddr PeerAddr {..} = NodeAddr ------------------------------------------------------------------------ -data NodeInfo a = NodeInfo - { nodeId :: !NodeId - , nodeAddr :: !(NodeAddr a) - } deriving (Show, Eq, Functor, Foldable, Traversable) +data NodeInfo dht addr u = NodeInfo + { nodeId :: !(RPC.NodeId dht) + , nodeAddr :: !(NodeAddr addr) + , nodeAnnotation :: u + } deriving (Functor, Foldable, Traversable) + +deriving instance ( Show (RPC.NodeId dht) + , Show addr + , Show u ) => Show (NodeInfo dht addr u) +mapAddress :: (addr -> b) -> NodeInfo dht addr u -> NodeInfo dht b u +mapAddress f ni = ni { nodeAddr = fmap f (nodeAddr ni) } -instance Eq a => Ord (NodeInfo a) where +traverseAddress :: Applicative f => (addr -> f b) -> NodeInfo dht addr u -> f (NodeInfo dht b u) +traverseAddress f ni = fmap (\addr -> ni { nodeAddr = addr }) $ traverse f (nodeAddr ni) + +-- Warning: Eq and Ord only look at the nodeId field. +instance Eq (RPC.NodeId dht) => Eq (NodeInfo dht a u) where + a == b = (nodeId a == nodeId b) + +instance Ord (RPC.NodeId dht) => Ord (NodeInfo dht a u) where compare = comparing nodeId -- | KRPC 'compact list' compatible encoding: contact information for -- nodes is encoded as a 26-byte string. Also known as "Compact node -- info" the 20-byte Node ID in network byte order has the compact -- IP-address/port info concatenated to the end. -instance Serialize a => Serialize (NodeInfo a) where - get = NodeInfo <$> get <*> get +instance Serialize a => Serialize (NodeInfo KMessageOf a ()) where + get = (\a b -> NodeInfo a b ()) <$> get <*> get put NodeInfo {..} = put nodeId >> put nodeAddr -instance Pretty ip => Pretty (NodeInfo ip) where +instance Pretty ip => Pretty (NodeInfo KMessageOf ip ()) where pPrint NodeInfo {..} = pPrint nodeId <> "@(" <> pPrint nodeAddr <> ")" -instance Pretty ip => Pretty [NodeInfo ip] where +instance Pretty ip => Pretty [NodeInfo KMessageOf ip ()] where pPrint = PP.vcat . PP.punctuate "," . L.map pPrint -- | Order by closeness: nearest nodes first. rank :: (x -> NodeId) -> NodeId -> [x] -> [x] -rank f nid = L.sortBy (comparing (distance nid . f)) +rank f nid = L.sortBy (comparing (RPC.distance nid . f)) {----------------------------------------------------------------------- -- Fingerprint @@ -1259,8 +1226,11 @@ bep42s addr (NodeId r) = mapMaybe (bep42 addr) rs where rs = L.map (NodeId . change3bits r) [0..7] -change3bits :: ByteString -> Word8 -> ByteString -change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n) +-- change3bits :: ByteString -> Word8 -> ByteString +-- change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n) + +change3bits :: (Num b, Bits b) => b -> b -> b +change3bits bs n = (bs .&. complement 7) .|. n -- | Modifies a purely random 'NodeId' to one that is related to a given -- routable address in accordance with BEP 42. @@ -1274,9 +1244,9 @@ bep42 addr (NodeId r) where ip4mask = "\x03\x0f\x3f\xff" :: ByteString ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString - nbhood_select = BS.last r .&. 7 - retr n = pure $ BS.drop (BS.length r - n) r - crc = S.encode . crc32c . BS.pack + nbhood_select = (fromIntegral r :: Word8) .&. 7 + retr n = pure $ BS.drop (nodeIdSize - n) $ S.encode r + crc = flip shiftL (finiteBitSize (NodeId undefined) - 32) . fromIntegral . crc32c . BS.pack applyMask ip = case BS.zipWith (.&.) msk ip of (b:bs) -> (b .|. shiftL nbhood_select 5) : bs bs -> bs diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index aaa1cf33..ab948a2d 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -17,6 +17,7 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TypeOperators #-} +{-# LANGUAGE ScopedTypeVariables #-} module Network.BitTorrent.DHT ( -- * Distributed Hash Table DHT @@ -70,6 +71,7 @@ import Network.BitTorrent.DHT.Session import Network.BitTorrent.DHT.Routing as T hiding (null) import qualified Data.Text as Text import Data.Monoid +import Network.KRPC.Message (KMessageOf) {----------------------------------------------------------------------- @@ -166,7 +168,7 @@ resolveHostName NodeAddr {..} = do -- -- This operation do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -bootstrap :: Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () +bootstrap :: forall ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () bootstrap mbs startNodes = do restored <- case decode <$> mbs of @@ -178,7 +180,8 @@ bootstrap mbs startNodes = do $(logInfoS) "bootstrap" "Start node bootstrapping" let searchAll aliveNodes = do nid <- myNodeIdAccordingTo (error "FIXME") - C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume + nss <- C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume + return ( nss :: [[NodeInfo KMessageOf ip ()]] ) input_nodes <- (restored ++) . T.toList <$> getTable -- Step 1: Use iterative searches to flesh out the table.. do let knowns = map (map $ nodeAddr . fst) input_nodes diff --git a/src/Network/BitTorrent/DHT/Message.hs b/src/Network/BitTorrent/DHT/Message.hs index 0e2bfdd9..c3df683a 100644 --- a/src/Network/BitTorrent/DHT/Message.hs +++ b/src/Network/BitTorrent/DHT/Message.hs @@ -93,15 +93,14 @@ import Data.Bool #ifdef VERSION_bencoding import Data.BEncode as BE import Data.BEncode.BDict as BDict -import Network.BitTorrent.Address #else import qualified Data.Tox as Tox import Data.Tox (NodeId) import Data.Word import Control.Monad import Network.KRPC.Method -import Network.BitTorrent.Address hiding (NodeId) #endif +import Network.BitTorrent.Address hiding (NodeId) import Data.ByteString (ByteString) import Data.List as L import Data.Monoid @@ -109,11 +108,14 @@ import Data.Serialize as S import Data.Typeable import Network import Network.KRPC +import Network.KRPC.Message (KMessageOf) import Data.Maybe import Data.Torrent (InfoHash) import Network.BitTorrent.DHT.Token import Network.KRPC () +import Network.DHT.Mainline () +import Network.RPC hiding (Query,Response) {----------------------------------------------------------------------- -- envelopes @@ -134,7 +136,7 @@ read_only_key = "ro" -- | All queries have an \"id\" key and value containing the node ID -- of the querying node. data Query a = Query - { queringNodeId :: NodeId -- ^ node id of /quering/ node; + { queringNodeId :: NodeId KMessageOf -- ^ node id of /quering/ node; , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43 , queryParams :: a -- ^ query parameters. } deriving (Show, Eq, Typeable) @@ -161,7 +163,7 @@ data Query a = Query a -- | All responses have an \"id\" key and value containing the node ID -- of the responding node. data Response a = Response - { queredNodeId :: NodeId -- ^ node id of /quered/ node; + { queredNodeId :: NodeId KMessageOf -- ^ node id of /quered/ node; , responseVals :: a -- ^ query result. } deriving (Show, Eq, Typeable) @@ -233,7 +235,7 @@ instance KRPC (Query Ping) (Response Ping) where -- | Find node is used to find the contact information for a node -- given its ID. #ifdef VERSION_bencoding -newtype FindNode ip = FindNode NodeId +newtype FindNode ip = FindNode (NodeId KMessageOf) #else data FindNode ip = FindNode NodeId Tox.Nonce8 -- Tox: Get Nodes #endif @@ -262,7 +264,7 @@ instance Serialize (Query (FindNode ip)) where -- nodes in its own routing table. -- #ifdef VERSION_bencoding -newtype NodeFound ip = NodeFound [NodeInfo ip] +newtype NodeFound ip = NodeFound [NodeInfo KMessageOf ip ()] #else data NodeFound ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 #endif @@ -273,9 +275,9 @@ nodes_key :: BKey nodes_key = "nodes" -- Convert IPv4 address. Useful for using variadic IP type. -from4 :: forall s. Address s => NodeInfo IPv4 -> Either String (NodeInfo s) +from4 :: forall dht u s. Address s => NodeInfo dht IPv4 u -> Either String (NodeInfo dht s u) from4 n = maybe (Left "Error converting IPv4") Right - $ traverse (fromAddr :: IPv4 -> Maybe s) n + $ traverseAddress (fromAddr :: IPv4 -> Maybe s) n #ifdef VERSION_bencoding binary :: Serialize a => BKey -> BE.Get [a] @@ -334,7 +336,7 @@ instance Typeable ip => BEncode (GetPeers ip) where toBEncode (GetPeers ih) = toDict $ info_hash_key .=! ih .: endDict fromBEncode = fromDict $ GetPeers <$>! info_hash_key -type PeerList ip = Either [NodeInfo ip] [PeerAddr ip] +type PeerList ip = Either [NodeInfo KMessageOf ip ()] [PeerAddr ip] data GotPeers ip = GotPeers { -- | If the queried node has no peers for the infohash, returned diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 820db8ba..4b386cdc 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -80,7 +80,7 @@ import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Data.Time import Data.Time.Clock.POSIX -import Network.KRPC hiding (Options, def) +import Network.KRPC as KRPC hiding (Options, def) import Network.KRPC.Message (ReflectedIP(..)) import Network.KRPC.Manager (QueryFailure(..)) import Data.Torrent @@ -90,14 +90,15 @@ import Network.BitTorrent.DHT.Session import Control.Concurrent.STM import qualified Network.BitTorrent.DHT.Search as Search #ifdef VERSION_bencoding -import Network.BitTorrent.Address import Data.BEncode (BValue) -import Network.DHT.Mainline +import Network.DHT.Mainline hiding (NodeId) +import Network.KRPC.Message (KMessageOf) #else -import Network.BitTorrent.Address hiding (NodeId) import Data.ByteString (ByteString) import Data.Tox #endif +import Network.BitTorrent.Address hiding (NodeId) +import Network.RPC as RPC hiding (Query,Response) {----------------------------------------------------------------------- -- Handlers @@ -106,18 +107,17 @@ import Data.Tox nodeHandler :: ( Address ip , KRPC (Query a) (Response b) #ifdef VERSION_bencoding - , Envelope (Query a) (Response b) ~ BValue ) + , KRPC.Envelope (Query a) (Response b) ~ BValue ) #else - , Envelope (Query a) (Response b) ~ ByteString ) + , KPRC.Envelope (Query a) (Response b) ~ ByteString ) #endif => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip -#ifdef VERSION_bencoding nodeHandler action = handler $ \ sockAddr qry -> do +#ifdef VERSION_bencoding let remoteId = queringNodeId qry read_only = queryIsReadOnly qry q = queryParams qry #else -nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do let remoteId = msgClient qry read_only = False q = msgPayload qry @@ -125,7 +125,7 @@ nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do case fromSockAddr sockAddr of Nothing -> throwIO BadAddress Just naddr -> do - let ni = NodeInfo remoteId naddr + let ni = NodeInfo remoteId naddr () -- Do not route read-only nodes. (bep 43) if read_only then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) @@ -136,8 +136,11 @@ nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do -- | Default 'Ping' handler. pingH :: Address ip => NodeHandler ip -pingH = nodeHandler $ \ _ Ping -> do - return Ping +#ifdef VERSION_bencoding +pingH = nodeHandler $ \ _ Ping -> return Ping +#else +pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } +#endif -- | Default 'FindNode' handler. findNodeH :: Address ip => NodeHandler ip @@ -177,19 +180,23 @@ defaultHandlers = [pingH, findNodeH] -- Basic queries -----------------------------------------------------------------------} -type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) +type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip]) -- | The most basic query. May be used to check if the given node is -- alive or get its 'NodeId'. -pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP) +pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP) pingQ addr = do +#ifdef VERSION_bencoding (nid, Ping, mip) <- queryNode' addr Ping - return (NodeInfo nid addr, mip) +#else + (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} +#endif + return (NodeInfo nid addr (), mip) -- TODO [robustness] match range of returned node ids with the -- expected range and either filter bad nodes or discard response at -- all throwing an exception -findNodeQ :: Address ip => TableKey key => key -> Iteration ip NodeInfo +-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo findNodeQ key NodeInfo {..} = do NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr $(logInfoS) "findNodeQ" $ "NodeFound\n" @@ -223,7 +230,7 @@ announceQ ih p NodeInfo {..} = do -----------------------------------------------------------------------} -ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [PeerAddr ip])) +ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) ioGetPeers ih = do session <- ask return $ \ni -> runDHT session $ do @@ -232,7 +239,7 @@ ioGetPeers ih = do Right e -> return $ either (,[]) ([],) e Left e -> let _ = e :: QueryFailure in return ([],[]) -ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [NodeInfo ip])) +ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()])) ioFindNode ih = do session <- ask return $ \ni -> runDHT session $ do @@ -240,7 +247,7 @@ ioFindNode ih = do return $ L.partition (\n -> nodeId n /= toNodeId ih) ns isearch :: (Ord r, Ord ip) => - (InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [r]))) + (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]))) -> InfoHash -> DHT ip (ThreadId, Search.IterativeSearch ip r) isearch f ih = do @@ -255,10 +262,10 @@ isearch f ih = do return (a, s) -type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] +type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()] -- TODO: use reorder and filter (Traversal option) leftovers -search :: k -> Iteration ip o -> Search ip o +-- search :: k -> IterationI ip o -> Search ip o search _ action = do awaitForever $ \ batch -> unless (L.null batch) $ do $(logWarnS) "search" "start query" @@ -285,15 +292,15 @@ probeNode addr = do -- FIXME do not use getClosest sinse we should /refresh/ them -refreshNodes :: Address ip => NodeId -> DHT ip () -- [NodeInfo ip] +refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()] refreshNodes nid = do $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) nodes <- getClosest nid do -- forM (L.take 1 nodes) $ \ addr -> do -- NodeFound ns <- FindNode nid <@> addr - -- Expected type: ConduitM [NodeAddr ip] [NodeInfo ip] (DHT ip) () - -- Actual type: ConduitM [NodeInfo ip] [NodeInfo ip] (DHT ip) () + -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () + -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." @@ -306,7 +313,7 @@ refreshNodes nid = do -- | This operation do not block but acquire exclusive access to -- routing table. -insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip () +insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () insertNode info witnessed_ip0 = do var <- asks routingInfo tm <- getTimestamp @@ -315,7 +322,7 @@ insertNode info witnessed_ip0 = do let logMsg = "Routing table: " <> pPrint t $(logDebugS) "insertNode" (T.pack (render logMsg)) let arrival0 = TryInsert info - arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) + arrival4 = TryInsert (mapAddress fromAddr info) :: Event (Maybe IPv4) $(logDebugS) "insertNode" $ T.pack (show arrival4) maxbuckets <- asks (optBucketCount . options) fallbackid <- asks tentativeNodeId @@ -380,18 +387,18 @@ insertNode info witnessed_ip0 = do -- | Throws exception if node is not responding. queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip (NodeId, b) + => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b) queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP) + => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP) queryNode' addr q = do nid <- myNodeIdAccordingTo addr let read_only = False -- TODO: check for NAT issues. (BEP 43) (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) -- <> " by " <> T.pack (show (toSockAddr addr)) - _ <- insertNode (NodeInfo remoteId addr) witnessed_ip + _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip return (remoteId, r, witnessed_ip) -- | Infix version of 'queryNode' function. diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index cf4a4de3..6cf7f122 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs @@ -83,8 +83,10 @@ import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) import qualified Data.ByteString as BS import Data.Bits +import Network.KRPC.Message (KMessageOf) import Data.Torrent import Network.BitTorrent.Address +import Network.DHT.Mainline {----------------------------------------------------------------------- -- Routing monad @@ -180,7 +182,7 @@ runRouting ping_node find_nodes timestamper = go -- other words: new nodes are used only when older nodes disappear. -- | Timestamp - last time this node is pinged. -type NodeEntry ip = Binding (NodeInfo ip) Timestamp +type NodeEntry ip = Binding (NodeInfo KMessageOf ip ()) Timestamp -- TODO instance Pretty where @@ -211,7 +213,7 @@ fromQ embed project QueueMethods{..} = } -} -seqQ :: QueueMethods Identity (NodeInfo ip) (Seq.Seq (NodeInfo ip)) +seqQ :: QueueMethods Identity (NodeInfo KMessageOf ip ()) (Seq.Seq (NodeInfo KMessageOf ip ())) seqQ = QueueMethods { pushBack = \e fifo -> pure (fifo Seq.|> e) , popFront = \fifo -> case Seq.viewl fifo of @@ -220,9 +222,9 @@ seqQ = QueueMethods , emptyQueue = pure Seq.empty } -type BucketQueue ip = Seq.Seq (NodeInfo ip) +type BucketQueue ip = Seq.Seq (NodeInfo KMessageOf ip ()) -bucketQ :: QueueMethods Identity (NodeInfo ip) (BucketQueue ip) +bucketQ :: QueueMethods Identity (NodeInfo KMessageOf ip ()) (BucketQueue ip) bucketQ = seqQ -- | Bucket is also limited in its length — thus it's called k-bucket. @@ -232,7 +234,7 @@ bucketQ = seqQ -- very unlikely that all nodes in bucket fail within an hour of -- each other. -- -data Bucket ip = Bucket { bktNodes :: !(PSQ (NodeInfo ip) Timestamp) +data Bucket ip = Bucket { bktNodes :: !(PSQ (NodeInfo KMessageOf ip ()) Timestamp) , bktQ :: !(BucketQueue ip) } deriving (Show,Generic) @@ -303,7 +305,7 @@ insertBucket curTime (PingResult bad_node got_response) bucket pure $ PSQ.insert info curTime nodes' | otherwise = id -updateStamps :: Eq ip => Timestamp -> [NodeInfo ip] -> PSQ (NodeInfo ip) Timestamp -> PSQ (NodeInfo ip) Timestamp +updateStamps :: Eq ip => Timestamp -> [NodeInfo KMessageOf ip ()] -> PSQ (NodeInfo KMessageOf ip ()) Timestamp -> PSQ (NodeInfo KMessageOf ip ()) Timestamp updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales @@ -330,6 +332,12 @@ split i b = (Bucket ns qs, Bucket ms rs) where (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . key) . PSQ.toList $ bktNodes b (qs,rs) = runIdentity $ partitionQ bucketQ spanBit $ bktQ b + {- + spanBit :: forall (dht :: * -> *) addr u. + FiniteBits (Network.RPC.NodeId dht) => + NodeInfo dht addr u -> Bool + -} + spanBit :: NodeInfo KMessageOf addr () -> Bool spanBit entry = testIdBit (nodeId entry) i {----------------------------------------------------------------------- @@ -458,7 +466,7 @@ compatibleNodeId tbl = genBucketSample prefix br where br = bucketRange (L.length (shape tbl) - 1) True bs = BS.pack $ take nodeIdSize $ tablePrefix tbl ++ repeat 0 - prefix = asNodeId bs + prefix = either error id $ S.decode bs tablePrefix :: Table ip -> [Word8] tablePrefix = map (packByte . take 8 . (++repeat False)) @@ -503,7 +511,7 @@ instance TableKey InfoHash where -- | Get a list of /K/ closest nodes using XOR metric. Used in -- 'find_node' and 'get_peers' queries. -kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] +kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo KMessageOf ip ()] kclosest k (toNodeId -> nid) tbl = take k $ rank nodeId nid (L.concat bucket) ++ rank nodeId nid (L.concat everyone) where @@ -547,19 +555,19 @@ modifyBucket nodeId f = go (0 :: BitIx) <|> go i (splitTip nid n i bucket) -- | Triggering event for atomic table update -data Event ip = TryInsert { foreignNode :: NodeInfo ip } - | PingResult { foreignNode :: NodeInfo ip +data Event ip = TryInsert { foreignNode :: NodeInfo KMessageOf ip () } + | PingResult { foreignNode :: NodeInfo KMessageOf ip () , ponged :: Bool } - deriving (Eq,Ord,Show) + deriving (Eq,Show) -- Ord eventId :: Event ip -> NodeId eventId (TryInsert NodeInfo{..}) = nodeId eventId (PingResult NodeInfo{..} _) = nodeId -- | Actions requested by atomic table update -data CheckPing ip = CheckPing [NodeInfo ip] - deriving (Eq,Ord,Show) +data CheckPing ip = CheckPing [NodeInfo KMessageOf ip ()] + deriving (Eq,Show) -- Ord -- | Atomic 'Table' update @@ -571,7 +579,7 @@ insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) ( -- Conversion -----------------------------------------------------------------------} -type TableEntry ip = (NodeInfo ip, Timestamp) +type TableEntry ip = (NodeInfo KMessageOf ip (), Timestamp) tableEntry :: NodeEntry ip -> TableEntry ip tableEntry (a :-> b) = (a, b) diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index 79cc9489..854f26c7 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -1,5 +1,6 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} module Network.BitTorrent.DHT.Search where import Control.Concurrent @@ -21,20 +22,23 @@ import qualified Data.MinMaxPSQ as MM ;import Data.MinMaxPSQ (MinMaxPSQ) import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) -import Network.BitTorrent.Address +import Network.BitTorrent.Address hiding (NodeId) +import Network.RPC +import Network.KRPC.Message (KMessageOf) +import Network.DHT.Mainline () data IterativeSearch ip r = IterativeSearch - { searchTarget :: NodeId - , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r]) + { searchTarget :: NodeId KMessageOf + , searchQuery :: NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]) , searchPendingCount :: TVar Int - , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) - , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) + , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip ()) (NodeDistance (NodeId KMessageOf))) + , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip ()) (NodeDistance (NodeId KMessageOf))) , searchVisited :: TVar (Set (NodeAddr ip)) , searchResults :: TVar (Set r) } -newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r])) - -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r) +newSearch :: Eq ip => (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r])) + -> NodeId KMessageOf -> [NodeInfo KMessageOf ip ()] -> IO (IterativeSearch ip r) newSearch qry target ns = atomically $ do c <- newTVar 0 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns @@ -49,9 +53,9 @@ searchAlpha = 3 searchK :: Int searchK = 8 -sendQuery :: (Ord a, Ord t) => - IterativeSearch t a - -> Binding (NodeInfo t) NodeDistance +sendQuery :: forall a ip. (Ord a, Ord ip) => + IterativeSearch ip a + -> Binding (NodeInfo KMessageOf ip ()) (NodeDistance (NodeId KMessageOf)) -> IO () sendQuery IterativeSearch{..} (ni :-> d) = do (ns,rs) <- handle (\(SomeException e) -> return ([],[])) @@ -60,7 +64,10 @@ sendQuery IterativeSearch{..} (ni :-> d) = do modifyTVar searchPendingCount pred vs <- readTVar searchVisited -- We only queue a node if it is not yet visited - let insertFoundNode n q + let insertFoundNode :: NodeInfo KMessageOf ip u + -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) + -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) + insertFoundNode n q | nodeAddr n `Set.member` vs = q | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 20dba595..aa6ee396 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -452,7 +452,7 @@ allPeers ih = do -- -- This operation used for 'find_nodes' query. -- -getClosest :: Eq ip => TableKey k => k -> DHT ip [NodeInfo ip] +getClosest :: Eq ip => TableKey k => k -> DHT ip [NodeInfo KMessageOf ip ()] getClosest node = do k <- asks (optK . options) kclosest k node <$> getTable diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs index 540b74f9..d7aed430 100644 --- a/src/Network/DHT/Mainline.hs +++ b/src/Network/DHT/Mainline.hs @@ -1,18 +1,76 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} module Network.DHT.Mainline where -import Network.Socket -import Network.RPC -import Network.KRPC.Message as KRPC -import Data.BEncode as BE -import qualified Data.ByteString.Lazy as L -import Network.BitTorrent.Address as BT (NodeId) +import Data.BEncode as BE +import Data.Bits +import Data.ByteString (ByteString) +import Data.ByteString.Base16 as Base16 +import qualified Data.ByteString.Char8 as Char8 +import qualified Data.ByteString.Lazy as L +import Data.Default +import Data.LargeWord +import Data.Serialize as S +import Data.String +import Data.Typeable +import Network.KRPC.Message as KRPC +import qualified Network.RPC as RPC (NodeId) + ;import Network.RPC as RPC hiding (NodeId) +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) + +-- | Each node has a globally unique identifier known as the \"node +-- ID.\" +-- +-- Normally, /this/ node id should be saved between invocations +-- of the client software. +newtype NodeId = NodeId Word160 + deriving (Show, Eq, Ord, Typeable, Bits, FiniteBits) + +instance BEncode NodeId where + toBEncode (NodeId w) = toBEncode $ S.encode w + fromBEncode bval = fromBEncode bval >>= S.decode + +-- | NodeId size in bytes. +nodeIdSize :: Int +nodeIdSize = 20 + + +-- instance BEncode NodeId where TODO + +-- TODO: put this somewhere appropriate +instance (Serialize a, Serialize b) => Serialize (LargeKey a b) where + put (LargeKey lo hi) = put hi >> put lo + get = flip LargeKey <$> get <*> get + +instance Serialize NodeId where + get = NodeId <$> get + {-# INLINE get #-} + put (NodeId bs) = put bs + {-# INLINE put #-} + +-- | ASCII encoded. +instance IsString NodeId where + fromString str + | length str == nodeIdSize = NodeId (either error id $ S.decode (fromString str :: ByteString)) + | length str == 2 * nodeIdSize = NodeId (either error id $ S.decode (fst $ Base16.decode $ fromString str)) + | otherwise = error "fromString: invalid NodeId length" + {-# INLINE fromString #-} + +-- | Meaningless node id, for testing purposes only. +instance Default NodeId where + def = NodeId 0 + +-- | base16 encoded. +instance Pretty NodeId where + pPrint (NodeId nid) = PP.text $ Char8.unpack $ Base16.encode $ S.encode nid instance Envelope KMessageOf where type TransactionID KMessageOf = KRPC.TransactionId - type NodeId KMessageOf = BT.NodeId + type NodeId KMessageOf = Network.DHT.Mainline.NodeId envelopePayload (Q q) = queryArgs q envelopePayload (R r) = respVals r diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index f31a3cd6..efd59f32 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -84,7 +84,9 @@ import Network.Socket hiding (listen) import Network.Socket.ByteString as BS import System.IO.Error import System.Timeout +#ifdef VERSION_bencoding import Network.DHT.Mainline +#endif {----------------------------------------------------------------------- @@ -268,15 +270,9 @@ data QueryFailure instance Exception QueryFailure -#ifdef VERSION_bencoding -sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () -sendMessage sock addr a = do - liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr -#else sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () sendMessage sock addr a = do liftIO $ sendManyTo sock [a] addr -#endif genTransactionId :: TransactionCounter -> IO TransactionId genTransactionId ref = do @@ -309,13 +305,8 @@ unregisterQuery cid ref = do -- (sendmsg EINVAL) -#ifdef VERSION_bencoding -sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () +sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO () sendQuery sock addr q = handle sockError $ sendMessage sock addr q -#else -sendQuery :: Serialize a => Socket -> SockAddr -> a -> IO () -sendQuery sock addr q = handle sockError $ sendMessage sock addr (S.encode q) -#endif where sockError :: IOError -> IO () sockError _ = throwIO SendFailed @@ -351,12 +342,17 @@ queryK addr params kont = do ares <- registerQuery (tid, addr) pendingCalls #ifdef VERSION_bencoding - let q = KQuery (toBEncode params) (methodName queryMethod) tid + let q = Q (KQuery (toBEncode params) (methodName queryMethod) tid) + qb = encodePayload q :: KMessage + qbs = encodeHeaders () qb :: BC.ByteString #else let q = Tox.Message (methodName queryMethod) cli tid params cli = error "TODO TOX client node id" + ctx = error "TODO TOX ToxCipherContext" + qb = encodePayload q :: Tox.Message BC.ByteString + qbs = encodeHeaders ctx qb :: BC.ByteString #endif - sendQuery sock addr q + sendQuery sock addr qbs `onException` unregisterQuery (tid, addr) pendingCalls timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do @@ -463,14 +459,7 @@ runHandler h addr m = Lifted.catches wrapper failbacks Right a -> do -- KQueryArgs $(logDebugS) "handler.success" signature -#ifdef VERSION_bencoding return $ Right a -#else - let cli = error "TODO TOX client node id" - messageid = error "TODO TOX message response id" - -- TODO: ReflectedIP addr ?? - return $ Right $ Tox.Message messageid cli (queryId m) a -#endif failbacks = [ E.Handler $ \ (e :: HandlerFailure) -> do @@ -528,16 +517,18 @@ handleQuery raw q addr = void $ fork $ do Manager {..} <- getManager res <- dispatchHandler q addr #ifdef VERSION_bencoding - let resbe = either toBEncode toBEncode res + let res' = either E id res + resbe = either toBEncode toBEncode res $(logOther "q") $ T.unlines [ either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) , "==>" , either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) ] - sendMessage sock addr resbe + sendMessage sock addr $ encodeHeaders () res' #else -- Errors not sent for Tox. - either (const $ return ()) (sendMessage sock addr . S.encode) res + let ctx = error "TODO TOX ToxCipherContext 2" + either (const $ return ()) (sendMessage sock addr . encodeHeaders ctx) res #endif handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () @@ -570,16 +561,17 @@ listener :: MonadKRPC h m => m () listener = do Manager {..} <- getManager fix $ \again -> do + let ctx = error "TODO TOX ToxCipherContext 3" (bs, addr) <- liftIO $ do handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) #ifdef VERSION_bencoding case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of #else - case return bs >>= \r -> (,) r <$> decode bs of + case return bs >>= \r -> (,) r <$> decodeHeaders ctx bs of #endif -- TODO ignore unknown messages at all? #ifdef VERSION_bencoding - Left e -> liftIO $ sendMessage sock addr $ unknownMessage e + Left e -> liftIO $ sendMessage sock addr $ encodeHeaders () (E (unknownMessage e) :: KMessage) #else Left _ -> return () -- TODO TOX send unknownMessage error #endif diff --git a/src/Network/RPC.hs b/src/Network/RPC.hs index 727422fd..7fb0e571 100644 --- a/src/Network/RPC.hs +++ b/src/Network/RPC.hs @@ -1,16 +1,22 @@ {-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FunctionalDependencies #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} -{-# LANGUAGE DeriveDataTypeable #-} module Network.RPC where +import Data.Bits import Data.ByteString (ByteString) import Data.Kind (Constraint) import Data.Data import Network.Socket +import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) +import Data.Serialize as S +import qualified Data.ByteString.Char8 as Char8 +import Data.ByteString.Base16 as Base16 data MessageClass = Error | Query | Response deriving (Eq,Ord,Enum,Bounded,Data,Show,Read) @@ -36,6 +42,22 @@ class Envelope envelope where -- Returns: response message envelope buildReply :: NodeId envelope -> SockAddr -> envelope a -> b -> envelope b +-- | In Kademlia, the distance metric is XOR and the result is +-- interpreted as an unsigned integer. +newtype NodeDistance nodeid = NodeDistance nodeid + deriving (Eq, Ord) + +-- | distance(A,B) = |A xor B| Smaller values are closer. +distance :: Bits nid => nid -> nid -> NodeDistance nid +distance a b = NodeDistance $ xor a b + +instance Serialize nodeid => Show (NodeDistance nodeid) where + show (NodeDistance w) = Char8.unpack $ Base16.encode $ S.encode w + +instance Serialize nodeid => Pretty (NodeDistance nodeid) where + pPrint n = text $ show n + + class Envelope envelope => WireFormat raw envelope where type SerializableTo raw :: * -> Constraint type CipherContext raw envelope -- cgit v1.2.3