From 7acfd91d1390d2137994366cbdfdfc6b9a2885fd Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 15 Sep 2017 03:48:59 -0400 Subject: Moved Mainline to hierarchical location. --- src/Network/BitTorrent/MainlineDHT.hs | 1081 +++++++++++++++++++++++++++++++++ 1 file changed, 1081 insertions(+) create mode 100644 src/Network/BitTorrent/MainlineDHT.hs (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs new file mode 100644 index 00000000..9d48c67b --- /dev/null +++ b/src/Network/BitTorrent/MainlineDHT.hs @@ -0,0 +1,1081 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveFoldable #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TupleSections #-} +module Network.BitTorrent.MainlineDHT where + +import Control.Applicative +import Control.Arrow +import Control.Concurrent.STM +import Control.Monad +import Crypto.Random +import Data.BEncode as BE +import qualified Data.BEncode.BDict as BE + ;import Data.BEncode.BDict (BKey) +import Data.BEncode.Pretty +import Data.BEncode.Types (BDict) +import Data.Bits +import Data.Bits.ByteString +import Data.Bool +import qualified Data.ByteArray as BA + ;import Data.ByteArray (ByteArrayAccess) +import qualified Data.ByteString as B + ;import Data.ByteString (ByteString) +import qualified Data.ByteString.Base16 as Base16 +import qualified Data.ByteString.Char8 as C8 +import Data.ByteString.Lazy (toStrict) +import qualified Data.ByteString.Lazy.Char8 as L8 +import Data.Char +import Data.Coerce +import Data.Data +import Data.Default +import Data.Digest.CRC32C +import Data.Function (fix) +import Data.Hashable +import Data.IP +import Data.List +import Data.Maybe +import Data.Monoid +import Data.Ord +import qualified Data.Serialize 
as S +import Data.Set (Set) +import Data.Time.Clock.POSIX (POSIXTime) +import Data.Torrent +import Data.Typeable +import Data.Word +import qualified Data.Wrapper.PSQInt as Int +import Debug.Trace +import Network.Kademlia +import Network.Address (Address, fromAddr, fromSockAddr, + setPort, sockAddrPort, testIdBit, + toSockAddr, genBucketSample', WantIP(..), + un4map,either4or6,ipFamily) +import Network.BitTorrent.DHT.ContactInfo as Peers +import Network.BitTorrent.DHT.Search (Search (..)) +import Network.BitTorrent.DHT.Token as Token +import qualified Network.DHT.Routing as R + ;import Network.DHT.Routing (Timestamp, getTimestamp) +import Network.QueryResponse +import Network.Socket +import System.IO +import System.IO.Error +import System.IO.Unsafe (unsafeInterleaveIO) +import qualified Text.ParserCombinators.ReadP as RP +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif +import Control.Exception (SomeException (..), handle) +import qualified Data.Aeson as JSON + ;import Data.Aeson (FromJSON, ToJSON, (.=)) +import Text.Read +import System.Global6 +import Control.TriadCommittee + +newtype NodeId = NodeId ByteString + deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) + +instance BEncode NodeId where + fromBEncode bval = do + bs <- fromBEncode bval + if B.length bs /= 20 + then Left "Invalid length node id." 
+ else Right $ NodeId bs + + toBEncode (NodeId bs) = toBEncode bs + +instance Show NodeId where + show (NodeId bs) = C8.unpack $ Base16.encode bs + +instance S.Serialize NodeId where + get = NodeId <$> S.getBytes 20 + put (NodeId bs) = S.putByteString bs + +instance FiniteBits NodeId where + finiteBitSize _ = 160 + +instance Read NodeId where + readsPrec _ str + | (bs, xs) <- Base16.decode $ C8.pack str + , B.length bs == 20 + = [ (NodeId bs, drop 40 str) ] + | otherwise = [] + +zeroID :: NodeId +zeroID = NodeId $ B.replicate 20 0 + +data NodeInfo = NodeInfo + { nodeId :: NodeId + , nodeIP :: IP + , nodePort :: PortNumber + } + deriving (Eq,Ord) + +instance ToJSON NodeInfo where + toJSON (NodeInfo nid (IPv4 ip) port) + = JSON.object [ "node-id" .= show nid + , "ipv4" .= show ip + , "port" .= (fromIntegral port :: Int) + ] + toJSON (NodeInfo nid (IPv6 ip6) port) + | Just ip <- un4map ip6 + = JSON.object [ "node-id" .= show nid + , "ipv4" .= show ip + , "port" .= (fromIntegral port :: Int) + ] + | otherwise + = JSON.object [ "node-id" .= show nid + , "ipv6" .= show ip6 + , "port" .= (fromIntegral port :: Int) + ] +instance FromJSON NodeInfo where + parseJSON (JSON.Object v) = do + nidstr <- v JSON..: "node-id" + ip6str <- v JSON..:? "ipv6" + ip4str <- v JSON..:? "ipv4" + portnum <- v JSON..: "port" + ip <- maybe empty (return . IPv6) (ip6str >>= readMaybe) + <|> maybe empty (return . IPv4) (ip4str >>= readMaybe) + let (bs,_) = Base16.decode (C8.pack nidstr) + guard (B.length bs == 20) + return $ NodeInfo (NodeId bs) ip (fromIntegral (portnum :: Word16)) + +hexdigit :: Char -> Bool +hexdigit c = ('0' <= c && c <= '9') || ( 'a' <= c && c <= 'f') || ( 'A' <= c && c <= 'F') + +instance Read NodeInfo where + readsPrec i = RP.readP_to_S $ do + RP.skipSpaces + let n = 40 -- characters in node id. + parseAddr = RP.between (RP.char '(') (RP.char ')') (RP.munch (/=')')) + RP.+++ RP.munch (not . 
isSpace) + nodeidAt = do hexhash <- sequence $ replicate n (RP.satisfy hexdigit) + RP.char '@' RP.+++ RP.satisfy isSpace + addrstr <- parseAddr + nid <- case Base16.decode $ C8.pack hexhash of + (bs,_) | B.length bs==20 -> return (NodeId bs) + _ -> fail "Bad node id." + return (nid,addrstr) + (nid,addrstr) <- ( nodeidAt RP.+++ ( (zeroID,) <$> parseAddr) ) + let raddr = do + ip <- RP.between (RP.char '[') (RP.char ']') + (IPv6 <$> RP.readS_to_P (readsPrec i)) + RP.+++ (IPv4 <$> RP.readS_to_P (readsPrec i)) + _ <- RP.char ':' + port <- toEnum <$> RP.readS_to_P (readsPrec i) + return (ip, port) + + (ip,port) <- case RP.readP_to_S raddr addrstr of + [] -> fail "Bad address." + ((ip,port),_):_ -> return (ip,port) + return $ NodeInfo nid ip port + + + +-- The Hashable instance depends only on the IP address and port number. It is +-- used to compute the announce token. +instance Hashable NodeInfo where + hashWithSalt s ni = hashWithSalt s (nodeIP ni , nodePort ni) + {-# INLINE hashWithSalt #-} + + +instance Show NodeInfo where + showsPrec _ (NodeInfo nid ip port) = + shows nid . ('@' :) . showsip . (':' :) . shows port + where + showsip + | IPv4 ip4 <- ip = shows ip4 + | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = shows ip4 + | otherwise = ('[' :) . shows ip . (']' :) + +{- + +-- | KRPC 'compact list' compatible encoding: contact information for +-- nodes is encoded as a 26-byte string. Also known as "Compact node +-- info" the 20-byte Node ID in network byte order has the compact +-- IP-address/port info concatenated to the end. 
+ get = NodeInfo <$> (NodeId <$> S.getBytes 20 ) <*> S.get <*> S.get +-} + +getNodeInfo4 :: S.Get NodeInfo +getNodeInfo4 = NodeInfo <$> (NodeId <$> S.getBytes 20) + <*> (IPv4 <$> S.get) + <*> S.get + +putNodeInfo4 :: NodeInfo -> S.Put +putNodeInfo4 (NodeInfo (NodeId nid) ip port) + | IPv4 ip4 <- ip = put4 ip4 + | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = put4 ip4 + | otherwise = return () + where + put4 ip4 = S.putByteString nid >> S.put ip4 >> S.put port + +getNodeInfo6 :: S.Get NodeInfo +getNodeInfo6 = NodeInfo <$> (NodeId <$> S.getBytes 20) + <*> (IPv6 <$> S.get) + <*> S.get + +putNodeInfo6 :: NodeInfo -> S.Put +putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port) + = S.putByteString nid >> S.put ip >> S.put port +putNodeInfo6 _ = return () + + +-- | TODO: This should depend on the bind address to support IPv4-only. For +-- now, in order to support dual-stack listen, we're going to assume IPv6 is +-- wanted and map IPv4 addresses accordingly. +nodeAddr :: NodeInfo -> SockAddr +nodeAddr (NodeInfo _ ip port) = + case ip of + IPv4 ip4 -> setPort port $ toSockAddr (ipv4ToIPv6 ip4) + IPv6 ip6 -> setPort port $ toSockAddr ip6 + +nodeInfo :: NodeId -> SockAddr -> Either String NodeInfo +nodeInfo nid saddr + | Just ip <- fromSockAddr saddr + , Just port <- sockAddrPort saddr = Right $ NodeInfo nid ip port + | otherwise = Left "Address family not supported." + +-- | Types of RPC errors. +data ErrorCode + -- | Some error doesn't fit in any other category. + = GenericError + + -- | Occurs when server fail to process procedure call. + | ServerError + + -- | Malformed packet, invalid arguments or bad token. + | ProtocolError + + -- | Occurs when client trying to call method server don't know. 
+ | MethodUnknown + deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data) + +-- | According to the table: +-- +instance Enum ErrorCode where + fromEnum GenericError = 201 + fromEnum ServerError = 202 + fromEnum ProtocolError = 203 + fromEnum MethodUnknown = 204 + {-# INLINE fromEnum #-} + toEnum 201 = GenericError + toEnum 202 = ServerError + toEnum 203 = ProtocolError + toEnum 204 = MethodUnknown + toEnum _ = GenericError + {-# INLINE toEnum #-} + +instance BEncode ErrorCode where + toBEncode = toBEncode . fromEnum + {-# INLINE toBEncode #-} + fromBEncode b = toEnum <$> fromBEncode b + {-# INLINE fromBEncode #-} + +data Error = Error + { errorCode :: !ErrorCode -- ^ The type of error. + , errorMessage :: !ByteString -- ^ Human-readable text message. + } deriving ( Show, Eq, Ord, Typeable, Data, Read ) + +newtype TransactionId = TransactionId ByteString + deriving (Eq, Ord, Show, BEncode) + +newtype Method = Method ByteString + deriving (Eq, Ord, Show, BEncode) + +data Message a = Q { msgOrigin :: NodeId + , msgID :: TransactionId + , qryPayload :: a + , qryMethod :: Method + , qryReadOnly :: Bool } + + | R { msgOrigin :: NodeId + , msgID :: TransactionId + , rspPayload :: Either Error a + , rspReflectedIP :: Maybe SockAddr } + +showBE bval = L8.unpack (showBEncode bval) + +instance BE.BEncode (Message BValue) where + toBEncode m = encodeMessage m + {- + in case m of + Q {} -> trace ("encoded(query): "++showBE r) r + R {} -> trace ("encoded(response): "++showBE r) r -} + fromBEncode bval = decodeMessage bval + {- + in case r of + Left e -> trace (show e) r + Right (Q {}) -> trace ("decoded(query): "++showBE bval) r + Right (R {}) -> trace ("decoded(response): "++showBE bval) r -} + +decodeMessage :: BValue -> Either String (Message BValue) +decodeMessage = fromDict $ do + key <- lookAhead (field (req "y")) + let _ = key :: BKey + f <- case key of + "q" -> do a <- field (req "a") + g <- either fail return $ flip fromDict a $ do + who <- field (req "id") + ro <- 
fromMaybe False <$> optional (field (req "ro")) + return $ \meth tid -> Q who tid a meth ro + meth <- field (req "q") + return $ g meth + "r" -> do ip <- do + ipstr <- optional (field (req "ip")) + mapM (either fail return . decodeAddr) ipstr + vals <- field (req "r") + either fail return $ flip fromDict vals $ do + who <- field (req "id") + return $ \tid -> R who tid (Right vals) ip + "e" -> do (ecode,emsg) <- field (req "e") + ip <- do + ipstr <- optional (field (req "ip")) + mapM (either fail return . decodeAddr) ipstr + -- FIXME:Spec does not give us the NodeId of the sender. + -- Using 'zeroID' as place holder. + -- We should ignore the msgOrigin for errors in 'updateRouting'. + -- We should consider making msgOrigin a Maybe value. + return $ \tid -> R zeroID tid (Left (Error ecode emsg)) ip + _ -> fail $ "Mainline message is not a query, response, or an error: " + ++ show key + tid <- field (req "t") + return $ f (tid :: TransactionId) + + +encodeMessage :: Message BValue -> BValue +encodeMessage (Q origin tid a meth ro) + = case a of + BDict args -> encodeQuery tid meth (BDict $ genericArgs origin ro `BE.union` args) + _ -> encodeQuery tid meth a -- XXX: Not really a valid query. +encodeMessage (R origin tid v ip) + = case v of + Right (BDict vals) -> encodeResponse tid (BDict $ genericArgs origin False `BE.union` vals) ip + Left err -> encodeError tid err + + +encodeAddr :: SockAddr -> ByteString +encodeAddr = either encode4 encode6 . either4or6 + where + encode4 (SockAddrInet port addr) + = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port)) + + encode6 (SockAddrInet6 port _ addr _) + = S.runPut (S.put addr >> S.putWord16be (fromIntegral port)) + encode6 _ = B.empty + +decodeAddr :: ByteString -> Either String SockAddr +decodeAddr bs = S.runGet g bs + where + g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be) + | otherwise = do host <- S.get -- TODO: Is this right? 
+ port <- fromIntegral <$> S.getWord16be + return $ SockAddrInet6 port 0 host 0 + +genericArgs :: BEncode a => a -> Bool -> BDict +genericArgs nodeid ro = + "id" .=! nodeid + .: "ro" .=? bool Nothing (Just (1 :: Int)) ro + .: endDict + +encodeError :: BEncode a => a -> Error -> BValue +encodeError tid (Error ecode emsg) = encodeAny tid "e" (ecode,emsg) id + +encodeResponse :: (BEncode tid, BEncode vals) => + tid -> vals -> Maybe SockAddr -> BValue +encodeResponse tid rvals rip = + encodeAny tid "r" rvals ("ip" .=? (BString . encodeAddr <$> rip) .:) + +encodeQuery :: (BEncode args, BEncode tid, BEncode method) => + tid -> method -> args -> BValue +encodeQuery tid qmeth qargs = encodeAny tid "q" qmeth ("a" .=! qargs .:) + +encodeAny :: + (BEncode tid, BEncode a) => + tid -> BKey -> a -> (BDict -> BDict) -> BValue +encodeAny tid key val aux = toDict $ + aux $ key .=! val + .: "t" .=! tid + .: "y" .=! key + .: endDict + + +showPacket f addr flow bs = L8.unpack $ L8.unlines es + where + es = map (L8.append prefix) (f $ L8.lines pp) + + prefix = L8.pack (either show show $ either4or6 addr) <> flow + + pp = either L8.pack showBEncode $ BE.decode bs + +-- Add detailed printouts for every packet. +addVerbosity tr = + tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do + forM_ m $ mapM_ $ \(msg,addr) -> do + hPutStrLn stderr (showPacket id addr " --> " msg) + kont m + , sendMessage = \addr msg -> do + hPutStrLn stderr (showPacket id addr " <-- " msg) + sendMessage tr addr msg + } + + + +showParseError bs addr err = showPacket (L8.pack err :) addr " --> " bs + +parsePacket :: ByteString -> SockAddr -> Either String (Message BValue, NodeInfo) +parsePacket bs addr = left (showParseError bs addr) $ do + pkt <- BE.decode bs + -- TODO: Error packets do not include a valid msgOrigin. + -- The BE.decode method is using 'zeroID' as a placeholder. 
+ ni <- nodeInfo (msgOrigin pkt) addr + return (pkt, ni) + +encodePacket :: Message BValue -> NodeInfo -> (ByteString, SockAddr) +encodePacket msg ni = ( toStrict $ BE.encode msg + , nodeAddr ni ) + +classify :: Message BValue -> MessageClass String Method TransactionId +classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid +classify (R { msgID = tid }) = IsResponse tid + +encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue +encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) + +encodeQueryPayload :: BEncode a => + Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue +encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly + +errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a +errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) + +decodePayload :: BEncode a => Message BValue -> Either String a +decodePayload msg = BE.fromBEncode $ qryPayload msg + +type Handler = MethodHandler String TransactionId NodeInfo (Message BValue) + +handler :: ( BEncode a + , BEncode b + ) => + (NodeInfo -> a -> IO b) -> Maybe Handler +handler f = Just $ MethodHandler decodePayload encodeResponsePayload f + + +handlerE :: ( BEncode a + , BEncode b + ) => + (NodeInfo -> a -> IO (Either Error b)) -> Maybe Handler +handlerE f = Just $ MethodHandler decodePayload enc f + where + enc tid self dest (Left e) = errorPayload tid self dest e + enc tid self dest (Right b) = encodeResponsePayload tid self dest b + +type AnnounceSet = Set (InfoHash, PortNumber) + +data SwarmsDatabase = SwarmsDatabase + { contactInfo :: !( TVar PeerStore ) -- ^ Published by other nodes. + , sessionTokens :: !( TVar SessionTokens ) -- ^ Query session IDs. + , announceInfo :: !( TVar AnnounceSet ) -- ^ To publish by this node. 
+ } + +newSwarmsDatabase :: IO SwarmsDatabase +newSwarmsDatabase = do + toks <- nullSessionTokens + atomically + $ SwarmsDatabase <$> newTVar def + <*> newTVar toks + <*> newTVar def + +data Routing = Routing + { tentativeId :: NodeInfo + , sched4 :: !( TVar (Int.PSQ POSIXTime) ) + , routing4 :: !( TVar (R.BucketList NodeInfo) ) + , committee4 :: TriadCommittee NodeId SockAddr + , sched6 :: !( TVar (Int.PSQ POSIXTime) ) + , routing6 :: !( TVar (R.BucketList NodeInfo) ) + , committee6 :: TriadCommittee NodeId SockAddr + } + +traced :: Show tid => TableMethods t tid -> TableMethods t tid +traced (TableMethods ins del lkup) + = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) + (\tid t -> trace ("del "++show tid) $ del tid t) + (\tid t -> trace ("lookup "++show tid) $ lkup tid t) + + +type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) + +newClient :: SwarmsDatabase -> SockAddr -> IO (MainlineClient, Routing) +newClient swarms addr = do + udp <- udpTransport addr + nid <- NodeId <$> getRandomBytes 20 + let tentative_info = NodeInfo + { nodeId = nid + , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr + , nodePort = fromMaybe 0 $ sockAddrPort addr + } + tentative_info6 <- + maybe tentative_info + (\ip6 -> tentative_info { nodeId = fromMaybe (nodeId tentative_info) + $ bep42 (toSockAddr ip6) (nodeId tentative_info) + , nodeIP = IPv6 ip6 + }) + <$> global6 + addr4 <- atomically $ newTChan + addr6 <- atomically $ newTChan + routing <- atomically $ do + let nobkts = R.defaultBucketCount :: Int + tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts + tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts + let updateIPVote tblvar addrvar a = do + bkts <- readTVar tblvar + case bep42 a (nodeId $ R.thisNode bkts) of + Just nid -> do + let tbl = R.nullTable (comparing nodeId) + (\s -> hashWithSalt s . 
nodeId) + (NodeInfo nid + (fromMaybe (toEnum 0) $ fromSockAddr a) + (fromMaybe 0 $ sockAddrPort a)) + nobkts + writeTVar tblvar tbl + writeTChan addrvar (a,map fst $ concat $ R.toList bkts) + Nothing -> return () + committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 + committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 + sched4 <- newTVar Int.empty + sched6 <- newTVar Int.empty + return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6 + map_var <- atomically $ newTVar (0, mempty) + let net = onInbound (updateRouting outgoingClient routing) + $ layerTransport parsePacket encodePacket + -- $ addVerbosity + $ udp + + -- Paranoid: It's safe to define /net/ and /client/ to be mutually + -- recursive since 'updateRouting' does not invoke 'awaitMessage' which + -- which was modified by 'onInbound'. However, I'm going to avoid the + -- mutual reference just to be safe. + outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } + + dispatch = DispatchMethods + { classifyInbound = classify -- :: x -> MessageClass err meth tid + , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x) + , tableMethods = mapT -- :: TransactionMethods tbl tid x + } + + handlers :: Method -> Maybe Handler + handlers ( Method "ping" ) = handler pingH + handlers ( Method "find_node" ) = handler $ findNodeH routing + handlers ( Method "get_peers" ) = handler $ getPeersH routing swarms + handlers ( Method "announce_peer" ) = handlerE $ announceH swarms + handlers ( Method meth ) = Just $ defaultHandler meth + + mapT = transactionMethods mapMethods gen + + gen :: Word16 -> (TransactionId, Word16) + gen cnt = (TransactionId $ S.encode cnt, cnt+1) + + client = Client + { clientNet = addHandler (handleMessage client) net + , clientDispatcher = dispatch + , clientErrorReporter = ignoreErrors -- printErrors stderr + , clientPending = map_var + , clientAddress = \maddr -> atomically $ do + let var = case flip prefer4or6 Nothing <$> 
maddr of + Just Want_IP6 -> routing6 routing + _ -> routing4 routing + R.thisNode <$> readTVar var + , clientResponseId = return + } + + -- TODO: Provide some means of shutting down these four auxillary threads: + + fork $ fix $ \again -> do + myThreadId >>= flip labelThread "addr4" + (addr, ns) <- atomically $ readTChan addr4 + hPutStrLn stderr $ "External IPv4: "++show (addr, length ns) + forM_ ns $ \n -> do + hPutStrLn stderr $ "Change IP, ping: "++show n + ping outgoingClient n + -- TODO: trigger bootstrap ipv4 + again + fork $ fix $ \again -> do + myThreadId >>= flip labelThread "addr6" + (addr,ns) <- atomically $ readTChan addr6 + hPutStrLn stderr $ "External IPv6: "++show (addr, length ns) + forM_ ns $ \n -> do + hPutStrLn stderr $ "Change IP, ping: "++show n + ping outgoingClient n + -- TODO: trigger bootstrap ipv6 + again + + refresh_thread4 <- forkPollForRefresh + (15*60) + (sched4 routing) + (refreshBucket (nodeSearch client) (routing4 routing)) + refresh_thread6 <- forkPollForRefresh + (15*60) + (sched6 routing) + (refreshBucket (nodeSearch client) (routing6 routing)) + + return (client, routing) + +-- | Modifies a purely random 'NodeId' to one that is related to a given +-- routable address in accordance with BEP 42. 
+-- +-- Test vectors from the spec: +-- +-- IP rand example node ID +-- ============ ===== ========================================== +-- 124.31.75.21 1 5fbfbf f10c5d6a4ec8a88e4c6ab4c28b95eee4 01 +-- 21.75.31.124 86 5a3ce9 c14e7a08645677bbd1cfe7d8f956d532 56 +-- 65.23.51.170 22 a5d432 20bc8f112a3d426c84764f8c2a1150e6 16 +-- 84.124.73.14 65 1b0321 dd1bb1fe518101ceef99462b947a01ff 41 +-- 43.213.53.83 90 e56f6c bf5b7c4be0237986d5243b87aa6d5130 5a +bep42 :: SockAddr -> NodeId -> Maybe NodeId +bep42 addr0 (NodeId r) + | let addr = either id id $ either4or6 addr0 -- unmap 4mapped SockAddrs + , Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4) + <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6) + = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) + | otherwise + = Nothing + where + ip4mask = "\x03\x0f\x3f\xff" :: ByteString + ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString + nbhood_select = B.last r .&. 7 + retr n = pure $ B.drop (B.length r - n) r + crc = S.encode . crc32c . B.pack + applyMask ip = case B.zipWith (.&.) msk ip of + (b:bs) -> (b .|. 
shiftL nbhood_select 5) : bs + bs -> bs + where msk | B.length ip == 4 = ip4mask + | otherwise = ip6mask + + + +defaultHandler :: ByteString -> Handler +defaultHandler meth = MethodHandler decodePayload errorPayload returnError + where + returnError :: NodeInfo -> BValue -> IO Error + returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) + +mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo +mainlineKademlia client committee var sched + = Kademlia quietInsertions + mainlineSpace + (vanillaIO var $ ping client) + { tblTransition = \tr -> do + io1 <- transitionCommittee committee tr + io2 <- touchBucket mainlineSpace (15*60) var sched tr + return $ do + io1 >> io2 + {- noisy (timestamp updates are currently reported as transitions to Accepted) + hPutStrLn stderr $ unwords + [ show (transitionedTo tr) + , show (transitioningNode tr) + ] -} + } + + +mainlineSpace :: R.KademliaSpace NodeId NodeInfo +mainlineSpace = R.KademliaSpace + { R.kademliaLocation = nodeId + , R.kademliaTestBit = testIdBit + , R.kademliaXor = xor + , R.kademliaSample = genBucketSample' + } + +transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) +transitionCommittee committee (RoutingTransition ni Stranger) = do + delVote committee (nodeId ni) + return $ do + hPutStrLn stderr $ "delVote "++show (nodeId ni) +transitionCommittee committee _ = return $ return () + +updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () +updateRouting client routing naddr msg = do + case prefer4or6 naddr Nothing of + Want_IP4 -> go (routing4 routing) (committee4 routing) (sched4 routing) + Want_IP6 -> go (routing6 routing) (committee6 routing) (sched6 routing) + where + go tbl committee sched = do + self <- atomically $ R.thisNode <$> readTVar tbl + when (nodeIP self /= nodeIP naddr) $ do + case msg of + R { rspReflectedIP 
= Just sockaddr } + -> do + -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) + atomically $ addVote committee (nodeId naddr) sockaddr + _ -> return () + insertNode (mainlineKademlia client committee tbl sched) naddr + +data Ping = Ping deriving Show + +-- Pong is the same as Ping. +type Pong = Ping +pattern Pong = Ping + +instance BEncode Ping where + toBEncode Ping = toDict endDict + fromBEncode _ = pure Ping + +wantList :: WantIP -> [ByteString] +wantList Want_IP4 = ["ip4"] +wantList Want_IP6 = ["ip6"] +wantList Want_Both = ["ip4","ip6"] + +instance BEncode WantIP where + toBEncode w = toBEncode $ wantList w + fromBEncode bval = do + wants <- fromBEncode bval + let _ = wants :: [ByteString] + case (elem "ip4" wants, elem "ip6" wants) of + (True,True) -> Right Want_Both + (True,False) -> Right Want_IP4 + (False,True) -> Right Want_IP6 + _ -> Left "Unrecognized IP type." + +data FindNode = FindNode NodeId (Maybe WantIP) + +instance BEncode FindNode where + toBEncode (FindNode nid iptyp) = toDict $ target_key .=! nid + .: want_key .=? iptyp + .: endDict + fromBEncode = fromDict $ FindNode <$>! target_key + <*>? want_key + +data NodeFound = NodeFound + { nodes4 :: [NodeInfo] + , nodes6 :: [NodeInfo] + } + +instance BEncode NodeFound where + toBEncode (NodeFound ns ns6) = toDict $ + nodes_key .=? + (if Prelude.null ns then Nothing + else Just (S.runPut (mapM_ putNodeInfo4 ns))) + .: nodes6_key .=? + (if Prelude.null ns6 then Nothing + else Just (S.runPut (mapM_ putNodeInfo6 ns6))) + .: endDict + + fromBEncode bval = NodeFound <$> ns4 <*> ns6 + where + opt ns = fromMaybe [] <$> optional ns + ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval + ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval + +binary :: S.Get a -> BKey -> BE.Get [a] +binary get k = field (req k) >>= either (fail . format) return . 
+ S.runGet (many get) + where + format str = "fail to deserialize " ++ show k ++ " field: " ++ str + +pingH :: NodeInfo -> Ping -> IO Pong +pingH _ Ping = return Pong + +prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP +prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp + +findNodeH :: Routing -> NodeInfo -> FindNode -> IO NodeFound +findNodeH routing addr (FindNode node iptyp) = do + let preferred = prefer4or6 addr iptyp + + (append4,append6) <- atomically $ do + ni4 <- R.thisNode <$> readTVar (routing4 routing) + ni6 <- R.thisNode <$> readTVar (routing6 routing) + return $ case ipFamily (nodeIP addr) of + Want_IP4 -> (id, (++ [ni6])) + Want_IP6 -> ((++ [ni4]), id) + ks <- bool (return []) (go append4 $ routing4 routing) (preferred /= Want_IP6) + ks6 <- bool (return []) (go append6 $ routing6 routing) (preferred /= Want_IP4) + return $ NodeFound ks ks6 + where + go f var = f . R.kclosest mainlineSpace k node <$> atomically (readTVar var) + + k = R.defaultK + + +data GetPeers = GetPeers InfoHash (Maybe WantIP) + +instance BEncode GetPeers where + toBEncode (GetPeers ih iptyp) + = toDict $ info_hash_key .=! ih + .: want_key .=? iptyp + .: endDict + fromBEncode = fromDict $ GetPeers <$>! info_hash_key <*>? want_key + + +data GotPeers = GotPeers + { -- | If the queried node has no peers for the infohash, returned + -- the K nodes in the queried nodes routing table closest to the + -- infohash supplied in the query. + peers :: [PeerAddr] + + , nodes :: NodeFound + + -- | The token value is a required argument for a future + -- announce_peer query. + , grantedToken :: Token + } -- deriving (Show, Eq, Typeable) + +nodeIsIPv6 :: NodeInfo -> Bool +nodeIsIPv6 (NodeInfo _ (IPv6 _) _) = True +nodeIsIPv6 _ = False + +instance BEncode GotPeers where + toBEncode GotPeers { nodes = NodeFound ns4 ns6, ..} = toDict $ + nodes_key .=? (if null ns4 then Nothing + else Just $ S.runPut (mapM_ putNodeInfo4 ns4)) + .: nodes6_key .=? 
(if null ns6 then Nothing + else Just $ S.runPut (mapM_ putNodeInfo4 ns6)) + .: token_key .=! grantedToken + .: peers_key .=! map S.encode peers + .: endDict + + fromBEncode = fromDict $ do + ns4 <- fromMaybe [] <$> optional (binary getNodeInfo4 nodes_key) -- "nodes" + ns6 <- fromMaybe [] <$> optional (binary getNodeInfo6 nodes6_key) -- "nodes6" + -- TODO: BEP 42... + -- + -- Once enforced, responses to get_peers requests whose node ID does not + -- match its external IP should be considered to not contain a token and + -- thus not be eligible as storage target. Implementations should take + -- care that they find the closest set of nodes which return a token and + -- whose IDs matches their IPs before sending a store request to those + -- nodes. + -- + -- Sounds like something to take care of at peer-search time, so I'll + -- ignore it for now. + tok <- field (req token_key) -- "token" + ps <- fromMaybe [] <$> optional (field (req peers_key) >>= decodePeers) -- "values" + pure $ GotPeers ps (NodeFound ns4 ns6) tok + where + decodePeers = either fail pure . mapM S.decode + +getPeersH :: Routing -> SwarmsDatabase -> NodeInfo -> GetPeers -> IO GotPeers +getPeersH routing (SwarmsDatabase peers toks _) naddr (GetPeers ih iptyp) = do + ps <- do + tm <- getTimestamp + atomically $ do + (ps,store') <- Peers.freshPeers ih tm <$> readTVar peers + writeTVar peers store' + return ps + -- Filter peer results to only a single address family, IPv4 or IPv6, as + -- per BEP 32. + let notboth = iptyp >>= \case Want_Both -> Nothing + specific -> Just specific + selected = prefer4or6 naddr notboth + ps' = filter ( (== selected) . ipFamily . peerHost ) ps + tok <- grantToken toks naddr + ns <- findNodeH routing naddr (FindNode (coerce ih) iptyp) + return $ GotPeers ps' ns tok + +-- | Announce that the peer, controlling the querying node, is +-- downloading a torrent on a port. 
data Announce = Announce
  { -- | When set, the 'port' field should be ignored and the source
    -- port of the UDP packet used as the peer's port instead.  This is
    -- useful for peers behind a NAT that may not know their external
    -- port and that, supporting uTP, accept incoming connections on
    -- the same port as the DHT port.
    impliedPort   :: Bool

    -- | Infohash of the torrent.
  , topic         :: InfoHash

    -- | Some clients announce the friendly name of the torrent here.
  , announcedName :: Maybe ByteString

    -- | The port /this/ peer is listening on.
  , port          :: PortNumber

    -- TODO: optional boolean "seed" key

    -- | Received in response to a previous get_peers query.
  , sessionToken  :: Token

  } deriving (Show, Eq, Typeable)

