{-# LANGUAGE CPP #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveFoldable #-} {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TupleSections #-} module Network.BitTorrent.MainlineDHT where import Control.Applicative import Control.Arrow import Control.Concurrent.STM import Control.Monad import Crypto.Random import Data.BEncode as BE import qualified Data.BEncode.BDict as BE ;import Data.BEncode.BDict (BKey) import Data.BEncode.Pretty import Data.BEncode.Types (BDict) import Data.Bits import Data.Bits.ByteString () import Data.Bool import Data.ByteArray (ByteArrayAccess) import qualified Data.ByteString as B ;import Data.ByteString (ByteString) import qualified Data.ByteString.Base16 as Base16 import qualified Data.ByteString.Char8 as C8 import Data.ByteString.Lazy (toStrict) import qualified Data.ByteString.Lazy.Char8 as L8 import Data.Char import Data.Coerce import Data.Data import Data.Default import Data.Digest.CRC32C import Data.Function (fix) import Data.Hashable #if MIN_VERSION_iproute(1,7,4) import Data.IP hiding ( fromSockAddr #if MIN_VERSION_iproute(1,7,8) , toSockAddr #endif ) #else import Data.IP #endif import Data.Maybe import Data.Monoid import Data.Ord import qualified Data.Serialize as S import Data.Set (Set) import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) import Data.Torrent import Data.Word import qualified Data.Wrapper.PSQInt as Int import Debug.Trace import Network.BitTorrent.MainlineDHT.Symbols import Network.Kademlia import Network.Kademlia.Bootstrap import Network.Address (fromSockAddr, setPort, sockAddrPort, testIdBit, toSockAddr, genBucketSample', WantIP(..), un4map,either4or6,ipFamily) import Network.BitTorrent.DHT.ContactInfo as Peers import Network.Kademlia.Search (Search (..)) import Network.BitTorrent.DHT.Token as Token import qualified Network.Kademlia.Routing as R ;import Network.Kademlia.Routing (getTimestamp) import Network.QueryResponse as QR import Network.Socket import System.IO.Error import System.IO.Unsafe (unsafeInterleaveIO) import qualified Text.ParserCombinators.ReadP as RP #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import qualified Data.Aeson as JSON ;import Data.Aeson (FromJSON, ToJSON, (.=)) import Text.Read import System.Global6 import Control.TriadCommittee import Data.TableMethods import DPut import DebugTag newtype NodeId = NodeId ByteString deriving (Eq,Ord,ByteArrayAccess, Bits, Hashable) instance BEncode NodeId where fromBEncode bval = do bs <- fromBEncode bval if B.length bs /= 20 then Left "Invalid length node id." else Right $ NodeId bs toBEncode (NodeId bs) = toBEncode bs instance Show NodeId where show (NodeId bs) = C8.unpack $ Base16.encode bs instance S.Serialize NodeId where get = NodeId <$> S.getBytes 20 put (NodeId bs) = S.putByteString bs instance FiniteBits NodeId where finiteBitSize _ = 160 instance Read NodeId where readsPrec _ str | (bs, xs) <- Base16.decode $ C8.pack str , B.length bs == 20 = [ (NodeId bs, drop 40 str) ] | otherwise = [] zeroID :: NodeId zeroID = NodeId $ B.replicate 20 0 data NodeInfo = NodeInfo { nodeId :: NodeId , nodeIP :: IP , nodePort :: PortNumber } deriving (Eq,Ord) instance ToJSON NodeInfo where toJSON (NodeInfo nid (IPv4 ip) port) = JSON.object [ "node-id" .= show nid , "ipv4" .= show ip , "port" .= (fromIntegral port :: Int) ] toJSON (NodeInfo nid (IPv6 ip6) port) | Just ip <- un4map ip6 = JSON.object [ "node-id" .= show nid , "ipv4" .= show ip , "port" .= (fromIntegral port :: Int) ] | otherwise = JSON.object [ "node-id" .= show nid , "ipv6" .= show ip6 , "port" .= (fromIntegral port :: Int) ] instance FromJSON NodeInfo where parseJSON (JSON.Object v) = do nidstr <- v JSON..: "node-id" ip6str <- v JSON..:? "ipv6" ip4str <- v JSON..:? "ipv4" portnum <- v JSON..: "port" ip <- maybe empty (return . IPv6) (ip6str >>= readMaybe) <|> maybe empty (return . IPv4) (ip4str >>= readMaybe) let (bs,_) = Base16.decode (C8.pack nidstr) guard (B.length bs == 20) return $ NodeInfo (NodeId bs) ip (fromIntegral (portnum :: Word16)) hexdigit :: Char -> Bool hexdigit c = ('0' <= c && c <= '9') || ( 'a' <= c && c <= 'f') || ( 'A' <= c && c <= 'F') instance Read NodeInfo where readsPrec i = RP.readP_to_S $ do RP.skipSpaces let n = 40 -- characters in node id. parseAddr = RP.between (RP.char '(') (RP.char ')') (RP.munch (/=')')) RP.+++ RP.munch (not . isSpace) nodeidAt = do hexhash <- sequence $ replicate n (RP.satisfy hexdigit) RP.char '@' RP.+++ RP.satisfy isSpace addrstr <- parseAddr nid <- case Base16.decode $ C8.pack hexhash of (bs,_) | B.length bs==20 -> return (NodeId bs) _ -> fail "Bad node id." return (nid,addrstr) (nid,addrstr) <- ( nodeidAt RP.+++ ( (zeroID,) <$> parseAddr) ) let raddr = do ip <- RP.between (RP.char '[') (RP.char ']') (IPv6 <$> RP.readS_to_P (readsPrec i)) RP.+++ (IPv4 <$> RP.readS_to_P (readsPrec i)) _ <- RP.char ':' port <- toEnum <$> RP.readS_to_P (readsPrec i) return (ip, port) (ip,port) <- case RP.readP_to_S raddr addrstr of [] -> fail "Bad address." ((ip,port),_):_ -> return (ip,port) return $ NodeInfo nid ip port -- The Hashable instance depends only on the IP address and port number. It is -- used to compute the announce token. instance Hashable NodeInfo where hashWithSalt s ni = hashWithSalt s (nodeIP ni , nodePort ni) {-# INLINE hashWithSalt #-} instance Show NodeInfo where showsPrec _ (NodeInfo nid ip port) = shows nid . ('@' :) . showsip . (':' :) . shows port where showsip | IPv4 ip4 <- ip = shows ip4 | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = shows ip4 | otherwise = ('[' :) . shows ip . (']' :) {- -- | KRPC 'compact list' compatible encoding: contact information for -- nodes is encoded as a 26-byte string. Also known as "Compact node -- info" the 20-byte Node ID in network byte order has the compact -- IP-address/port info concatenated to the end. get = NodeInfo <$> (NodeId <$> S.getBytes 20 ) <*> S.get <*> S.get -} getNodeInfo4 :: S.Get NodeInfo getNodeInfo4 = NodeInfo <$> (NodeId <$> S.getBytes 20) <*> (IPv4 <$> S.get) <*> S.get putNodeInfo4 :: NodeInfo -> S.Put putNodeInfo4 (NodeInfo (NodeId nid) ip port) | IPv4 ip4 <- ip = put4 ip4 | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = put4 ip4 | otherwise = return () where put4 ip4 = S.putByteString nid >> S.put ip4 >> S.put port getNodeInfo6 :: S.Get NodeInfo getNodeInfo6 = NodeInfo <$> (NodeId <$> S.getBytes 20) <*> (IPv6 <$> S.get) <*> S.get putNodeInfo6 :: NodeInfo -> S.Put putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port) = S.putByteString nid >> S.put ip >> S.put port putNodeInfo6 _ = return () -- | TODO: This should depend on the bind address to support IPv4-only. For -- now, in order to support dual-stack listen, we're going to assume IPv6 is -- wanted and map IPv4 addresses accordingly. nodeAddr :: NodeInfo -> SockAddr nodeAddr (NodeInfo _ ip port) = case ip of IPv4 ip4 -> setPort port $ toSockAddr (ipv4ToIPv6 ip4) IPv6 ip6 -> setPort port $ toSockAddr ip6 nodeInfo :: NodeId -> SockAddr -> Either String NodeInfo nodeInfo nid saddr | Just ip <- fromSockAddr saddr , Just port <- sockAddrPort saddr = Right $ NodeInfo nid ip port | otherwise = Left "Address family not supported." -- | Types of RPC errors. data ErrorCode -- | Some error doesn't fit in any other category. = GenericError -- | Occurs when server fail to process procedure call. | ServerError -- | Malformed packet, invalid arguments or bad token. | ProtocolError -- | Occurs when client trying to call method server don't know. | MethodUnknown deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data) -- | According to the table: -- instance Enum ErrorCode where fromEnum GenericError = 201 fromEnum ServerError = 202 fromEnum ProtocolError = 203 fromEnum MethodUnknown = 204 {-# INLINE fromEnum #-} toEnum 201 = GenericError toEnum 202 = ServerError toEnum 203 = ProtocolError toEnum 204 = MethodUnknown toEnum _ = GenericError {-# INLINE toEnum #-} instance BEncode ErrorCode where toBEncode = toBEncode . fromEnum {-# INLINE toBEncode #-} fromBEncode b = toEnum <$> fromBEncode b {-# INLINE fromBEncode #-} data Error = Error { errorCode :: !ErrorCode -- ^ The type of error. , errorMessage :: !ByteString -- ^ Human-readable text message. } deriving ( Show, Eq, Ord, Typeable, Data, Read ) newtype TransactionId = TransactionId ByteString deriving (Eq, Ord, Show, BEncode) newtype Method = Method ByteString deriving (Eq, Ord, Show, BEncode) data Message a = Q { msgOrigin :: NodeId , msgID :: TransactionId , qryPayload :: a , qryMethod :: Method , qryReadOnly :: Bool } | R { msgOrigin :: NodeId , msgID :: TransactionId , rspPayload :: Either Error a , rspReflectedIP :: Maybe SockAddr } showBE :: BValue -> String showBE bval = L8.unpack (showBEncode bval) instance BE.BEncode (Message BValue) where toBEncode m = encodeMessage m {- in case m of Q {} -> trace ("encoded(query): "++showBE r) r R {} -> trace ("encoded(response): "++showBE r) r -} fromBEncode bval = decodeMessage bval {- in case r of Left e -> trace (show e) r Right (Q {}) -> trace ("decoded(query): "++showBE bval) r Right (R {}) -> trace ("decoded(response): "++showBE bval) r -} decodeMessage :: BValue -> Either String (Message BValue) decodeMessage = fromDict $ do key <- lookAhead (field (req "y")) let _ = key :: BKey f <- case key of "q" -> do a <- field (req "a") g <- either fail return $ flip fromDict a $ do who <- field (req "id") ro <- fromMaybe False <$> optional (field (req "ro")) return $ \meth tid -> Q who tid a meth ro meth <- field (req "q") return $ g meth "r" -> do ip <- do ipstr <- optional (field (req "ip")) mapM (either fail return . decodeAddr) ipstr vals <- field (req "r") either fail return $ flip fromDict vals $ do who <- field (req "id") return $ \tid -> R who tid (Right vals) ip "e" -> do (ecode,emsg) <- field (req "e") ip <- do ipstr <- optional (field (req "ip")) mapM (either fail return . decodeAddr) ipstr -- FIXME:Spec does not give us the NodeId of the sender. -- Using 'zeroID' as place holder. -- We should ignore the msgOrigin for errors in 'updateRouting'. -- We should consider making msgOrigin a Maybe value. return $ \tid -> R zeroID tid (Left (Error ecode emsg)) ip _ -> fail $ "Mainline message is not a query, response, or an error: " ++ show key tid <- field (req "t") return $ f (tid :: TransactionId) encodeMessage :: Message BValue -> BValue encodeMessage (Q origin tid a meth ro) = case a of BDict args -> encodeQuery tid meth (BDict $ genericArgs origin ro `BE.union` args) _ -> encodeQuery tid meth a -- XXX: Not really a valid query. encodeMessage (R origin tid v ip) = case v of Right (BDict vals) -> encodeResponse tid (BDict $ genericArgs origin False `BE.union` vals) ip Left err -> encodeError tid err encodeAddr :: SockAddr -> ByteString encodeAddr = either encode4 encode6 . either4or6 where encode4 (SockAddrInet port addr) = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port)) encode6 (SockAddrInet6 port _ addr _) = S.runPut (S.put addr >> S.putWord16be (fromIntegral port)) encode6 _ = B.empty decodeAddr :: ByteString -> Either String SockAddr decodeAddr bs = S.runGet g bs where g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be) | otherwise = do host <- S.get -- TODO: Is this right? port <- fromIntegral <$> S.getWord16be return $ SockAddrInet6 port 0 host 0 genericArgs :: BEncode a => a -> Bool -> BDict genericArgs nodeid ro = "id" .=! nodeid .: "ro" .=? bool Nothing (Just (1 :: Int)) ro .: endDict encodeError :: BEncode a => a -> Error -> BValue encodeError tid (Error ecode emsg) = encodeAny tid "e" (ecode,emsg) id encodeResponse :: (BEncode tid, BEncode vals) => tid -> vals -> Maybe SockAddr -> BValue encodeResponse tid rvals rip = encodeAny tid "r" rvals ("ip" .=? (BString . encodeAddr <$> rip) .:) encodeQuery :: (BEncode args, BEncode tid, BEncode method) => tid -> method -> args -> BValue encodeQuery tid qmeth qargs = encodeAny tid "q" qmeth ("a" .=! qargs .:) encodeAny :: (BEncode tid, BEncode a) => tid -> BKey -> a -> (BDict -> BDict) -> BValue encodeAny tid key val aux = toDict $ aux $ key .=! val .: "t" .=! tid .: "y" .=! key .: endDict showPacket :: ([L8.ByteString] -> [L8.ByteString]) -> SockAddr -> L8.ByteString -> ByteString -> String showPacket f addr flow bs = L8.unpack $ L8.unlines es where es = map (L8.append prefix) (f $ L8.lines pp) prefix = L8.pack (either show show $ either4or6 addr) <> flow pp = either L8.pack showBEncode $ BE.decode bs -- Add detailed printouts for every packet. addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString addVerbosity tr = tr { awaitMessage = do (m,io) <- awaitMessage tr case m of Arrival addr msg -> return (m, io >> dput XBitTorrent (showPacket id addr " --> " msg)) _ -> return (m, io) , sendMessage = \addr msg -> do dput XBitTorrent (showPacket id addr " <-- " msg) sendMessage tr addr msg } showParseError :: ByteString -> SockAddr -> String -> String showParseError bs addr err = showPacket (L8.pack err :) addr " --> " bs parsePacket :: ByteString -> SockAddr -> Either String (Message BValue, NodeInfo) parsePacket bs addr = left (showParseError bs addr) $ do pkt <- BE.decode bs -- TODO: Error packets do not include a valid msgOrigin. -- The BE.decode method is using 'zeroID' as a placeholder. ni <- nodeInfo (msgOrigin pkt) addr return (pkt, ni) encodePacket :: Message BValue -> NodeInfo -> (ByteString, SockAddr) encodePacket msg ni = ( toStrict $ BE.encode msg , nodeAddr ni ) classify :: Message BValue -> MessageClass String Method TransactionId NodeInfo (Message BValue) classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid classify (R { msgID = tid }) = IsResponse tid encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) encodeQueryPayload :: BEncode a => Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) decodePayload :: BEncode a => Message BValue -> Either String a decodePayload msg = BE.fromBEncode $ qryPayload msg type Handler = MethodHandler String TransactionId NodeInfo (Message BValue) handler :: ( BEncode a , BEncode b ) => (NodeInfo -> a -> IO b) -> Maybe Handler handler f = Just $ MethodHandler decodePayload encodeResponsePayload f handlerE :: ( BEncode a , BEncode b ) => (NodeInfo -> a -> IO (Either Error b)) -> Maybe Handler handlerE f = Just $ MethodHandler decodePayload enc f where enc tid self dest (Left e) = errorPayload tid self dest e enc tid self dest (Right b) = encodeResponsePayload tid self dest b type AnnounceSet = Set (InfoHash, PortNumber) data SwarmsDatabase = SwarmsDatabase { contactInfo :: !( TVar PeerStore ) -- ^ Published by other nodes. , sessionTokens :: !( TVar SessionTokens ) -- ^ Query session IDs. , announceInfo :: !( TVar AnnounceSet ) -- ^ To publish by this node. } newSwarmsDatabase :: IO SwarmsDatabase newSwarmsDatabase = do toks <- nullSessionTokens atomically $ SwarmsDatabase <$> newTVar def <*> newTVar toks <*> newTVar def data Routing = Routing { tentativeId :: NodeInfo , committee4 :: TriadCommittee NodeId SockAddr , committee6 :: TriadCommittee NodeId SockAddr , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId } sched4 :: Routing -> TVar (Int.PSQ POSIXTime) sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue sched6 :: Routing -> TVar (Int.PSQ POSIXTime) sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue routing4 :: Routing -> TVar (R.BucketList NodeInfo) routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets routing6 :: Routing -> TVar (R.BucketList NodeInfo) routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets traced :: Show tid => TableMethods t tid -> TableMethods t tid traced (TableMethods ins del lkup) = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) (\tid t -> trace ("del "++show tid) $ del tid t) (\tid t -> trace ("lookup "++show tid) $ lkup tid t) type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) -- | Like 'nodeInfo' but falls back to 'iNADDR_ANY' for nodeIP' and 'nodePort'. mkNodeInfo :: NodeId -> SockAddr -> NodeInfo mkNodeInfo nid addr = NodeInfo { nodeId = nid , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr , nodePort = fromMaybe 0 $ sockAddrPort addr } newClient :: SwarmsDatabase -> SockAddr -- ^ Tentative IP address for this node (the bind address is suitable). -> Transport String SockAddr ByteString -- ^ UDP transport -> IO ( MainlineClient , Routing , [NodeInfo] -> [NodeInfo] -> IO () , [NodeInfo] -> [NodeInfo] -> IO () , IO () ) newClient swarms addr udp = do nid <- NodeId <$> getRandomBytes 20 let tentative_info = mkNodeInfo nid addr tentative_info6 <- maybe tentative_info (\ip6 -> tentative_info { nodeId = fromMaybe (nodeId tentative_info) $ bep42 (toSockAddr ip6) (nodeId tentative_info) , nodeIP = IPv6 ip6 }) <$> global6 addr4 <- atomically $ newTChan addr6 <- atomically $ newTChan mkrouting <- atomically $ do -- We defer initializing the refreshSearch and refreshPing until we -- have a client to send queries with. let nullPing = const $ return False tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount refresher4 <- newBucketRefresher tbl4 nullSearch nullPing tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount refresher6 <- newBucketRefresher tbl6 nullSearch nullPing let updateIPVote tblvar addrvar a = do bkts <- readTVar tblvar case bep42 a (nodeId $ R.thisNode bkts) of Just nid -> do let tbl = R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) (mkNodeInfo nid a) (R.defaultBucketCount) writeTVar tblvar tbl writeTChan addrvar $ Just (a,map fst $ concat $ R.toList bkts) Nothing -> return () committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 return $ \client -> -- Now we have a client, so tell the BucketRefresher how to search and ping. let updIO r = updateRefresherIO (nodeSearch client) (ping client) r in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6) map_var <- atomically $ newTVar (0, mempty) let routing = mkrouting outgoingClient net = onInbound (updateRouting outgoingClient routing) $ layerTransport parsePacket encodePacket $ udp -- Paranoid: It's safe to define /net/ and /client/ to be mutually -- recursive since 'updateRouting' does not invoke 'awaitMessage' which -- which was modified by 'onInbound'. However, I'm going to avoid the -- mutual reference just to be safe. outgoingClient = client { clientNet = net { awaitMessage = pure (Terminated, return ()) } } dispatch = DispatchMethods { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x) , tableMethods = mapT -- :: TransactionMethods tbl tid x } handlers :: Method -> Maybe Handler handlers ( Method "ping" ) = handler pingH handlers ( Method "find_node" ) = handler $ findNodeH routing handlers ( Method "get_peers" ) = handler $ getPeersH routing swarms handlers ( Method "announce_peer" ) = handlerE $ announceH swarms handlers ( Method meth ) = Just $ defaultHandler meth mapT = transactionMethods mapMethods gen gen :: Word16 -> (TransactionId, Word16) gen cnt = (TransactionId $ S.encode cnt, cnt+1) client = Client { clientNet = addHandler (handleMessage client) net , clientDispatcher = dispatch , clientErrorReporter = ignoreErrors -- printErrors stderr , clientPending = map_var , clientAddress = \maddr -> atomically $ do let var = case flip prefer4or6 Nothing <$> maddr of Just Want_IP6 -> routing6 routing _ -> routing4 routing R.thisNode <$> readTVar var , clientResponseId = return } fork $ fix $ \again -> do myThreadId >>= flip labelThread "addr4" x <- atomically (readTChan addr4) forM_ x $ \(addr, ns) -> do dput XBitTorrent $ "External IPv4: "++show (addr, length ns) forM_ ns $ \n -> do dput XBitTorrent $ "Change IP, ping: "++show n ping outgoingClient n -- TODO: trigger bootstrap ipv4 again fork $ fix $ \again -> do myThreadId >>= flip labelThread "addr6" x <- atomically (readTChan addr6) forM_ x $ \(addr,ns) -> do dput XBitTorrent $ "External IPv6: "++show (addr, length ns) forM_ ns $ \n -> do dput XBitTorrent $ "Change IP, ping: "++show n ping outgoingClient n -- TODO: trigger bootstrap ipv6 again refresh_thread4 <- forkPollForRefresh $ refresher4 routing refresh_thread6 <- forkPollForRefresh $ refresher6 routing gc_peers <- forkAnnouncedInfohashesGC (contactInfo swarms) return (client, routing, bootstrap (refresher4 routing), bootstrap (refresher6 routing) , do killThread refresh_thread4 -- TODO: Better termination mechanism. killThread refresh_thread6 -- TODO: Better termination mechanism. atomically $ writeTChan addr4 Nothing -- Terminate "addr4" thread. atomically $ writeTChan addr6 Nothing -- Terminate "addr6" thread. killThread gc_peers ) -- Note that you should call .put() every hour for content that you want to -- keep alive, since nodes may discard data nodes older than 2 hours. (source: -- https://www.npmjs.com/package/bittorrent-dht) -- -- This function will discard records between 3 and 6 hours old. forkAnnouncedInfohashesGC :: TVar PeerStore -> IO ThreadId forkAnnouncedInfohashesGC vpeers = fork $ do myThreadId >>= flip labelThread "gc:bt-peers" fix $ \loop -> do cutoff <- getPOSIXTime threadDelay 10800000000 -- 3 hours atomically $ modifyTVar' vpeers $ deleteOlderThan cutoff loop -- | Modifies a purely random 'NodeId' to one that is related to a given -- routable address in accordance with BEP 42. -- -- Test vectors from the spec: -- -- IP rand example node ID -- ============ ===== ========================================== -- 124.31.75.21 1 5fbfbf f10c5d6a4ec8a88e4c6ab4c28b95eee4 01 -- 21.75.31.124 86 5a3ce9 c14e7a08645677bbd1cfe7d8f956d532 56 -- 65.23.51.170 22 a5d432 20bc8f112a3d426c84764f8c2a1150e6 16 -- 84.124.73.14 65 1b0321 dd1bb1fe518101ceef99462b947a01ff 41 -- 43.213.53.83 90 e56f6c bf5b7c4be0237986d5243b87aa6d5130 5a bep42 :: SockAddr -> NodeId -> Maybe NodeId bep42 addr0 (NodeId r) | let addr = either id id $ either4or6 addr0 -- unmap 4mapped SockAddrs , Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4) <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6) = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) | otherwise = Nothing where ip4mask = "\x03\x0f\x3f\xff" :: ByteString ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString nbhood_select = B.last r .&. 7 retr n = pure $ B.drop (B.length r - n) r crc = S.encode . crc32c . B.pack applyMask ip = case B.zipWith (.&.) msk ip of (b:bs) -> (b .|. shiftL nbhood_select 5) : bs bs -> bs where msk | B.length ip == 4 = ip4mask | otherwise = ip6mask defaultHandler :: ByteString -> Handler defaultHandler meth = MethodHandler decodePayload errorPayload returnError where returnError :: NodeInfo -> BValue -> IO Error returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo TransactionId -> Kademlia NodeId NodeInfo mainlineKademlia client committee refresher = Kademlia quietInsertions mainlineSpace (vanillaIO (refreshBuckets refresher) $ ping client) { tblTransition = \tr -> do io1 <- transitionCommittee committee tr io2 <- touchBucket refresher tr return $ do io1 >> io2 {- noisy (timestamp updates are currently reported as transitions to Accepted) dput XBitTorrent $ unwords [ show (transitionedTo tr) , show (transitioningNode tr) ] -} } mainlineSpace :: R.KademliaSpace NodeId NodeInfo mainlineSpace = R.KademliaSpace { R.kademliaLocation = nodeId , R.kademliaTestBit = testIdBit , R.kademliaXor = xor , R.kademliaSample = genBucketSample' } transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) transitionCommittee committee (RoutingTransition ni Stranger) = do delVote committee (nodeId ni) return $ do dput XBitTorrent $ "delVote "++show (nodeId ni) transitionCommittee committee _ = return $ return () updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () updateRouting client routing naddr msg = do case prefer4or6 naddr Nothing of Want_IP4 -> go (committee4 routing) (refresher4 routing) Want_IP6 -> go (committee6 routing) (refresher6 routing) where go committee refresher = do self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) when (nodeIP self /= nodeIP naddr) $ do case msg of R { rspReflectedIP = Just sockaddr } -> do -- dput XBitTorrent $ "External: "++show (nodeId naddr,sockaddr) atomically $ addVote committee (nodeId naddr) sockaddr _ -> return () insertNode (mainlineKademlia client committee refresher) naddr data Ping = Ping deriving Show -- Pong is the same as Ping. type Pong = Ping pattern Pong = Ping instance BEncode Ping where toBEncode Ping = toDict endDict fromBEncode _ = pure Ping wantList :: WantIP -> [ByteString] wantList Want_IP4 = ["ip4"] wantList Want_IP6 = ["ip6"] wantList Want_Both = ["ip4","ip6"] instance BEncode WantIP where toBEncode w = toBEncode $ wantList w fromBEncode bval = do wants <- fromBEncode bval let _ = wants :: [ByteString] case (elem "ip4" wants, elem "ip6" wants) of (True,True) -> Right Want_Both (True,False) -> Right Want_IP4 (False,True) -> Right Want_IP6 _ -> Left "Unrecognized IP type." data FindNode = FindNode NodeId (Maybe WantIP) instance BEncode FindNode where toBEncode (FindNode nid iptyp) = toDict $ target_key .=! nid .: want_key .=? iptyp .: endDict fromBEncode = fromDict $ FindNode <$>! target_key <*>? want_key data NodeFound = NodeFound { nodes4 :: [NodeInfo] , nodes6 :: [NodeInfo] } instance BEncode NodeFound where toBEncode (NodeFound ns ns6) = toDict $ nodes_key .=? (if Prelude.null ns then Nothing else Just (S.runPut (mapM_ putNodeInfo4 ns))) .: nodes6_key .=? (if Prelude.null ns6 then Nothing else Just (S.runPut (mapM_ putNodeInfo6 ns6))) .: endDict fromBEncode bval = NodeFound <$> ns4 <*> ns6 where opt ns = fromMaybe [] <$> optional ns ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval binary :: S.Get a -> BKey -> BE.Get [a] binary get k = field (req k) >>= either (fail . format) return . S.runGet (many get) where format str = "fail to deserialize " ++ show k ++ " field: " ++ str pingH :: NodeInfo -> Ping -> IO Pong pingH _ Ping = return Pong prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp findNodeH :: Routing -> NodeInfo -> FindNode -> IO NodeFound findNodeH routing addr (FindNode node iptyp) = do let preferred = prefer4or6 addr iptyp (append4,append6) <- atomically $ do ni4 <- R.thisNode <$> readTVar (routing4 routing) ni6 <- R.thisNode <$> readTVar (routing6 routing) return $ case ipFamily (nodeIP addr) of Want_IP4 -> (id, (++ [ni6])) Want_IP6 -> ((++ [ni4]), id) ks <- bool (return []) (go append4 $ routing4 routing) (preferred /= Want_IP6) ks6 <- bool (return []) (go append6 $ routing6 routing) (preferred /= Want_IP4) return $ NodeFound ks ks6 where go f var = f . R.kclosest mainlineSpace k node <$> atomically (readTVar var) k = R.defaultK data GetPeers = GetPeers InfoHash (Maybe WantIP) instance BEncode GetPeers where toBEncode (GetPeers ih iptyp) = toDict $ info_hash_key .=! ih .: want_key .=? iptyp .: endDict fromBEncode = fromDict $ GetPeers <$>! info_hash_key <*>? want_key data GotPeers = GotPeers { -- | If the queried node has no peers for the infohash, returned -- the K nodes in the queried nodes routing table closest to the -- infohash supplied in the query. peers :: [PeerAddr] , nodes :: NodeFound -- | The token value is a required argument for a future -- announce_peer query. , grantedToken :: Token } -- deriving (Show, Eq, Typeable) nodeIsIPv6 :: NodeInfo -> Bool nodeIsIPv6 (NodeInfo _ (IPv6 _) _) = True nodeIsIPv6 _ = False instance BEncode GotPeers where toBEncode GotPeers { nodes = NodeFound ns4 ns6, ..} = toDict $ nodes_key .=? (if null ns4 then Nothing else Just $ S.runPut (mapM_ putNodeInfo4 ns4)) .: nodes6_key .=? (if null ns6 then Nothing else Just $ S.runPut (mapM_ putNodeInfo4 ns6)) .: token_key .=! grantedToken .: peers_key .=! map S.encode peers .: endDict fromBEncode = fromDict $ do ns4 <- fromMaybe [] <$> optional (binary getNodeInfo4 nodes_key) -- "nodes" ns6 <- fromMaybe [] <$> optional (binary getNodeInfo6 nodes6_key) -- "nodes6" -- TODO: BEP 42... -- -- Once enforced, responses to get_peers requests whose node ID does not -- match its external IP should be considered to not contain a token and -- thus not be eligible as storage target. Implementations should take -- care that they find the closest set of nodes which return a token and -- whose IDs matches their IPs before sending a store request to those -- nodes. -- -- Sounds like something to take care of at peer-search time, so I'll -- ignore it for now. tok <- field (req token_key) -- "token" ps <- fromMaybe [] <$> optional (field (req peers_key) >>= decodePeers) -- "values" pure $ GotPeers ps (NodeFound ns4 ns6) tok where decodePeers = either fail pure . mapM S.decode getPeersH :: Routing -> SwarmsDatabase -> NodeInfo -> GetPeers -> IO GotPeers getPeersH routing (SwarmsDatabase peers toks _) naddr (GetPeers ih iptyp) = do ps <- do tm <- getTimestamp atomically $ do (ps,store') <- Peers.freshPeers ih tm <$> readTVar peers writeTVar peers store' return ps -- Filter peer results to only a single address family, IPv4 or IPv6, as -- per BEP 32. let notboth = iptyp >>= \case Want_Both -> Nothing specific -> Just specific selected = prefer4or6 naddr notboth ps' = filter ( (== selected) . ipFamily . peerHost ) ps tok <- grantToken toks naddr ns <- findNodeH routing naddr (FindNode (coerce ih) iptyp) return $ GotPeers ps' ns tok -- | Announce that the peer, controlling the querying node, is -- downloading a torrent on a port. data Announce = Announce { -- | If set, the 'port' field should be ignored and the source -- port of the UDP packet should be used as the peer's port -- instead. This is useful for peers behind a NAT that may not -- know their external port, and supporting uTP, they accept -- incoming connections on the same port as the DHT port. impliedPort :: Bool -- | infohash of the torrent; , topic :: InfoHash -- | some clients announce the friendly name of the torrent here. , announcedName :: Maybe ByteString -- | the port /this/ peer is listening; , port :: PortNumber -- TODO: optional boolean "seed" key -- | received in response to a previous get_peers query. , sessionToken :: Token } deriving (Show, Eq, Typeable) mkAnnounce :: PortNumber -> InfoHash -> Token -> Announce mkAnnounce portnum info token = Announce { topic = info , port = portnum , sessionToken = token , announcedName = Nothing , impliedPort = False } instance BEncode Announce where toBEncode Announce {..} = toDict $ implied_port_key .=? flagField impliedPort .: info_hash_key .=! topic .: name_key .=? announcedName .: port_key .=! port .: token_key .=! sessionToken .: endDict where flagField flag = if flag then Just (1 :: Int) else Nothing fromBEncode = fromDict $ do Announce <$> (boolField <$> optional (field (req implied_port_key))) <*>! info_hash_key <*>? name_key <*>! port_key <*>! token_key where boolField = maybe False (/= (0 :: Int)) -- | The queried node must verify that the token was previously sent -- to the same IP address as the querying node. Then the queried node -- should store the IP address of the querying node and the supplied -- port number under the infohash in its store of peer contact -- information. data Announced = Announced deriving (Show, Eq, Typeable) instance BEncode Announced where toBEncode _ = toBEncode Ping fromBEncode _ = pure Announced announceH :: SwarmsDatabase -> NodeInfo -> Announce -> IO (Either Error Announced) announceH (SwarmsDatabase peers toks _) naddr announcement = do checkToken toks naddr (sessionToken announcement) >>= bool (Left <$> return (Error ProtocolError "invalid parameter: token")) (Right <$> go) where go = atomically $ do modifyTVar' peers $ insertPeer (topic announcement) (announcedName announcement) $ PeerAddr { peerId = Nothing -- Avoid storing IPv4-mapped addresses. , peerHost = case nodeIP naddr of IPv6 ip6 | Just ip4 <- un4map ip6 -> IPv4 ip4 a -> a , peerPort = if impliedPort announcement then nodePort naddr else port announcement } return Announced isReadonlyClient :: MainlineClient -> Bool isReadonlyClient client = False -- TODO mainlineAsync :: ( BEncode xqry , BEncode xrsp ) => Method -> (xrsp -> rsp) -> (qry -> xqry) -> MainlineClient -> qry -> NodeInfo -> (TransactionId -> QR.Result rsp -> IO ()) -> IO TransactionId mainlineAsync meth unwrap msg client nid addr withResult = do asyncQuery client serializer (msg nid) addr $ \qid reply -> do withResult qid $ case reply of Success (Right x) -> Success x Success (Left e) -> Canceled -- TODO: Do something with parse errors. Canceled -> Canceled TimedOut -> TimedOut where serializer = MethodSerializer { methodTimeout = \ni -> return (ni, 5000000) , method = meth , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) (Right . unwrap) . BE.fromBEncode) . rspPayload } mainlineSend :: ( BEncode xqry , BEncode xrsp ) => Method -> (xrsp -> rsp) -> (qry -> xqry) -> MainlineClient -> qry -> NodeInfo -> IO (QR.Result rsp) mainlineSend meth unwrap msg client nid addr = do reply <- sendQuery client serializer (msg nid) addr return $ case reply of Success (Right x) -> Success x Success (Left e) -> Canceled -- TODO: Do something with parse errors. Canceled -> Canceled TimedOut -> TimedOut where serializer = MethodSerializer { methodTimeout = \ni -> return (ni, 5000000) , method = meth , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) (Right . unwrap) . BE.fromBEncode) . rspPayload } ping :: MainlineClient -> NodeInfo -> IO Bool ping client addr = fromMaybe False . resultToMaybe <$> mainlineSend (Method "ping") (\Pong -> True) (const Ping) client () addr -- searchQuery :: ni -> IO (Maybe [ni], [r], tok)) getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) asyncGetNodes :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) -> IO TransactionId asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce asyncGetPeers :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[PeerAddr],Maybe Token) -> IO ()) -> IO TransactionId asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) nullTransactionId :: TransactionId nullTransactionId = TransactionId B.empty nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId nullSearch = Search { searchSpace = mainlineSpace , searchNodeAddress = nodeIP &&& nodePort , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId , searchQueryCancel = \_ _ -> return () , searchAlpha = 8 , searchK = 16 } mainlineSearch :: MainlineClient -> (MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO TransactionId) -> Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId mainlineSearch client qry = Search { searchSpace = mainlineSpace , searchNodeAddress = nodeIP &&& nodePort , searchQuery = qry client , searchQueryCancel = cancelQuery client , searchAlpha = 8 , searchK = 16 } nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo TransactionId nodeSearch client = mainlineSearch client asyncGetNodes peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr TransactionId peerSearch client = mainlineSearch client asyncGetPeers -- | List of bootstrap nodes maintained by different bittorrent -- software authors. bootstrapNodes :: WantIP -> IO [NodeInfo] bootstrapNodes want = unsafeInterleaveIO $ do let wellknowns = [ "router.bittorrent.com:6881" -- by BitTorrent Inc. -- doesn't work at the moment (use git blame) of commit , "dht.transmissionbt.com:6881" -- by Transmission project , "router.utorrent.com:6881" ] nss <- forM wellknowns $ \hostAndPort -> do e <- resolve want hostAndPort case e of Left _ -> return [] Right sockaddr -> either (const $ return []) (return . (: [])) $ nodeInfo zeroID sockaddr return $ concat nss -- | Resolve either a numeric network address or a hostname to a -- numeric IP address of the node. resolve :: WantIP -> String -> IO (Either IOError SockAddr) resolve want hostAndPort = do let hints = defaultHints { addrSocketType = Datagram , addrFamily = case want of Want_IP4 -> AF_INET _ -> AF_INET6 } (rport,rhost) = span (/= ':') $ reverse hostAndPort (host,port) = case rhost of [] -> (hostAndPort, Nothing) (_:hs) -> (reverse hs, Just (reverse rport)) tryIOError $ do -- getAddrInfo throws exception on empty list, so this -- pattern matching never fails. info : _ <- getAddrInfo (Just hints) (Just host) port return $ addrAddress info announce :: MainlineClient -> Announce -> NodeInfo -> IO (Maybe Announced) announce client msg addr = resultToMaybe <$> mainlineSend (Method "announce_peer") id (\() -> msg) client () addr