-- | -- Copyright : (c) Sam Truzjan 2013 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental -- Portability : portable -- -- This module provides message datatypes which is used for /Node to -- Node/ communication. Bittorrent DHT is based on Kademlia -- specification, but have a slightly different set of messages -- which have been adopted for /peer/ discovery mechanism. Messages -- are sent over "Network.KRPC" protocol, but normally you should -- use "Network.BitTorrent.DHT.Session" to send and receive -- messages. -- -- DHT queries are not /recursive/, they are /iterative/. This means -- that /querying/ node . While original specification (namely BEP5) -- do not impose any restrictions for /quered/ node behaviour, a -- good DHT implementation should follow some rules to guarantee -- that unlimit recursion will never happen. The following set of -- restrictions: -- -- * 'Ping' query must not trigger any message. -- -- * 'FindNode' query /may/ trigger 'Ping' query to check if a -- list of nodes to return is /good/. See -- 'Network.DHT.Routing.Routing' for further explanation. -- -- * 'GetPeers' query may trigger 'Ping' query for the same reason. -- -- * 'Announce' query must trigger 'Ping' query for the same reason. -- -- It is easy to see that the most long RPC chain is: -- -- @ -- | | | -- Node_A | | -- | FindNode or GetPeers or Announce | | -- | ------------------------------------> Node_B | -- | | Ping | -- | | -----------> | -- | | Node_C -- | | Pong | -- | NodeFound or GotPeers or Announced | <----------- | -- | <------------------------------------- Node_B | -- Node_A | | -- | | | -- @ -- -- where in some cases 'Node_C' is 'Node_A'. -- -- For more info see: -- -- -- For Kamelia messages see original Kademlia paper: -- -- {-# LANGUAGE CPP #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} module Network.DHT.Mainline ( -- * Envelopes Query (..) , Response (..) -- * Queries -- ** ping , Ping (..) -- ** find_node , FindNode (..) , NodeFound (..) , bep42s -- , bep42 #ifdef VERSION_bencoding -- ** get_peers , PeerList , GetPeers (..) , GotPeers (..) -- ** announce_peer , Announce (..) , Announced (..) #endif , DHTData(..) , SessionTokens(..) , grantToken , checkToken ) where import Data.String import Control.Applicative import Data.Bool #ifdef VERSION_bencoding import Data.BEncode as BE import Data.BEncode.BDict as BDict hiding (map) #else import qualified Network.DatagramServer.Tox as Tox import Network.DatagramServer.Tox (NodeId) import Data.Word import Control.Monad #endif import Network.KRPC.Method import Network.Address hiding (NodeId) import Data.Bits import Data.ByteString (ByteString) import qualified Data.ByteString as BS import Data.Digest.CRC32C import Data.List as L import Data.Monoid import Data.Serialize as S import Data.Typeable import Data.Word import Network import Network.DatagramServer import Network.DatagramServer.Mainline import Data.Maybe import Data.Torrent (InfoHash) import Network.BitTorrent.DHT.Token as T import Network.BitTorrent.DHT.ContactInfo #ifdef VERSION_bencoding import Network.DatagramServer () #endif import Network.DatagramServer.Types hiding (Query,Response) import Network.DHT.Types import Network.DHT.Routing import Data.Time import Control.Concurrent.STM import System.Random import Data.Hashable {----------------------------------------------------------------------- -- envelopes -----------------------------------------------------------------------} #ifndef VERSION_bencoding type BKey = ByteString #endif node_id_key :: BKey node_id_key = "id" read_only_key :: BKey read_only_key = "ro" #ifdef VERSION_bencoding instance BEncode a => BEncode (Query KMessageOf a) where toBEncode Query {..} = toDict $ BDict.union ( node_id_key .=! queringNodeId queryExtra .: read_only_key .=? bool Nothing (Just (1 :: Integer)) (queryIsReadOnly queryExtra) .: endDict) (dict (toBEncode queryParams)) where dict (BDict d) = d dict _ = error "impossible: instance BEncode (Query a)" fromBEncode v = Query <$> (MainlineQuery <$> fromDict (field (req node_id_key)) v <*> fromDict (fromMaybe False <$>? read_only_key) v) <*> fromBEncode v #else data Query a = Query a #endif #ifdef VERSION_bencoding instance BEncode a => BEncode (Response KMessageOf a) where toBEncode = toBEncode . toQuery where toQuery (Response (MainlineResponse nid) a) = Query (MainlineQuery nid False) a fromBEncode b = fromQuery <$> fromBEncode b where fromQuery (Query (MainlineQuery nid _) a) = Response (MainlineResponse nid) a #else data Response KMessageOf a = Response KMessageOf a #endif {----------------------------------------------------------------------- -- ping method -----------------------------------------------------------------------} -- / The most basic query is a ping. Ping query is used to check if a -- quered node is still alive. -- data Ping = Ping Tox.Nonce8 deriving (Show, Eq, Typeable) #ifdef VERSION_bencoding instance BEncode (Ping KMessageOf) where toBEncode Ping = toDict endDict fromBEncode _ = pure Ping #else instance Serialize (Query (Ping KMessageOf)) where get = do b <- get when ( (b::Word8) /= 0) $ fail "Bad ping request" nonce <- get return $ Query (Ping nonce) put (Query (Ping nonce)) = do put (0 :: Word8) put nonce instance Serialize (Response Ping) where get = do b <- get when ( (b::Word8) /= 1) $ fail "Bad ping response" nonce <- get return $ Response (Ping nonce) put (Response (Ping nonce)) = do put (1 :: Word8) put nonce #endif -- | \"q\" = \"ping\" instance KRPC KMessageOf (Query KMessageOf (Ping KMessageOf)) (Response KMessageOf (Ping KMessageOf)) where method = "ping" makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) makeResponseExtra _ nid _ _ = return $ MainlineResponse nid -- TODO KError Sender/Responder messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r {----------------------------------------------------------------------- -- find_node method -----------------------------------------------------------------------} -- / Find node is used to find the contact information for a node -- given its ID. -- data FindNode KMessageOf ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes -- deriving (Show, Eq, Typeable) target_key :: BKey target_key = "target" #ifdef VERSION_bencoding instance Typeable ip => BEncode (FindNode KMessageOf ip) where toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict fromBEncode = fromDict $ FindNode <$>! target_key #else instance Serialize (Query KMessageOf (FindNode KMessageOf ip)) where get = do nid <- get nonce <- get return $ Query (FindNode nid nonce) put (Query (FindNode nid nonce)) = do put nid put nonce #endif -- | When a node receives a 'FindNode' query, it should respond with a -- the compact node info for the target node or the K (8) closest good -- nodes in its own routing table. -- #ifdef VERSION_bencoding -- newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable) #else data NodeFound KMessageOf ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 deriving (Show, Eq, Typeable) #endif -- Tox: send_nodes nodes_key :: BKey nodes_key = "nodes" -- Convert IPv4 address. Useful for using variadic IP type. from4 :: forall dht u s. Address s => NodeInfo dht IPv4 u -> Either String (NodeInfo dht s u) from4 n = maybe (Left "Error converting IPv4") Right $ traverseAddress (fromAddr :: IPv4 -> Maybe s) n #ifdef VERSION_bencoding binary :: Serialize a => BKey -> BE.Get [a] binary k = field (req k) >>= either (fail . format) return . runGet (many get) where format str = "fail to deserialize " ++ show k ++ " field: " ++ str instance Address ip => BEncode (NodeFound KMessageOf ip) where toBEncode (NodeFound ns) = toDict $ nodes_key .=! runPut (mapM_ put ns) .: endDict -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32) fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval) #else instance Serialize (Response KMessageOf (NodeFound KMessageOf ip)) where get = do count <- get :: Get Word8 nodes <- sequence $ replicate (fromIntegral count) get nonce <- get :: Get Tox.Nonce8 return $ Response $ NodeFound nodes nonce put (Response (NodeFound nodes nonce)) = do put (fromIntegral (length nodes) :: Word8) mapM_ put nodes put nonce #endif -- | \"q\" == \"find_node\" instance (Address ip, Typeable ip) => KRPC KMessageOf (Query KMessageOf (FindNode KMessageOf ip)) (Response KMessageOf (NodeFound KMessageOf ip)) where method = "find_node" makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) makeResponseExtra _ nid _ _ = return $ MainlineResponse nid -- TODO KError Sender/Responder messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r {----------------------------------------------------------------------- -- get_peers method -----------------------------------------------------------------------} -- | Get peers associated with a torrent infohash. newtype GetPeers ip = GetPeers InfoHash deriving (Show, Eq, Typeable) info_hash_key :: BKey info_hash_key = "info_hash" instance Typeable ip => BEncode (GetPeers ip) where toBEncode (GetPeers ih) = toDict $ info_hash_key .=! ih .: endDict fromBEncode = fromDict $ GetPeers <$>! info_hash_key type PeerList ip = Either [NodeInfo KMessageOf ip ()] [PeerAddr ip] data GotPeers ip = GotPeers { -- | If the queried node has no peers for the infohash, returned -- the K nodes in the queried nodes routing table closest to the -- infohash supplied in the query. peers :: PeerList ip -- | The token value is a required argument for a future -- announce_peer query. , grantedToken :: Token } deriving (Show, Eq, Typeable) peers_key :: BKey peers_key = "values" token_key :: BKey token_key = "token" name_key :: BKey name_key = "name" instance (Typeable ip, Serialize ip) => BEncode (GotPeers ip) where toBEncode GotPeers {..} = toDict $ case peers of Left ns -> nodes_key .=! runPut (mapM_ put ns) .: token_key .=! grantedToken .: endDict Right ps -> token_key .=! grantedToken .: peers_key .=! L.map S.encode ps .: endDict fromBEncode = fromDict $ do mns <- optional (binary nodes_key) -- "nodes" tok <- field (req token_key) -- "token" mps <- optional (field (req peers_key) >>= decodePeers) -- "values" case (Right <$> mps) <|> (Left <$> mns) of Nothing -> fail "get_peers: neihter peers nor nodes key is valid" Just xs -> pure $ GotPeers xs tok where decodePeers = either fail pure . mapM S.decode -- | \"q" = \"get_peers\" instance (Typeable ip, Serialize ip) => KRPC KMessageOf (Query KMessageOf (GetPeers ip)) (Response KMessageOf (GotPeers ip)) where method = "get_peers" makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) makeResponseExtra _ nid _ _ = return $ MainlineResponse nid -- TODO KError Sender/Responder messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r {----------------------------------------------------------------------- -- announce method -----------------------------------------------------------------------} -- | Announce that the peer, controlling the querying node, is -- downloading a torrent on a port. data Announce = Announce { -- | If set, the 'port' field should be ignored and the source -- port of the UDP packet should be used as the peer's port -- instead. This is useful for peers behind a NAT that may not -- know their external port, and supporting uTP, they accept -- incoming connections on the same port as the DHT port. impliedPort :: Bool -- | infohash of the torrent; , topic :: InfoHash -- | some clients announce the friendly name of the torrent here. , announcedName :: Maybe ByteString -- | the port /this/ peer is listening; , port :: PortNumber -- TODO: optional boolean "seed" key -- | received in response to a previous get_peers query. , sessionToken :: Token } deriving (Show, Eq, Typeable) port_key :: BKey port_key = "port" implied_port_key :: BKey implied_port_key = "implied_port" instance BEncode Announce where toBEncode Announce {..} = toDict $ implied_port_key .=? flagField impliedPort .: info_hash_key .=! topic .: name_key .=? announcedName .: port_key .=! port .: token_key .=! sessionToken .: endDict where flagField flag = if flag then Just (1 :: Int) else Nothing fromBEncode = fromDict $ do Announce <$> (boolField <$> optional (field (req implied_port_key))) <*>! info_hash_key <*>? name_key <*>! port_key <*>! token_key where boolField = maybe False (/= (0 :: Int)) -- | The queried node must verify that the token was previously sent -- to the same IP address as the querying node. Then the queried node -- should store the IP address of the querying node and the supplied -- port number under the infohash in its store of peer contact -- information. data Announced = Announced deriving (Show, Eq, Typeable) instance BEncode Announced where toBEncode _ = toBEncode ( Ping :: Ping KMessageOf ) fromBEncode _ = pure Announced -- | \"q" = \"announce\" instance KRPC KMessageOf (Query KMessageOf Announce) (Response KMessageOf Announced) where method = "announce_peer" makeQueryExtra _ nid _ _ = return $ MainlineQuery nid False -- TODO: check for NAT issues. (BEP 43) makeResponseExtra _ nid _ _ = return $ MainlineResponse nid -- TODO KError Sender/Responder messageSender (Q q) _ = queringNodeId $ queryExtra $ queryArgs q messageResponder _ (R r) = queredNodeId $ responseExtra $ respVals r -- | Yields all 8 DHT neighborhoods available to you given a particular ip -- address. bep42s :: Address a => a -> NodeId KMessageOf -> [NodeId KMessageOf] bep42s addr (NodeId r) = mapMaybe (bep42 addr) rs where rs = map (NodeId . change3bits r) [0..7] -- change3bits :: ByteString -> Word8 -> ByteString -- change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n) change3bits :: (Num b, Bits b) => b -> b -> b change3bits bs n = (bs .&. complement 7) .|. n -- | Modifies a purely random 'NodeId' to one that is related to a given -- routable address in accordance with BEP 42. bep42 :: Address a => a -> NodeId KMessageOf -> Maybe (NodeId KMessageOf) bep42 addr (NodeId r) | Just ip <- fmap S.encode (fromAddr addr :: Maybe IPv4) <|> fmap S.encode (fromAddr addr :: Maybe IPv6) = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) | otherwise = Nothing where ip4mask = "\x03\x0f\x3f\xff" :: ByteString ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString nbhood_select = (fromIntegral r :: Word8) .&. 7 retr n = pure $ BS.drop (nodeIdSize - n) $ S.encode r crc = flip shiftL (finiteBitSize (NodeId undefined) - 32) . fromIntegral . crc32c . BS.pack applyMask ip = case BS.zipWith (.&.) msk ip of (b:bs) -> (b .|. shiftL nbhood_select 5) : bs bs -> bs where msk | BS.length ip == 4 = ip4mask | otherwise = ip6mask {----------------------------------------------------------------------- -- Tokens policy -----------------------------------------------------------------------} data SessionTokens = SessionTokens { tokenMap :: !TokenMap , lastUpdate :: !UTCTime , maxInterval :: !NominalDiffTime } nullSessionTokens :: IO SessionTokens nullSessionTokens = SessionTokens <$> (tokens <$> randomIO) <*> getCurrentTime <*> pure defaultUpdateInterval -- TODO invalidate *twice* if needed invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens invalidateTokens curTime ts @ SessionTokens {..} | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens { tokenMap = update tokenMap , lastUpdate = curTime , maxInterval = maxInterval } | otherwise = ts {----------------------------------------------------------------------- -- Tokens -----------------------------------------------------------------------} tryUpdateSecret :: TVar SessionTokens -> IO () tryUpdateSecret toks = do curTime <- getCurrentTime atomically $ modifyTVar' toks (invalidateTokens curTime) grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token grantToken sessionTokens addr = do tryUpdateSecret sessionTokens toks <- readTVarIO sessionTokens return $ T.lookup addr $ tokenMap toks -- | Throws 'HandlerError' if the token is invalid or already -- expired. See 'TokenMap' for details. checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool checkToken sessionTokens addr questionableToken = do tryUpdateSecret sessionTokens toks <- readTVarIO sessionTokens return $ T.member addr questionableToken (tokenMap toks) -------------------------- instance Kademlia KMessageOf where data DHTData KMessageOf ip = TorrentData { contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. } dhtAdjustID _ fallback ip0 arrival = fromMaybe fallback $ do ip <- fromSockAddr ip0 -- :: Maybe ip let _ = ip `asTypeOf` nodeAddr (foreignNode arrival) listToMaybe $ rank id (nodeId $ foreignNode arrival) $ bep42s ip fallback namePing _ = "ping" nameFindNodes _ = "find-nodes" initializeDHTData = TorrentData <$> newTVarIO def <*> (newTVarIO =<< nullSessionTokens) deriving instance IsString (QueryMethod dht) => IsString (Method dht param result) deriving instance BEncode (QueryMethod dht) => BEncode (Method dht param result)