-- Dictionary keys declared without a type signature.
-- NOTE(review): every other key below is a 'BKey'; these six presumably are
-- too -- confirm against their use sites before adding signatures.
peer_ip_key    = "ip"
peer_id_key    = "peer id"
peer_port_key  = "port"
msg_type_key   = "msg_type"
piece_key      = "piece"
total_size_key = "total_size"

-- Dictionary keys used by the message codecs in this module.
node_id_key, read_only_key, want_key, target_key :: BKey
node_id_key   = "id"
read_only_key = "ro"
want_key      = "want"
target_key    = "target"

nodes_key, nodes6_key, info_hash_key, peers_key, token_key :: BKey
nodes_key     = "nodes"
nodes6_key    = "nodes6"
info_hash_key = "info_hash"
peers_key     = "values"
token_key     = "token"

name_key, port_key, implied_port_key :: BKey
name_key         = "name"
port_key         = "port"
implied_port_key = "implied_port"

-- | The implied-port flag is written as the integer 1 when set and left out
-- of the dictionary otherwise.  On decoding, an absent flag or a value of 0
-- reads as 'False'; any other integer reads as 'True'.
instance BEncode Announce where
  toBEncode ann = toDict $
       implied_port_key .=? bool Nothing (Just (1 :: Int)) (impliedPort ann)
    .: info_hash_key    .=! topic ann
    .: name_key         .=? announcedName ann
    .: port_key         .=! port ann
    .: token_key        .=! sessionToken ann
    .: endDict

  fromBEncode = fromDict $
    Announce <$> impliedFlag
             <*>! info_hash_key
             <*>? name_key
             <*>! port_key
             <*>! token_key
    where
      impliedFlag =
        maybe False (/= (0 :: Int)) <$> optional (field (req implied_port_key))



