From 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sat, 28 Sep 2019 13:43:29 -0400 Subject: Factor out some new libraries word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search --- dht/src/Network/BitTorrent/DHT/ContactInfo.hs | 254 +++++ dht/src/Network/BitTorrent/DHT/Readme.md | 13 + dht/src/Network/BitTorrent/DHT/Token.hs | 201 ++++ dht/src/Network/BitTorrent/MainlineDHT.hs | 1169 +++++++++++++++++++++ dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs | 24 + 5 files changed, 1661 insertions(+) create mode 100644 dht/src/Network/BitTorrent/DHT/ContactInfo.hs create mode 100644 dht/src/Network/BitTorrent/DHT/Readme.md create mode 100644 dht/src/Network/BitTorrent/DHT/Token.hs create mode 100644 dht/src/Network/BitTorrent/MainlineDHT.hs create mode 100644 dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs (limited to 'dht/src/Network/BitTorrent') diff --git a/dht/src/Network/BitTorrent/DHT/ContactInfo.hs b/dht/src/Network/BitTorrent/DHT/ContactInfo.hs new file mode 100644 index 00000000..ec7e6658 --- /dev/null +++ b/dht/src/Network/BitTorrent/DHT/ContactInfo.hs @@ -0,0 +1,254 @@ +{-# LANGUAGE BangPatterns #-} +module Network.BitTorrent.DHT.ContactInfo + ( PeerStore + , PeerAddr(..) + , Network.BitTorrent.DHT.ContactInfo.lookup + , Network.BitTorrent.DHT.ContactInfo.freshPeers + , Network.BitTorrent.DHT.ContactInfo.insertPeer + , deleteOlderThan + , knownSwarms + ) where + +import Control.Applicative +import Data.Default +import Data.List as L +import Data.Maybe +import Data.HashMap.Strict as HM +import Data.Serialize +import Data.Semigroup +import Data.Wrapper.PSQ as PSQ +import Data.Time.Clock.POSIX +import Data.ByteString (ByteString) +import Data.Word + +import Data.Torrent +import Network.Address + +-- {- +-- import Data.HashMap.Strict as HM +-- +-- import Data.Torrent.InfoHash +-- import Network.Address +-- +-- -- increase prefix when table is too large +-- -- decrease prefix when table is too small +-- -- filter outdated peers +-- +-- {----------------------------------------------------------------------- +-- -- PeerSet +-- -----------------------------------------------------------------------} +-- +-- type PeerSet a = [(PeerAddr, NodeInfo a, Timestamp)] +-- +-- -- compare PSQueue vs Ordered list +-- +-- takeNewest :: PeerSet a -> [PeerAddr] +-- takeNewest = undefined +-- +-- dropOld :: Timestamp -> PeerSet a -> PeerSet a +-- dropOld = undefined +-- +-- insert :: PeerAddr -> Timestamp -> PeerSet a -> PeerSet a +-- insert = undefined +-- +-- type Mask = Int +-- type Size = Int +-- type Timestamp = Int +-- +-- {----------------------------------------------------------------------- +-- -- InfoHashMap +-- -----------------------------------------------------------------------} +-- +-- -- compare handwritten prefix tree versus IntMap +-- +-- data Tree a +-- = Nil +-- | Tip !InfoHash !(PeerSet a) +-- | Bin !InfoHash !Mask !Size !Timestamp (Tree a) (Tree a) +-- +-- insertTree :: InfoHash -> a -> Tree a -> Tree a +-- insertTree = undefined +-- +-- type Prio = Int +-- +-- --shrink :: ContactInfo ip -> Int +-- shrink Nil = Nil +-- shrink (Tip _ _) = undefined +-- shrink (Bin _ _) = undefined +-- +-- {----------------------------------------------------------------------- +-- -- InfoHashMap +-- -----------------------------------------------------------------------} +-- +-- -- compare new design versus HashMap +-- +-- data IntMap k p a +-- type ContactInfo = Map InfoHash Timestamp (Set (PeerAddr IP) Timestamp) +-- +-- data ContactInfo ip = PeerStore +-- { maxSize :: Int +-- , prefixSize :: Int +-- , thisNodeId :: NodeId +-- +-- , count :: Int -- ^ Cached size of the 'peerSet' +-- , peerSet :: HashMap InfoHash [PeerAddr ip] +-- } +-- +-- size :: ContactInfo ip -> Int +-- size = undefined +-- +-- prefixSize :: ContactInfo ip -> Int +-- prefixSize = undefined +-- +-- lookup :: InfoHash -> ContactInfo ip -> [PeerAddr ip] +-- lookup = undefined +-- +-- insert :: InfoHash -> PeerAddr ip -> ContactInfo ip -> ContactInfo ip +-- insert = undefined +-- +-- -- | Limit in size. +-- prune :: NodeId -> Int -> ContactInfo ip -> ContactInfo ip +-- prune pref targetSize Nil = Nil +-- prune pref targetSize (Tip _ _) = undefined +-- +-- -- | Remove expired entries. +-- splitGT :: Timestamp -> ContactInfo ip -> ContactInfo ip +-- splitGT = undefined +-- -} + +-- | Storage used to keep track a set of known peers in client, +-- tracker or DHT sessions. +newtype PeerStore = PeerStore (HashMap InfoHash SwarmData) + +type Timestamp = POSIXTime + +data SwarmData = SwarmData + { peers :: !(PSQ PeerAddr Timestamp) + , name :: !(Maybe ByteString) + } + +-- | This wrapper will serialize an ip address with a '4' or '6' prefix byte +-- to indicate whether it is IPv4 or IPv6. +-- +-- Note: it does not serialize port numbers. +newtype SerializeAddress a = SerializeAddress { unserializeAddress :: a } + +instance Address a => Serialize (SerializeAddress a) where + get = SerializeAddress <$> do + c <- get + case (c::Word8) of + 0x34 -> do ip4 <- get + return $ fromJust $ fromAddr (ip4::IPv4) + 0x36 -> do ip6 <- get + return $ fromJust $ fromAddr (ip6::IPv6) + _ -> return $ error "cannot deserialize non-IP SerializeAddress" + put (SerializeAddress a) + | Just ip4 <- fromAddr a + = put (0x34::Word8) >> put (ip4::IPv4) + | Just ip6 <- fromAddr a + = put (0x36::Word8) >> put (ip6::IPv6) + | otherwise = return $ error "cannot serialize non-IP SerializeAddress" + + +instance Serialize SwarmData where + get = flip SwarmData <$> get + <*> ( PSQ.fromList . L.map parseAddr <$> get ) + where + parseAddr (pid,addr,port) = PeerAddr { peerId = pid + , peerHost = unserializeAddress addr + , peerPort = port + } + :-> 0 + + put SwarmData{..} = do + put name + put $ L.map (\(addr :-> _) -> (peerId addr, SerializeAddress addr, peerPort addr)) + -- XXX: should we serialize the timestamp? + $ PSQ.toList peers + +knownSwarms :: PeerStore -> [ (InfoHash, Int, Maybe ByteString) ] +knownSwarms (PeerStore m) = L.map (\(ih,SwarmData q n) -> (ih,PSQ.size q,n)) $ HM.toList m + +swarmSingleton :: PeerAddr -> SwarmData +swarmSingleton a = SwarmData + { peers = PSQ.singleton a 0 + , name = Nothing } + +swarmInsert :: SwarmData -> SwarmData -> SwarmData +swarmInsert new old = SwarmData + { peers = L.foldl' (\q (a :-> t) -> PSQ.insertWith newerTimeStamp a t q) (peers old) (PSQ.toList $ peers new) + , name = name new <|> name old -- TODO: decodeUtf8' check + } + where + newerTimeStamp newtime oldtime = if newtime > oldtime then newtime else oldtime + +isSwarmOccupied :: SwarmData -> Bool +isSwarmOccupied SwarmData{..} = not $ PSQ.null peers + +-- | Empty store. +instance Default (PeerStore) where + def = PeerStore HM.empty + {-# INLINE def #-} + +instance Semigroup PeerStore where + PeerStore a <> PeerStore b = + PeerStore (HM.unionWith swarmInsert a b) + {-# INLINE (<>) #-} + +-- | Monoid under union operation. +instance Monoid PeerStore where + mempty = def + {-# INLINE mempty #-} + + mappend (PeerStore a) (PeerStore b) = + PeerStore (HM.unionWith swarmInsert a b) + {-# INLINE mappend #-} + +-- | Can be used to store peers between invocations of the client +-- software. +instance Serialize PeerStore where + get = PeerStore . HM.fromList <$> get + put (PeerStore m) = put (L.filter (isSwarmOccupied . snd) $ HM.toList m) + +-- | Returns all peers associated with a given info hash. +lookup :: InfoHash -> PeerStore -> [PeerAddr] +lookup ih (PeerStore m) = maybe [] (PSQ.keys . peers) $ HM.lookup ih m + +batchSize :: Int +batchSize = 64 + +-- | Used in 'get_peers' DHT queries. +freshPeers :: InfoHash -> Timestamp -> PeerStore -> ([PeerAddr], PeerStore) +freshPeers ih tm (PeerStore m) = fromMaybe ([],PeerStore m) $ do + swarm <- HM.lookup ih m + let ps0 = take batchSize $ unfoldr (incomp minView) (peers swarm) + peers' = case reverse ps0 of + (_,psq):_ -> psq + _ -> peers swarm + ps = L.map (key . fst) ps0 + m' = HM.insert ih swarm { peers = L.foldl' (\q p -> PSQ.insert p tm q) peers' ps } m + return $! m' `seq` (ps,PeerStore m') + +incomp :: (x -> Maybe (r,x)) -> x -> Maybe ((r,x),x) +incomp !f !x = do + (result,x') <- f x + pure $! ( (result,x'), x' ) + +-- | Used in 'announce_peer' DHT queries. +insertPeer :: InfoHash -> Maybe ByteString -> PeerAddr -> PeerStore -> PeerStore +insertPeer !ih !name !a !(PeerStore m) = seq a' $ PeerStore (HM.insertWith swarmInsert ih a' m) + where + a' = SwarmData { peers = PSQ.singleton a 0 + , name = name } + +deleteOlderThan :: POSIXTime -> PeerStore -> PeerStore +deleteOlderThan cutoff (PeerStore m) = PeerStore $ HM.mapMaybe gc m + where + gc :: SwarmData -> Maybe SwarmData + gc swarms = fmap (\ps -> swarms { peers = ps }) $ gcPSQ (peers swarms) + + gcPSQ :: PSQKey a => PSQ a Timestamp -> Maybe (PSQ a Timestamp) + gcPSQ ps = case minView ps of + Nothing -> Nothing + Just (_ :-> tm, ps') | tm < cutoff -> gcPSQ ps' + Just _ -> Just ps diff --git a/dht/src/Network/BitTorrent/DHT/Readme.md b/dht/src/Network/BitTorrent/DHT/Readme.md new file mode 100644 index 00000000..e2352f10 --- /dev/null +++ b/dht/src/Network/BitTorrent/DHT/Readme.md @@ -0,0 +1,13 @@ +References +========== + +Some good references excluding BEPs: + +* [Kademlia wiki page][kademlia-wiki] +* [Kademlia: A Peer-to-peer Information System Based on the XOR Metric][kademlia-paper] +* [BitTorrent Mainline DHT Measurement][mldht] +* Profiling a Million User DHT. (paper) + +[kademlia-wiki]: http://en.wikipedia.org/wiki/Kademlia +[kademlia-paper]: http://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf +[mldht]: http://www.cs.helsinki.fi/u/jakangas/MLDHT/ diff --git a/dht/src/Network/BitTorrent/DHT/Token.hs b/dht/src/Network/BitTorrent/DHT/Token.hs new file mode 100644 index 00000000..171cc8be --- /dev/null +++ b/dht/src/Network/BitTorrent/DHT/Token.hs @@ -0,0 +1,201 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- The return value for a query for peers includes an opaque value +-- known as the 'Token'. For a node to announce that its controlling +-- peer is downloading a torrent, it must present the token received +-- from the same queried node in a recent query for peers. When a node +-- attempts to \"announce\" a torrent, the queried node checks the +-- token against the querying node's 'IP' address. This is to prevent +-- malicious hosts from signing up other hosts for torrents. Since the +-- token is merely returned by the querying node to the same node it +-- received the token from, the implementation is not defined. Tokens +-- must be accepted for a reasonable amount of time after they have +-- been distributed. +-- +{-# LANGUAGE GeneralizedNewtypeDeriving, CPP #-} +module Network.BitTorrent.DHT.Token + ( -- * Token + Token + , maxInterval + , toPaddedByteString + , fromPaddedByteString + + -- * Session tokens + , TokenMap + , SessionTokens + , nullSessionTokens + , checkToken + , grantToken + + -- ** Construction + , Network.BitTorrent.DHT.Token.tokens + + -- ** Query + , Network.BitTorrent.DHT.Token.lookup + , Network.BitTorrent.DHT.Token.member + + -- ** Modification + , Network.BitTorrent.DHT.Token.defaultUpdateInterval + , Network.BitTorrent.DHT.Token.update + ) where + +import Control.Arrow +import Control.Monad.State +#ifdef VERSION_bencoding +import Data.BEncode (BEncode) +#endif +import Data.ByteString as BS +import Data.ByteString.Char8 as B8 +import Data.ByteString.Lazy as BL +import Data.ByteString.Lazy.Builder as BS +import qualified Data.ByteString.Base16 as Base16 +import Data.Default +import Data.List as L +import Data.Hashable +import Data.String +import Data.Time +import System.Random +import Control.Concurrent.STM + +-- TODO use ShortByteString + +-- | An opaque value. +newtype Token = Token BS.ByteString + deriving ( Eq, IsString +#ifdef VERSION_bencoding + , BEncode +#endif + ) + +instance Show Token where + show (Token bs) = B8.unpack $ Base16.encode bs + +instance Read Token where + readsPrec i s = pure $ (Token *** B8.unpack) $ Base16.decode (B8.pack s) + +-- | Meaningless token, for testing purposes only. +instance Default Token where + def = makeToken (0::Int) 0 + +-- | Prepend token with 0x20 bytes to fill the available width. +-- +-- If n > 8, then this will also guarantee a nonzero token, which is useful for +-- Tox ping-id values for announce responses. +toPaddedByteString :: Int -> Token -> BS.ByteString +toPaddedByteString n (Token bs) = BS.append (BS.replicate (n - BS.length bs) 0x20) bs + +fromPaddedByteString :: Int -> BS.ByteString -> Token +fromPaddedByteString n bs = Token $ BS.drop (n - len) bs + where + len = BS.length tok where Token tok = def + +-- | The secret value used as salt. +type Secret = Int + +-- The BitTorrent implementation uses the SHA1 hash of the IP address +-- concatenated onto a secret, we use hashable instead. +makeToken :: Hashable a => a -> Secret -> Token +makeToken n s = Token $ toBS $ hashWithSalt s n + where + toBS = toStrict . toLazyByteString . int64BE . fromIntegral +{-# INLINE makeToken #-} + +-- | Constant space 'Node' to 'Token' map based on the secret value. +data TokenMap = TokenMap + { prevSecret :: {-# UNPACK #-} !Secret + , curSecret :: {-# UNPACK #-} !Secret + , generator :: {-# UNPACK #-} !StdGen + } deriving Show + +-- | A new token map based on the specified seed value. Returned token +-- map should be periodicatically 'update'd. +-- +-- Normally, the seed value should vary between invocations of the +-- client software. +tokens :: Int -> TokenMap +tokens seed = (`evalState` mkStdGen seed) $ + TokenMap <$> state next + <*> state next + <*> get + +-- | Get token for the given node. A token becomes invalid after 2 +-- 'update's. +-- +-- Typically used to handle find_peers query. +lookup :: Hashable a => a -> TokenMap -> Token +lookup addr TokenMap {..} = makeToken addr curSecret + +-- | Check if token is valid. +-- +-- Typically used to handle 'Network.DHT.Mainline.Announce' +-- query. If token is invalid the 'Network.KRPC.ProtocolError' should +-- be sent back to the malicious node. +member :: Hashable a => a -> Token -> TokenMap -> Bool +member addr token TokenMap {..} = token `L.elem` valid + where valid = makeToken addr <$> [curSecret, prevSecret] + +-- | Secret changes every five minutes and tokens up to ten minutes old +-- are accepted. +defaultUpdateInterval :: NominalDiffTime +defaultUpdateInterval = 5 * 60 + +-- | Update current tokens. +update :: TokenMap -> TokenMap +update TokenMap {..} = TokenMap + { prevSecret = curSecret + , curSecret = newSecret + , generator = newGen + } + where + (newSecret, newGen) = next generator + +data SessionTokens = SessionTokens + { tokenMap :: !TokenMap + , lastUpdate :: !UTCTime + , maxInterval :: !NominalDiffTime + } + +nullSessionTokens :: IO SessionTokens +nullSessionTokens = SessionTokens + <$> (tokens <$> randomIO) + <*> getCurrentTime + <*> pure defaultUpdateInterval + +-- TODO invalidate *twice* if needed +invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens +invalidateTokens curTime ts @ SessionTokens {..} + | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens + { tokenMap = update tokenMap + , lastUpdate = curTime + , maxInterval = maxInterval + } + | otherwise = ts + +{----------------------------------------------------------------------- +-- Tokens +-----------------------------------------------------------------------} + +tryUpdateSecret :: TVar SessionTokens -> IO () +tryUpdateSecret toks = do + curTime <- getCurrentTime + atomically $ modifyTVar' toks (invalidateTokens curTime) + +grantToken :: Hashable addr => TVar SessionTokens -> addr -> IO Token +grantToken sessionTokens addr = do + tryUpdateSecret sessionTokens + toks <- readTVarIO sessionTokens + return $ Network.BitTorrent.DHT.Token.lookup addr $ tokenMap toks + +-- | Throws 'HandlerError' if the token is invalid or already +-- expired. See 'TokenMap' for details. +checkToken :: Hashable addr => TVar SessionTokens -> addr -> Token -> IO Bool +checkToken sessionTokens addr questionableToken = do + tryUpdateSecret sessionTokens + toks <- readTVarIO sessionTokens + return $ member addr questionableToken (tokenMap toks) + diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs new file mode 100644 index 00000000..89851e88 --- /dev/null +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs @@ -0,0 +1,1169 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveFoldable #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TupleSections #-} +module Network.BitTorrent.MainlineDHT where + +import Control.Applicative +import Control.Arrow +import Control.Concurrent.STM +import Control.Monad +import Crypto.Random +import Data.BEncode as BE +import qualified Data.BEncode.BDict as BE + ;import Data.BEncode.BDict (BKey) +import Data.BEncode.Pretty +import Data.BEncode.Types (BDict) +import Data.Bits +import Data.Bits.ByteString () +import Data.Bool +import Data.ByteArray (ByteArrayAccess) +import qualified Data.ByteString as B + ;import Data.ByteString (ByteString) +import qualified Data.ByteString.Base16 as Base16 +import qualified Data.ByteString.Char8 as C8 +import Data.ByteString.Lazy (toStrict) +import qualified Data.ByteString.Lazy.Char8 as L8 +import Data.Char +import Data.Coerce +import Data.Data +import Data.Default +import Data.Digest.CRC32C +import Data.Function (fix) +import Data.Hashable +#if MIN_VERSION_iproute(1,7,4) +import Data.IP hiding (fromSockAddr) +#else +import Data.IP +#endif +import Data.Maybe +import Data.Monoid +import Data.Ord +import qualified Data.Serialize as S +import Data.Set (Set) +import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) +import Data.Torrent +import Data.Word +import qualified Data.Wrapper.PSQInt as Int +import Debug.Trace +import Network.BitTorrent.MainlineDHT.Symbols +import Network.Kademlia +import Network.Kademlia.Bootstrap +import Network.Address (fromSockAddr, + setPort, sockAddrPort, testIdBit, + toSockAddr, genBucketSample', WantIP(..), + un4map,either4or6,ipFamily) +import Network.BitTorrent.DHT.ContactInfo as Peers +import Network.Kademlia.Search (Search (..)) +import Network.BitTorrent.DHT.Token as Token +import qualified Network.Kademlia.Routing as R + ;import Network.Kademlia.Routing (getTimestamp) +import Network.QueryResponse +import Network.Socket +import System.IO.Error +import System.IO.Unsafe (unsafeInterleaveIO) +import qualified Text.ParserCombinators.ReadP as RP +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif +import qualified Data.Aeson as JSON + ;import Data.Aeson (FromJSON, ToJSON, (.=)) +import Text.Read +import System.Global6 +import Control.TriadCommittee +import Data.TableMethods +import DPut +import DebugTag + +newtype NodeId = NodeId ByteString + deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) + +instance BEncode NodeId where + fromBEncode bval = do + bs <- fromBEncode bval + if B.length bs /= 20 + then Left "Invalid length node id." + else Right $ NodeId bs + + toBEncode (NodeId bs) = toBEncode bs + +instance Show NodeId where + show (NodeId bs) = C8.unpack $ Base16.encode bs + +instance S.Serialize NodeId where + get = NodeId <$> S.getBytes 20 + put (NodeId bs) = S.putByteString bs + +instance FiniteBits NodeId where + finiteBitSize _ = 160 + +instance Read NodeId where + readsPrec _ str + | (bs, xs) <- Base16.decode $ C8.pack str + , B.length bs == 20 + = [ (NodeId bs, drop 40 str) ] + | otherwise = [] + +zeroID :: NodeId +zeroID = NodeId $ B.replicate 20 0 + +data NodeInfo = NodeInfo + { nodeId :: NodeId + , nodeIP :: IP + , nodePort :: PortNumber + } + deriving (Eq,Ord) + +instance ToJSON NodeInfo where + toJSON (NodeInfo nid (IPv4 ip) port) + = JSON.object [ "node-id" .= show nid + , "ipv4" .= show ip + , "port" .= (fromIntegral port :: Int) + ] + toJSON (NodeInfo nid (IPv6 ip6) port) + | Just ip <- un4map ip6 + = JSON.object [ "node-id" .= show nid + , "ipv4" .= show ip + , "port" .= (fromIntegral port :: Int) + ] + | otherwise + = JSON.object [ "node-id" .= show nid + , "ipv6" .= show ip6 + , "port" .= (fromIntegral port :: Int) + ] +instance FromJSON NodeInfo where + parseJSON (JSON.Object v) = do + nidstr <- v JSON..: "node-id" + ip6str <- v JSON..:? "ipv6" + ip4str <- v JSON..:? "ipv4" + portnum <- v JSON..: "port" + ip <- maybe empty (return . IPv6) (ip6str >>= readMaybe) + <|> maybe empty (return . IPv4) (ip4str >>= readMaybe) + let (bs,_) = Base16.decode (C8.pack nidstr) + guard (B.length bs == 20) + return $ NodeInfo (NodeId bs) ip (fromIntegral (portnum :: Word16)) + +hexdigit :: Char -> Bool +hexdigit c = ('0' <= c && c <= '9') || ( 'a' <= c && c <= 'f') || ( 'A' <= c && c <= 'F') + +instance Read NodeInfo where + readsPrec i = RP.readP_to_S $ do + RP.skipSpaces + let n = 40 -- characters in node id. + parseAddr = RP.between (RP.char '(') (RP.char ')') (RP.munch (/=')')) + RP.+++ RP.munch (not . isSpace) + nodeidAt = do hexhash <- sequence $ replicate n (RP.satisfy hexdigit) + RP.char '@' RP.+++ RP.satisfy isSpace + addrstr <- parseAddr + nid <- case Base16.decode $ C8.pack hexhash of + (bs,_) | B.length bs==20 -> return (NodeId bs) + _ -> fail "Bad node id." + return (nid,addrstr) + (nid,addrstr) <- ( nodeidAt RP.+++ ( (zeroID,) <$> parseAddr) ) + let raddr = do + ip <- RP.between (RP.char '[') (RP.char ']') + (IPv6 <$> RP.readS_to_P (readsPrec i)) + RP.+++ (IPv4 <$> RP.readS_to_P (readsPrec i)) + _ <- RP.char ':' + port <- toEnum <$> RP.readS_to_P (readsPrec i) + return (ip, port) + + (ip,port) <- case RP.readP_to_S raddr addrstr of + [] -> fail "Bad address." + ((ip,port),_):_ -> return (ip,port) + return $ NodeInfo nid ip port + + + +-- The Hashable instance depends only on the IP address and port number. It is +-- used to compute the announce token. +instance Hashable NodeInfo where + hashWithSalt s ni = hashWithSalt s (nodeIP ni , nodePort ni) + {-# INLINE hashWithSalt #-} + + +instance Show NodeInfo where + showsPrec _ (NodeInfo nid ip port) = + shows nid . ('@' :) . showsip . (':' :) . shows port + where + showsip + | IPv4 ip4 <- ip = shows ip4 + | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = shows ip4 + | otherwise = ('[' :) . shows ip . (']' :) + +{- + +-- | KRPC 'compact list' compatible encoding: contact information for +-- nodes is encoded as a 26-byte string. Also known as "Compact node +-- info" the 20-byte Node ID in network byte order has the compact +-- IP-address/port info concatenated to the end. + get = NodeInfo <$> (NodeId <$> S.getBytes 20 ) <*> S.get <*> S.get +-} + +getNodeInfo4 :: S.Get NodeInfo +getNodeInfo4 = NodeInfo <$> (NodeId <$> S.getBytes 20) + <*> (IPv4 <$> S.get) + <*> S.get + +putNodeInfo4 :: NodeInfo -> S.Put +putNodeInfo4 (NodeInfo (NodeId nid) ip port) + | IPv4 ip4 <- ip = put4 ip4 + | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = put4 ip4 + | otherwise = return () + where + put4 ip4 = S.putByteString nid >> S.put ip4 >> S.put port + +getNodeInfo6 :: S.Get NodeInfo +getNodeInfo6 = NodeInfo <$> (NodeId <$> S.getBytes 20) + <*> (IPv6 <$> S.get) + <*> S.get + +putNodeInfo6 :: NodeInfo -> S.Put +putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port) + = S.putByteString nid >> S.put ip >> S.put port +putNodeInfo6 _ = return () + + +-- | TODO: This should depend on the bind address to support IPv4-only. For +-- now, in order to support dual-stack listen, we're going to assume IPv6 is +-- wanted and map IPv4 addresses accordingly. +nodeAddr :: NodeInfo -> SockAddr +nodeAddr (NodeInfo _ ip port) = + case ip of + IPv4 ip4 -> setPort port $ toSockAddr (ipv4ToIPv6 ip4) + IPv6 ip6 -> setPort port $ toSockAddr ip6 + +nodeInfo :: NodeId -> SockAddr -> Either String NodeInfo +nodeInfo nid saddr + | Just ip <- fromSockAddr saddr + , Just port <- sockAddrPort saddr = Right $ NodeInfo nid ip port + | otherwise = Left "Address family not supported." + +-- | Types of RPC errors. +data ErrorCode + -- | Some error doesn't fit in any other category. + = GenericError + + -- | Occurs when server fail to process procedure call. + | ServerError + + -- | Malformed packet, invalid arguments or bad token. + | ProtocolError + + -- | Occurs when client trying to call method server don't know. + | MethodUnknown + deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data) + +-- | According to the table: +-- +instance Enum ErrorCode where + fromEnum GenericError = 201 + fromEnum ServerError = 202 + fromEnum ProtocolError = 203 + fromEnum MethodUnknown = 204 + {-# INLINE fromEnum #-} + toEnum 201 = GenericError + toEnum 202 = ServerError + toEnum 203 = ProtocolError + toEnum 204 = MethodUnknown + toEnum _ = GenericError + {-# INLINE toEnum #-} + +instance BEncode ErrorCode where + toBEncode = toBEncode . fromEnum + {-# INLINE toBEncode #-} + fromBEncode b = toEnum <$> fromBEncode b + {-# INLINE fromBEncode #-} + +data Error = Error + { errorCode :: !ErrorCode -- ^ The type of error. + , errorMessage :: !ByteString -- ^ Human-readable text message. + } deriving ( Show, Eq, Ord, Typeable, Data, Read ) + +newtype TransactionId = TransactionId ByteString + deriving (Eq, Ord, Show, BEncode) + +newtype Method = Method ByteString + deriving (Eq, Ord, Show, BEncode) + +data Message a = Q { msgOrigin :: NodeId + , msgID :: TransactionId + , qryPayload :: a + , qryMethod :: Method + , qryReadOnly :: Bool } + + | R { msgOrigin :: NodeId + , msgID :: TransactionId + , rspPayload :: Either Error a + , rspReflectedIP :: Maybe SockAddr } + +showBE :: BValue -> String +showBE bval = L8.unpack (showBEncode bval) + +instance BE.BEncode (Message BValue) where + toBEncode m = encodeMessage m + {- + in case m of + Q {} -> trace ("encoded(query): "++showBE r) r + R {} -> trace ("encoded(response): "++showBE r) r -} + fromBEncode bval = decodeMessage bval + {- + in case r of + Left e -> trace (show e) r + Right (Q {}) -> trace ("decoded(query): "++showBE bval) r + Right (R {}) -> trace ("decoded(response): "++showBE bval) r -} + +decodeMessage :: BValue -> Either String (Message BValue) +decodeMessage = fromDict $ do + key <- lookAhead (field (req "y")) + let _ = key :: BKey + f <- case key of + "q" -> do a <- field (req "a") + g <- either fail return $ flip fromDict a $ do + who <- field (req "id") + ro <- fromMaybe False <$> optional (field (req "ro")) + return $ \meth tid -> Q who tid a meth ro + meth <- field (req "q") + return $ g meth + "r" -> do ip <- do + ipstr <- optional (field (req "ip")) + mapM (either fail return . decodeAddr) ipstr + vals <- field (req "r") + either fail return $ flip fromDict vals $ do + who <- field (req "id") + return $ \tid -> R who tid (Right vals) ip + "e" -> do (ecode,emsg) <- field (req "e") + ip <- do + ipstr <- optional (field (req "ip")) + mapM (either fail return . decodeAddr) ipstr + -- FIXME:Spec does not give us the NodeId of the sender. + -- Using 'zeroID' as place holder. + -- We should ignore the msgOrigin for errors in 'updateRouting'. + -- We should consider making msgOrigin a Maybe value. + return $ \tid -> R zeroID tid (Left (Error ecode emsg)) ip + _ -> fail $ "Mainline message is not a query, response, or an error: " + ++ show key + tid <- field (req "t") + return $ f (tid :: TransactionId) + + +encodeMessage :: Message BValue -> BValue +encodeMessage (Q origin tid a meth ro) + = case a of + BDict args -> encodeQuery tid meth (BDict $ genericArgs origin ro `BE.union` args) + _ -> encodeQuery tid meth a -- XXX: Not really a valid query. +encodeMessage (R origin tid v ip) + = case v of + Right (BDict vals) -> encodeResponse tid (BDict $ genericArgs origin False `BE.union` vals) ip + Left err -> encodeError tid err + + +encodeAddr :: SockAddr -> ByteString +encodeAddr = either encode4 encode6 . either4or6 + where + encode4 (SockAddrInet port addr) + = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port)) + + encode6 (SockAddrInet6 port _ addr _) + = S.runPut (S.put addr >> S.putWord16be (fromIntegral port)) + encode6 _ = B.empty + +decodeAddr :: ByteString -> Either String SockAddr +decodeAddr bs = S.runGet g bs + where + g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be) + | otherwise = do host <- S.get -- TODO: Is this right? + port <- fromIntegral <$> S.getWord16be + return $ SockAddrInet6 port 0 host 0 + +genericArgs :: BEncode a => a -> Bool -> BDict +genericArgs nodeid ro = + "id" .=! nodeid + .: "ro" .=? bool Nothing (Just (1 :: Int)) ro + .: endDict + +encodeError :: BEncode a => a -> Error -> BValue +encodeError tid (Error ecode emsg) = encodeAny tid "e" (ecode,emsg) id + +encodeResponse :: (BEncode tid, BEncode vals) => + tid -> vals -> Maybe SockAddr -> BValue +encodeResponse tid rvals rip = + encodeAny tid "r" rvals ("ip" .=? (BString . encodeAddr <$> rip) .:) + +encodeQuery :: (BEncode args, BEncode tid, BEncode method) => + tid -> method -> args -> BValue +encodeQuery tid qmeth qargs = encodeAny tid "q" qmeth ("a" .=! qargs .:) + +encodeAny :: + (BEncode tid, BEncode a) => + tid -> BKey -> a -> (BDict -> BDict) -> BValue +encodeAny tid key val aux = toDict $ + aux $ key .=! val + .: "t" .=! tid + .: "y" .=! key + .: endDict + + +showPacket :: ([L8.ByteString] -> [L8.ByteString]) -> SockAddr -> L8.ByteString -> ByteString -> String +showPacket f addr flow bs = L8.unpack $ L8.unlines es + where + es = map (L8.append prefix) (f $ L8.lines pp) + + prefix = L8.pack (either show show $ either4or6 addr) <> flow + + pp = either L8.pack showBEncode $ BE.decode bs + +-- Add detailed printouts for every packet. +addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString +addVerbosity tr = + tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do + forM_ m $ mapM_ $ \(msg,addr) -> do + dput XBitTorrent (showPacket id addr " --> " msg) + kont m + , sendMessage = \addr msg -> do + dput XBitTorrent (showPacket id addr " <-- " msg) + sendMessage tr addr msg + } + + +showParseError :: ByteString -> SockAddr -> String -> String +showParseError bs addr err = showPacket (L8.pack err :) addr " --> " bs + +parsePacket :: ByteString -> SockAddr -> Either String (Message BValue, NodeInfo) +parsePacket bs addr = left (showParseError bs addr) $ do + pkt <- BE.decode bs + -- TODO: Error packets do not include a valid msgOrigin. + -- The BE.decode method is using 'zeroID' as a placeholder. + ni <- nodeInfo (msgOrigin pkt) addr + return (pkt, ni) + +encodePacket :: Message BValue -> NodeInfo -> (ByteString, SockAddr) +encodePacket msg ni = ( toStrict $ BE.encode msg + , nodeAddr ni ) + +classify :: Message BValue -> MessageClass String Method TransactionId NodeInfo (Message BValue) +classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid +classify (R { msgID = tid }) = IsResponse tid + +encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue +encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) + +encodeQueryPayload :: BEncode a => + Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue +encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly + +errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a +errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) + +decodePayload :: BEncode a => Message BValue -> Either String a +decodePayload msg = BE.fromBEncode $ qryPayload msg + +type Handler = MethodHandler String TransactionId NodeInfo (Message BValue) + +handler :: ( BEncode a + , BEncode b + ) => + (NodeInfo -> a -> IO b) -> Maybe Handler +handler f = Just $ MethodHandler decodePayload encodeResponsePayload f + + +handlerE :: ( BEncode a + , BEncode b + ) => + (NodeInfo -> a -> IO (Either Error b)) -> Maybe Handler +handlerE f = Just $ MethodHandler decodePayload enc f + where + enc tid self dest (Left e) = errorPayload tid self dest e + enc tid self dest (Right b) = encodeResponsePayload tid self dest b + +type AnnounceSet = Set (InfoHash, PortNumber) + +data SwarmsDatabase = SwarmsDatabase + { contactInfo :: !( TVar PeerStore ) -- ^ Published by other nodes. + , sessionTokens :: !( TVar SessionTokens ) -- ^ Query session IDs. + , announceInfo :: !( TVar AnnounceSet ) -- ^ To publish by this node. + } + +newSwarmsDatabase :: IO SwarmsDatabase +newSwarmsDatabase = do + toks <- nullSessionTokens + atomically + $ SwarmsDatabase <$> newTVar def + <*> newTVar toks + <*> newTVar def + +data Routing = Routing + { tentativeId :: NodeInfo + , committee4 :: TriadCommittee NodeId SockAddr + , committee6 :: TriadCommittee NodeId SockAddr + , refresher4 :: BucketRefresher NodeId NodeInfo + , refresher6 :: BucketRefresher NodeId NodeInfo + } + +sched4 :: Routing -> TVar (Int.PSQ POSIXTime) +sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue + +sched6 :: Routing -> TVar (Int.PSQ POSIXTime) +sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue + +routing4 :: Routing -> TVar (R.BucketList NodeInfo) +routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets + +routing6 :: Routing -> TVar (R.BucketList NodeInfo) +routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets + +traced :: Show tid => TableMethods t tid -> TableMethods t tid +traced (TableMethods ins del lkup) + = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) + (\tid t -> trace ("del "++show tid) $ del tid t) + (\tid t -> trace ("lookup "++show tid) $ lkup tid t) + + +type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) + +-- | Like 'nodeInfo' but falls back to 'iNADDR_ANY' for nodeIP' and 'nodePort'. +mkNodeInfo :: NodeId -> SockAddr -> NodeInfo +mkNodeInfo nid addr = NodeInfo + { nodeId = nid + , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr + , nodePort = fromMaybe 0 $ sockAddrPort addr + } + +newClient :: SwarmsDatabase -> SockAddr + -> IO ( MainlineClient + , Routing + , [NodeInfo] -> [NodeInfo] -> IO () + , [NodeInfo] -> [NodeInfo] -> IO () + ) +newClient swarms addr = do + udp <- udpTransport addr + nid <- NodeId <$> getRandomBytes 20 + let tentative_info = mkNodeInfo nid addr + tentative_info6 <- + maybe tentative_info + (\ip6 -> tentative_info { nodeId = fromMaybe (nodeId tentative_info) + $ bep42 (toSockAddr ip6) (nodeId tentative_info) + , nodeIP = IPv6 ip6 + }) + <$> global6 + addr4 <- atomically $ newTChan + addr6 <- atomically $ newTChan + mkrouting <- atomically $ do + -- We defer initializing the refreshSearch and refreshPing until we + -- have a client to send queries with. + let nullPing = const $ return False + nullSearch = mainlineSearch $ Left $ \_ _ -> return Nothing + tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount + refresher4 <- newBucketRefresher tbl4 nullSearch nullPing + tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount + refresher6 <- newBucketRefresher tbl6 nullSearch nullPing + let updateIPVote tblvar addrvar a = do + bkts <- readTVar tblvar + case bep42 a (nodeId $ R.thisNode bkts) of + Just nid -> do + let tbl = R.nullTable (comparing nodeId) + (\s -> hashWithSalt s . nodeId) + (mkNodeInfo nid a) + (R.defaultBucketCount) + writeTVar tblvar tbl + writeTChan addrvar (a,map fst $ concat $ R.toList bkts) + Nothing -> return () + committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 + committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 + return $ \client -> + -- Now we have a client, so tell the BucketRefresher how to search and ping. + let updIO r = updateRefresherIO (nodeSearch client) (ping client) r + in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6) + map_var <- atomically $ newTVar (0, mempty) + + let routing = mkrouting outgoingClient + + net = onInbound (updateRouting outgoingClient routing) + $ layerTransport parsePacket encodePacket + $ udp + + -- Paranoid: It's safe to define /net/ and /client/ to be mutually + -- recursive since 'updateRouting' does not invoke 'awaitMessage' which + -- which was modified by 'onInbound'. However, I'm going to avoid the + -- mutual reference just to be safe. + outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } + + dispatch = DispatchMethods + { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x + , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x) + , tableMethods = mapT -- :: TransactionMethods tbl tid x + } + + handlers :: Method -> Maybe Handler + handlers ( Method "ping" ) = handler pingH + handlers ( Method "find_node" ) = handler $ findNodeH routing + handlers ( Method "get_peers" ) = handler $ getPeersH routing swarms + handlers ( Method "announce_peer" ) = handlerE $ announceH swarms + handlers ( Method meth ) = Just $ defaultHandler meth + + mapT = transactionMethods mapMethods gen + + gen :: Word16 -> (TransactionId, Word16) + gen cnt = (TransactionId $ S.encode cnt, cnt+1) + + ignoreParseError :: String -> IO () + ignoreParseError _ = return () + + client = Client + { clientNet = addHandler ignoreParseError (handleMessage client) net + , clientDispatcher = dispatch + , clientErrorReporter = ignoreErrors -- printErrors stderr + , clientPending = map_var + , clientAddress = \maddr -> atomically $ do + let var = case flip prefer4or6 Nothing <$> maddr of + Just Want_IP6 -> routing6 routing + _ -> routing4 routing + R.thisNode <$> readTVar var + , clientResponseId = return + } + + -- TODO: Provide some means of shutting down these five auxillary threads: + + fork $ fix $ \again -> do + myThreadId >>= flip labelThread "addr4" + (addr, ns) <- atomically $ readTChan addr4 + dput XBitTorrent $ "External IPv4: "++show (addr, length ns) + forM_ ns $ \n -> do + dput XBitTorrent $ "Change IP, ping: "++show n + ping outgoingClient n + -- TODO: trigger bootstrap ipv4 + again + fork $ fix $ \again -> do + myThreadId >>= flip labelThread "addr6" + (addr,ns) <- atomically $ readTChan addr6 + dput XBitTorrent $ "External IPv6: "++show (addr, length ns) + forM_ ns $ \n -> do + dput XBitTorrent $ "Change IP, ping: "++show n + ping outgoingClient n + -- TODO: trigger bootstrap ipv6 + again + + + refresh_thread4 <- forkPollForRefresh $ refresher4 routing + refresh_thread6 <- forkPollForRefresh $ refresher6 routing + + forkAnnouncedInfohashesGC (contactInfo swarms) + + return (client, routing, bootstrap (refresher4 routing), bootstrap (refresher6 routing)) + +-- Note that you should call .put() every hour for content that you want to +-- keep alive, since nodes may discard data nodes older than 2 hours. (source: +-- https://www.npmjs.com/package/bittorrent-dht) +-- +-- This function will discard records between 3 and 6 hours old. +forkAnnouncedInfohashesGC :: TVar PeerStore -> IO ThreadId +forkAnnouncedInfohashesGC vpeers = fork $ do + myThreadId >>= flip labelThread "gc:bt-peers" + fix $ \loop -> do + cutoff <- getPOSIXTime + threadDelay 10800000000 -- 3 hours + atomically $ modifyTVar' vpeers $ deleteOlderThan cutoff + loop + +-- | Modifies a purely random 'NodeId' to one that is related to a given +-- routable address in accordance with BEP 42. +-- +-- Test vectors from the spec: +-- +-- IP rand example node ID +-- ============ ===== ========================================== +-- 124.31.75.21 1 5fbfbf f10c5d6a4ec8a88e4c6ab4c28b95eee4 01 +-- 21.75.31.124 86 5a3ce9 c14e7a08645677bbd1cfe7d8f956d532 56 +-- 65.23.51.170 22 a5d432 20bc8f112a3d426c84764f8c2a1150e6 16 +-- 84.124.73.14 65 1b0321 dd1bb1fe518101ceef99462b947a01ff 41 +-- 43.213.53.83 90 e56f6c bf5b7c4be0237986d5243b87aa6d5130 5a +bep42 :: SockAddr -> NodeId -> Maybe NodeId +bep42 addr0 (NodeId r) + | let addr = either id id $ either4or6 addr0 -- unmap 4mapped SockAddrs + , Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4) + <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6) + = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) + | otherwise + = Nothing + where + ip4mask = "\x03\x0f\x3f\xff" :: ByteString + ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString + nbhood_select = B.last r .&. 7 + retr n = pure $ B.drop (B.length r - n) r + crc = S.encode . crc32c . B.pack + applyMask ip = case B.zipWith (.&.) msk ip of + (b:bs) -> (b .|. shiftL nbhood_select 5) : bs + bs -> bs + where msk | B.length ip == 4 = ip4mask + | otherwise = ip6mask + + + +defaultHandler :: ByteString -> Handler +defaultHandler meth = MethodHandler decodePayload errorPayload returnError + where + returnError :: NodeInfo -> BValue -> IO Error + returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) + +mainlineKademlia :: MainlineClient + -> TriadCommittee NodeId SockAddr + -> BucketRefresher NodeId NodeInfo + -> Kademlia NodeId NodeInfo +mainlineKademlia client committee refresher + = Kademlia quietInsertions + mainlineSpace + (vanillaIO (refreshBuckets refresher) $ ping client) + { tblTransition = \tr -> do + io1 <- transitionCommittee committee tr + io2 <- touchBucket refresher tr + return $ do + io1 >> io2 + {- noisy (timestamp updates are currently reported as transitions to Accepted) + dput XBitTorrent $ unwords + [ show (transitionedTo tr) + , show (transitioningNode tr) + ] -} + } + + +mainlineSpace :: R.KademliaSpace NodeId NodeInfo +mainlineSpace = R.KademliaSpace + { R.kademliaLocation = nodeId + , R.kademliaTestBit = testIdBit + , R.kademliaXor = xor + , R.kademliaSample = genBucketSample' + } + +transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) +transitionCommittee committee (RoutingTransition ni Stranger) = do + delVote committee (nodeId ni) + return $ do + dput XBitTorrent $ "delVote "++show (nodeId ni) +transitionCommittee committee _ = return $ return () + +updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () +updateRouting client routing naddr msg = do + case prefer4or6 naddr Nothing of + Want_IP4 -> go (committee4 routing) (refresher4 routing) + Want_IP6 -> go (committee6 routing) (refresher6 routing) + where + go committee refresher = do + self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) + when (nodeIP self /= nodeIP naddr) $ do + case msg of + R { rspReflectedIP = Just sockaddr } + -> do + -- dput XBitTorrent $ "External: "++show (nodeId naddr,sockaddr) + atomically $ addVote committee (nodeId naddr) sockaddr + _ -> return () + insertNode (mainlineKademlia client committee refresher) naddr + +data Ping = Ping deriving Show + +-- Pong is the same as Ping. +type Pong = Ping +pattern Pong = Ping + +instance BEncode Ping where + toBEncode Ping = toDict endDict + fromBEncode _ = pure Ping + +wantList :: WantIP -> [ByteString] +wantList Want_IP4 = ["ip4"] +wantList Want_IP6 = ["ip6"] +wantList Want_Both = ["ip4","ip6"] + +instance BEncode WantIP where + toBEncode w = toBEncode $ wantList w + fromBEncode bval = do + wants <- fromBEncode bval + let _ = wants :: [ByteString] + case (elem "ip4" wants, elem "ip6" wants) of + (True,True) -> Right Want_Both + (True,False) -> Right Want_IP4 + (False,True) -> Right Want_IP6 + _ -> Left "Unrecognized IP type." + +data FindNode = FindNode NodeId (Maybe WantIP) + +instance BEncode FindNode where + toBEncode (FindNode nid iptyp) = toDict $ target_key .=! nid + .: want_key .=? iptyp + .: endDict + fromBEncode = fromDict $ FindNode <$>! target_key + <*>? want_key + +data NodeFound = NodeFound + { nodes4 :: [NodeInfo] + , nodes6 :: [NodeInfo] + } + +instance BEncode NodeFound where + toBEncode (NodeFound ns ns6) = toDict $ + nodes_key .=? + (if Prelude.null ns then Nothing + else Just (S.runPut (mapM_ putNodeInfo4 ns))) + .: nodes6_key .=? + (if Prelude.null ns6 then Nothing + else Just (S.runPut (mapM_ putNodeInfo6 ns6))) + .: endDict + + fromBEncode bval = NodeFound <$> ns4 <*> ns6 + where + opt ns = fromMaybe [] <$> optional ns + ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval + ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval + +binary :: S.Get a -> BKey -> BE.Get [a] +binary get k = field (req k) >>= either (fail . format) return . + S.runGet (many get) + where + format str = "fail to deserialize " ++ show k ++ " field: " ++ str + +pingH :: NodeInfo -> Ping -> IO Pong +pingH _ Ping = return Pong + +prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP +prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp + +findNodeH :: Routing -> NodeInfo -> FindNode -> IO NodeFound +findNodeH routing addr (FindNode node iptyp) = do + let preferred = prefer4or6 addr iptyp + + (append4,append6) <- atomically $ do + ni4 <- R.thisNode <$> readTVar (routing4 routing) + ni6 <- R.thisNode <$> readTVar (routing6 routing) + return $ case ipFamily (nodeIP addr) of + Want_IP4 -> (id, (++ [ni6])) + Want_IP6 -> ((++ [ni4]), id) + ks <- bool (return []) (go append4 $ routing4 routing) (preferred /= Want_IP6) + ks6 <- bool (return []) (go append6 $ routing6 routing) (preferred /= Want_IP4) + return $ NodeFound ks ks6 + where + go f var = f . R.kclosest mainlineSpace k node <$> atomically (readTVar var) + + k = R.defaultK + + +data GetPeers = GetPeers InfoHash (Maybe WantIP) + +instance BEncode GetPeers where + toBEncode (GetPeers ih iptyp) + = toDict $ info_hash_key .=! ih + .: want_key .=? iptyp + .: endDict + fromBEncode = fromDict $ GetPeers <$>! info_hash_key <*>? want_key + + +data GotPeers = GotPeers + { -- | If the queried node has no peers for the infohash, returned + -- the K nodes in the queried nodes routing table closest to the + -- infohash supplied in the query. + peers :: [PeerAddr] + + , nodes :: NodeFound + + -- | The token value is a required argument for a future + -- announce_peer query. + , grantedToken :: Token + } -- deriving (Show, Eq, Typeable) + +nodeIsIPv6 :: NodeInfo -> Bool +nodeIsIPv6 (NodeInfo _ (IPv6 _) _) = True +nodeIsIPv6 _ = False + +instance BEncode GotPeers where + toBEncode GotPeers { nodes = NodeFound ns4 ns6, ..} = toDict $ + nodes_key .=? (if null ns4 then Nothing + else Just $ S.runPut (mapM_ putNodeInfo4 ns4)) + .: nodes6_key .=? (if null ns6 then Nothing + else Just $ S.runPut (mapM_ putNodeInfo4 ns6)) + .: token_key .=! grantedToken + .: peers_key .=! map S.encode peers + .: endDict + + fromBEncode = fromDict $ do + ns4 <- fromMaybe [] <$> optional (binary getNodeInfo4 nodes_key) -- "nodes" + ns6 <- fromMaybe [] <$> optional (binary getNodeInfo6 nodes6_key) -- "nodes6" + -- TODO: BEP 42... + -- + -- Once enforced, responses to get_peers requests whose node ID does not + -- match its external IP should be considered to not contain a token and + -- thus not be eligible as storage target. Implementations should take + -- care that they find the closest set of nodes which return a token and + -- whose IDs matches their IPs before sending a store request to those + -- nodes. + -- + -- Sounds like something to take care of at peer-search time, so I'll + -- ignore it for now. + tok <- field (req token_key) -- "token" + ps <- fromMaybe [] <$> optional (field (req peers_key) >>= decodePeers) -- "values" + pure $ GotPeers ps (NodeFound ns4 ns6) tok + where + decodePeers = either fail pure . mapM S.decode + +getPeersH :: Routing -> SwarmsDatabase -> NodeInfo -> GetPeers -> IO GotPeers +getPeersH routing (SwarmsDatabase peers toks _) naddr (GetPeers ih iptyp) = do + ps <- do + tm <- getTimestamp + atomically $ do + (ps,store') <- Peers.freshPeers ih tm <$> readTVar peers + writeTVar peers store' + return ps + -- Filter peer results to only a single address family, IPv4 or IPv6, as + -- per BEP 32. + let notboth = iptyp >>= \case Want_Both -> Nothing + specific -> Just specific + selected = prefer4or6 naddr notboth + ps' = filter ( (== selected) . ipFamily . peerHost ) ps + tok <- grantToken toks naddr + ns <- findNodeH routing naddr (FindNode (coerce ih) iptyp) + return $ GotPeers ps' ns tok + +-- | Announce that the peer, controlling the querying node, is +-- downloading a torrent on a port. +data Announce = Announce + { -- | If set, the 'port' field should be ignored and the source + -- port of the UDP packet should be used as the peer's port + -- instead. This is useful for peers behind a NAT that may not + -- know their external port, and supporting uTP, they accept + -- incoming connections on the same port as the DHT port. + impliedPort :: Bool + + -- | infohash of the torrent; + , topic :: InfoHash + + -- | some clients announce the friendly name of the torrent here. + , announcedName :: Maybe ByteString + + -- | the port /this/ peer is listening; + , port :: PortNumber + + -- TODO: optional boolean "seed" key + + -- | received in response to a previous get_peers query. + , sessionToken :: Token + + } deriving (Show, Eq, Typeable) + +mkAnnounce :: PortNumber -> InfoHash -> Token -> Announce +mkAnnounce portnum info token = Announce + { topic = info + , port = portnum + , sessionToken = token + , announcedName = Nothing + , impliedPort = False + } + + +instance BEncode Announce where + toBEncode Announce {..} = toDict $ + implied_port_key .=? flagField impliedPort + .: info_hash_key .=! topic + .: name_key .=? announcedName + .: port_key .=! port + .: token_key .=! sessionToken + .: endDict + where + flagField flag = if flag then Just (1 :: Int) else Nothing + + fromBEncode = fromDict $ do + Announce <$> (boolField <$> optional (field (req implied_port_key))) + <*>! info_hash_key + <*>? name_key + <*>! port_key + <*>! token_key + where + boolField = maybe False (/= (0 :: Int)) + + + +-- | The queried node must verify that the token was previously sent +-- to the same IP address as the querying node. Then the queried node +-- should store the IP address of the querying node and the supplied +-- port number under the infohash in its store of peer contact +-- information. +data Announced = Announced + deriving (Show, Eq, Typeable) + +instance BEncode Announced where + toBEncode _ = toBEncode Ping + fromBEncode _ = pure Announced + +announceH :: SwarmsDatabase -> NodeInfo -> Announce -> IO (Either Error Announced) +announceH (SwarmsDatabase peers toks _) naddr announcement = do + checkToken toks naddr (sessionToken announcement) + >>= bool (Left <$> return (Error ProtocolError "invalid parameter: token")) + (Right <$> go) + where + go = atomically $ do + modifyTVar' peers + $ insertPeer (topic announcement) (announcedName announcement) + $ PeerAddr + { peerId = Nothing + -- Avoid storing IPv4-mapped addresses. + , peerHost = case nodeIP naddr of + IPv6 ip6 | Just ip4 <- un4map ip6 -> IPv4 ip4 + a -> a + , peerPort = if impliedPort announcement + then nodePort naddr + else port announcement + } + return Announced + +isReadonlyClient :: MainlineClient -> Bool +isReadonlyClient client = False -- TODO + +mainlineSend :: ( BEncode a + , BEncode a2 + ) => Method + -> (a2 -> b) + -> (t -> a) + -> MainlineClient + -> t + -> NodeInfo + -> IO (Maybe b) +mainlineSend meth unwrap msg client nid addr = do + reply <- sendQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr + -- sendQuery will return (Just (Left _)) on a parse error. We're going to + -- blow it away with the join-either sequence. + -- TODO: Do something with parse errors. + return $ join $ either (const Nothing) Just <$> reply + +mainlineAsync :: (BEncode a1, BEncode a2) => + Method + -> (a2 -> a3) + -> (t -> a1) + -> Client String Method TransactionId NodeInfo (Message BValue) + -> t + -> NodeInfo + -> (Maybe a3 -> IO ()) + -> IO () +mainlineAsync meth unwrap msg client nid addr onresult = do + asyncQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr + $ \reply -> + -- sendQuery will return (Just (Left _)) on a parse error. We're going to + -- blow it away with the join-either sequence. + -- TODO: Do something with parse errors. + onresult $ join $ either (const Nothing) Just <$> reply + +mainlineSerializeer :: (BEncode a2, BEncode a1) => + Method + -> (a2 -> b) + -> MainlineClient + -> MethodSerializer + TransactionId NodeInfo (Message BValue) Method a1 (Either Error b) +mainlineSerializeer meth unwrap client = MethodSerializer + { methodTimeout = \_ ni -> return (ni, 5000000) + , method = meth + , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) + , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) + (Right . unwrap) + . BE.fromBEncode) + . rspPayload + } + +ping :: MainlineClient -> NodeInfo -> IO Bool +ping client addr = + fromMaybe False + <$> mainlineSend (Method "ping") (\Pong -> True) (const Ping) client () addr + +-- searchQuery :: ni -> IO (Maybe [ni], [r], tok)) +getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) +getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) + +asyncGetNodes :: Client String Method TransactionId NodeInfo (Message BValue) + -> NodeId + -> NodeInfo + -> (Maybe ([NodeInfo], [NodeInfo], Maybe ()) -> IO ()) + -> IO () +asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) + +unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) +unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) + +getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Maybe Token)) +getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce + +asyncGetPeers :: Client String Method TransactionId NodeInfo (Message BValue) + -> NodeId + -> NodeInfo + -> (Maybe ([NodeInfo], [PeerAddr], Maybe Token) -> IO ()) + -> IO () +asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce + +unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) +unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) + +mainlineSearch :: Either (NodeId -> NodeInfo -> IO (Maybe ([NodeInfo], [r], Maybe tok))) + (NodeId -> NodeInfo -> (Maybe ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO ()) + -> Search NodeId (IP, PortNumber) tok NodeInfo r +mainlineSearch qry = Search + { searchSpace = mainlineSpace + , searchNodeAddress = nodeIP &&& nodePort + , searchQuery = qry + , searchAlpha = 8 + , searchK = 16 + } + +nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo +nodeSearch client = mainlineSearch (Right $ asyncGetNodes client) + +peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr +peerSearch client = mainlineSearch (Right $ asyncGetPeers client) + +-- | List of bootstrap nodes maintained by different bittorrent +-- software authors. +bootstrapNodes :: WantIP -> IO [NodeInfo] +bootstrapNodes want = unsafeInterleaveIO $ do + let wellknowns = + [ "router.bittorrent.com:6881" -- by BitTorrent Inc. + + -- doesn't work at the moment (use git blame) of commit + , "dht.transmissionbt.com:6881" -- by Transmission project + + , "router.utorrent.com:6881" + ] + nss <- forM wellknowns $ \hostAndPort -> do + e <- resolve want hostAndPort + case e of + Left _ -> return [] + Right sockaddr -> either (const $ return []) + (return . (: [])) + $ nodeInfo zeroID sockaddr + return $ concat nss + +-- | Resolve either a numeric network address or a hostname to a +-- numeric IP address of the node. +resolve :: WantIP -> String -> IO (Either IOError SockAddr) +resolve want hostAndPort = do + let hints = defaultHints { addrSocketType = Datagram + , addrFamily = case want of + Want_IP4 -> AF_INET + _ -> AF_INET6 + } + (rport,rhost) = span (/= ':') $ reverse hostAndPort + (host,port) = case rhost of + [] -> (hostAndPort, Nothing) + (_:hs) -> (reverse hs, Just (reverse rport)) + tryIOError $ do + -- getAddrInfo throws exception on empty list, so this + -- pattern matching never fails. + info : _ <- getAddrInfo (Just hints) (Just host) port + return $ addrAddress info + + +announce :: MainlineClient -> Announce -> NodeInfo -> IO (Maybe Announced) +announce client msg addr = do + mainlineSend (Method "announce_peer") id (\() -> msg) client () addr diff --git a/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs b/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs new file mode 100644 index 00000000..05a64014 --- /dev/null +++ b/dht/src/Network/BitTorrent/MainlineDHT/Symbols.hs @@ -0,0 +1,24 @@ +{-# OPTIONS_GHC -fno-warn-missing-signatures #-} +module Network.BitTorrent.MainlineDHT.Symbols where + +import Data.BEncode.BDict + +peer_ip_key = "ip" :: BKey +peer_id_key = "peer id" :: BKey +peer_port_key = "port" :: BKey +msg_type_key = "msg_type" :: BKey +piece_key = "piece" :: BKey +total_size_key = "total_size" :: BKey +node_id_key = "id" :: BKey +read_only_key = "ro" :: BKey +want_key = "want" :: BKey +target_key = "target" :: BKey +nodes_key = "nodes" :: BKey +nodes6_key = "nodes6" :: BKey +info_hash_key = "info_hash" :: BKey +peers_key = "values" :: BKey +token_key = "token" :: BKey +name_key = "name" :: BKey +port_key = "port" :: BKey +implied_port_key = "implied_port" :: BKey + -- cgit v1.2.3