-- | Reply to an 'Announce' query.
--
-- The queried node must verify that the token was previously sent to
-- the same IP address as the querying node.  It should then store the
-- IP address of the querying node, together with the supplied port
-- number, under the infohash in its store of peer contact information.
data Announced = Announced
    deriving (Show, Eq, Typeable)

-- | Encoded exactly like 'Ping'; decoding accepts any value.
instance BEncode Announced where
    toBEncode   = const (toBEncode Ping)
    fromBEncode = const (pure Announced)

-- | Handle an 'Announce' query from the node @naddr@.
--
-- The session token is validated with 'checkToken'.  If it is rejected the
-- result is a 'ProtocolError'; otherwise the announcing peer is inserted
-- into the peer store under the announced topic and 'Announced' is returned.
announceH :: SwarmsDatabase -> NodeInfo -> Announce -> IO (Either Error Announced)
announceH (SwarmsDatabase peers toks _) naddr announcement = do
    tokenOk <- checkToken toks naddr (sessionToken announcement)
    if tokenOk
        then Right <$> storeAnnouncer
        else return $ Left (Error ProtocolError "invalid parameter: token")
  where
    storeAnnouncer = atomically $ do
        modifyTVar' peers $
            insertPeer (topic announcement) (announcedName announcement) announcer
        return Announced

    announcer = PeerAddr
        { peerId   = Nothing
        , peerHost = storedHost
        , peerPort = storedPort
        }

    -- Avoid storing IPv4-mapped addresses.
    storedHost = case nodeIP naddr of
        IPv6 ip6 | Just ip4 <- un4map ip6 -> IPv4 ip4
        other                             -> other

    -- With an implied port the querying node's own port is recorded
    -- instead of the one carried in the message.
    storedPort
        | impliedPort announcement = nodePort naddr
        | otherwise                = port announcement

-- Always reports 'False' for now.
isReadonlyClient _ = False -- TODO

-- Send the query @msg nid@ to @addr@ using method @meth@ and decode the
-- response payload with @unwrap@.
mainlineSend meth unwrap msg client nid addr = do
    reply <- sendQuery client serializer (msg nid) addr
    -- sendQuery yields (Just (Left _)) on a parse error.  Such replies are
    -- collapsed to Nothing here, exactly like a missing reply.
    -- TODO: Do something with parse errors.
    return (reply >>= either (const Nothing) Just)
  where
    serializer = MethodSerializer
        { methodTimeout  = 5
        , method         = meth
        , wrapQuery      = encodeQueryPayload meth (isReadonlyClient client)
        , unwrapResponse = \rsp -> rspPayload rsp >>= decodePayload
        }

    -- A bencode decoding failure is reported as a 'GenericError' carrying
    -- the decoder's message.
    decodePayload =
        either (Left . Error GenericError . C8.pack) (Right . unwrap)
            . BE.fromBEncode

-- | Send a ping to the given node.  The result is 'True' when
-- 'mainlineSend' delivers a decoded 'Pong' and 'False' otherwise.
ping :: MainlineClient -> NodeInfo -> IO Bool
ping client addr = do
    answer <- mainlineSend (Method "ping") (\Pong -> True) (const Ping) client () addr
    return (fromMaybe False answer)

-- searchQuery :: ni -> IO (Maybe ([ni], [r], tok))
getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],()))
getNodes =
    mainlineSend (Method "find_node") unwrapNodes
                 (\target -> FindNode target (Just Want_Both))

-- The combined IPv4 and IPv6 node lists are returned in both the first and
-- the second component; the third component is unit.
unwrapNodes (NodeFound ns4 ns6) = (found, found, ())
  where
    found = ns4 ++ ns6

getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Token))
getPeers =
    mainlineSend (Method "get_peers") unwrapPeers
                 (\target -> GetPeers (coerce target) (Just Want_Both))

-- Splits a 'GotPeers' reply into (all nodes, peers, token).
unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4 ++ ns6, ps, tok)

-- Build a 'Search' over 'mainlineSpace' driven by the given query action.
mainlineSearch qry = Search
    { searchSpace       = mainlineSpace
    , searchNodeAddress = \ni -> (nodeIP ni, nodePort ni)
    , searchQuery       = qry
    }

nodeSearch client = mainlineSearch $ getNodes client

peerSearch client = mainlineSearch $ getPeers client

-- | List of bootstrap nodes maintained by different bittorrent
-- software authors.
--
-- Entries that fail to resolve, or that 'nodeInfo' rejects, are dropped
-- silently.  The whole lookup is wrapped in 'unsafeInterleaveIO'.
bootstrapNodes :: WantIP -> IO [NodeInfo]
bootstrapNodes want = unsafeInterleaveIO $ concat <$> mapM lookupNode wellknowns
  where
    wellknowns :: [String]
    wellknowns =
        [ "router.bittorrent.com:6881" -- by BitTorrent Inc.

        -- noted as not working at the time this comment was written
        -- (see git blame for the commit)
        , "dht.transmissionbt.com:6881" -- by Transmission project

        , "router.utorrent.com:6881"
        ]

    lookupNode hostAndPort = do
        resolved <- resolve want hostAndPort
        case resolved of
            Left _         -> return []
            Right sockaddr -> either (\_ -> return [])
                                     (\ni -> return [ni])
                                     (nodeInfo zeroID sockaddr)

-- | Resolve either a numeric network address or a hostname to a
-- numeric IP address of the node.
resolve :: WantIP -> String -> IO (Either IOError SockAddr)
resolve want hostAndPort = tryIOError $ do
    -- getAddrInfo throws an exception rather than returning an empty list,
    -- so this pattern match never fails.
    info : _ <- getAddrInfo (Just hints) (Just host) service
    return $ addrAddress info
  where
    -- Only an explicit IPv4 request selects AF_INET; every other value of
    -- 'WantIP' selects AF_INET6.
    wantedFamily = case want of
        Want_IP4 -> AF_INET
        _        -> AF_INET6

    hints = defaultHints
        { addrSocketType = Datagram
        , addrFamily     = wantedFamily
        }

    -- The input is split at its last ':'.  What follows it is the service
    -- (port); without any ':' the whole string is the host and no service
    -- is passed to getAddrInfo.
    (revService, revRest) = break (== ':') (reverse hostAndPort)

    (host, service) = case revRest of
        []          -> (hostAndPort, Nothing)
        _ : revHost -> (reverse revHost, Just (reverse revService))

-- cgit v1.2.3