{-# LANGUAGE CPP #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveFoldable #-} {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TupleSections #-} module Mainline where import Control.Applicative import Control.Arrow import Control.Concurrent.STM import Control.Monad import Crypto.Random import Data.BEncode as BE import qualified Data.BEncode.BDict as BE ;import Data.BEncode.BDict (BKey) import Data.BEncode.Pretty import Data.BEncode.Types (BDict) import Data.Bits import Data.Bits.ByteString import Data.Bool import qualified Data.ByteArray as BA ;import Data.ByteArray (ByteArrayAccess) import qualified Data.ByteString as B ;import Data.ByteString (ByteString) import qualified Data.ByteString.Base16 as Base16 import qualified Data.ByteString.Char8 as Char8 import Data.ByteString.Lazy (toStrict) import qualified Data.ByteString.Lazy.Char8 as L8 import Data.Char import Data.Coerce import Data.Data import Data.Default import Data.Digest.CRC32C import Data.Function (fix) import Data.Hashable import Data.IP import Data.List import Data.Maybe import Data.Monoid import Data.Ord import qualified Data.Serialize as S import Data.Set (Set) import Data.Time.Clock.POSIX (POSIXTime) import Data.Torrent import Data.Typeable import Data.Word import qualified Data.Wrapper.PSQInt as Int import Debug.Trace import Kademlia import Network.Address (Address, fromAddr, fromSockAddr, setPort, sockAddrPort, testIdBit, toSockAddr) import Network.BitTorrent.DHT.ContactInfo as Peers import Network.BitTorrent.DHT.Search (Search (..)) import Network.BitTorrent.DHT.Token as Token import Network.DatagramServer.Types (genBucketSample') import qualified Network.DHT.Routing as R ;import Network.DHT.Routing (Info, Timestamp, getTimestamp) import Network.QueryResponse import Network.Socket import System.IO import System.IO.Error import System.IO.Unsafe (unsafeInterleaveIO) import qualified Text.ParserCombinators.ReadP as RP #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import Control.Exception (SomeException (..), handle) import qualified Data.Aeson as JSON ;import Data.Aeson (FromJSON, ToJSON, (.=)) import Text.Read newtype NodeId = NodeId ByteString deriving (Eq,Ord,ByteArrayAccess, BEncode, Bits, Hashable) instance Show NodeId where show (NodeId bs) = Char8.unpack $ Base16.encode bs instance S.Serialize NodeId where get = NodeId <$> S.getBytes 20 put (NodeId bs) = S.putByteString bs instance FiniteBits NodeId where finiteBitSize _ = 160 zeroID :: NodeId zeroID = NodeId $ B.replicate 20 0 data NodeInfo = NodeInfo { nodeId :: NodeId , nodeIP :: IP , nodePort :: PortNumber } deriving (Eq,Ord) instance ToJSON NodeInfo where toJSON (NodeInfo nid (IPv4 ip) port) = JSON.object [ "node-id" .= show nid , "ipv4" .= show ip , "port" .= (fromIntegral port :: Int) ] toJSON (NodeInfo nid (IPv6 ip6) port) | Just ip <- un4map ip6 = JSON.object [ "node-id" .= show nid , "ipv4" .= show ip , "port" .= (fromIntegral port :: Int) ] | otherwise = JSON.object [ "node-id" .= show nid , "ipv6" .= show ip6 , "port" .= (fromIntegral port :: Int) ] instance FromJSON NodeInfo where parseJSON (JSON.Object v) = do nidstr <- v JSON..: "node-id" ip6str <- v JSON..:? "ipv6" ip4str <- v JSON..:? "ipv4" portnum <- v JSON..: "port" ip <- maybe empty (return . IPv6) (ip6str >>= readMaybe) <|> maybe empty (return . IPv4) (ip4str >>= readMaybe) let (bs,_) = Base16.decode (Char8.pack nidstr) guard (B.length bs == 20) return $ NodeInfo (NodeId bs) ip (fromIntegral (portnum :: Word16)) hexdigit :: Char -> Bool hexdigit c = ('0' <= c && c <= '9') || ( 'a' <= c && c <= 'f') || ( 'A' <= c && c <= 'F') instance Read NodeInfo where readsPrec i = RP.readP_to_S $ do RP.skipSpaces let n = 40 hexhash <- sequence $ replicate n (RP.satisfy hexdigit) RP.char '@' RP.+++ RP.satisfy isSpace addrstr <- RP.between (RP.char '(') (RP.char ')') (RP.munch (/=')')) RP.+++ RP.munch (not . isSpace) let raddr = do -- TODO: Support IPv6 ipv4 <- RP.readS_to_P (readsPrec i) _ <- RP.char ':' port <- toEnum <$> RP.readS_to_P (readsPrec i) return (IPv4 ipv4, port) (ip,port) <- case RP.readP_to_S raddr addrstr of [] -> fail "Bad address." ((ip,port),_):_ -> return (ip,port) nid <- case Base16.decode $ Char8.pack hexhash of (bs,_) | B.length bs==20 -> return (NodeId bs) _ -> fail "Bad node id." return $ NodeInfo nid ip port -- The Hashable instance depends only on the IP address and port number. It is -- used to compute the announce token. instance Hashable NodeInfo where hashWithSalt s ni = hashWithSalt s (nodeIP ni , nodePort ni) {-# INLINE hashWithSalt #-} instance Show NodeInfo where show (NodeInfo (NodeId nid) ip port) = Char8.unpack (Base16.encode nid) ++ "@" ++ show ip' ++ ":" ++ show port where ip' | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = IPv4 ip4 | otherwise = ip {- -- | KRPC 'compact list' compatible encoding: contact information for -- nodes is encoded as a 26-byte string. Also known as "Compact node -- info" the 20-byte Node ID in network byte order has the compact -- IP-address/port info concatenated to the end. get = NodeInfo <$> (NodeId <$> S.getBytes 20 ) <*> S.get <*> S.get -} getNodeInfo4 :: S.Get NodeInfo getNodeInfo4 = NodeInfo <$> (NodeId <$> S.getBytes 20) <*> (IPv4 <$> S.get) <*> S.get putNodeInfo4 :: NodeInfo -> S.Put putNodeInfo4 (NodeInfo (NodeId nid) ip port) | IPv4 ip4 <- ip = put4 ip4 | IPv6 ip6 <- ip , Just ip4 <- un4map ip6 = put4 ip4 | otherwise = return () where put4 ip4 = S.putByteString nid >> S.put ip4 >> S.put port getNodeInfo6 :: S.Get NodeInfo getNodeInfo6 = NodeInfo <$> (NodeId <$> S.getBytes 20) <*> (IPv6 <$> S.get) <*> S.get putNodeInfo6 :: NodeInfo -> S.Put putNodeInfo6 (NodeInfo (NodeId nid) (IPv6 ip) port) = S.putByteString nid >> S.put ip >> S.put port putNodeInfo6 _ = return () -- | TODO: This should depend on the bind address to support IPv4-only. For -- now, in order to support dual-stack listen, we're going to assume IPv6 is -- wanted and map IPv4 addresses accordingly. nodeAddr :: NodeInfo -> SockAddr nodeAddr (NodeInfo _ ip port) = case ip of IPv4 ip4 -> setPort port $ toSockAddr (ipv4ToIPv6 ip4) IPv6 ip6 -> setPort port $ toSockAddr ip6 nodeInfo :: NodeId -> SockAddr -> Either String NodeInfo nodeInfo nid saddr | Just ip <- fromSockAddr saddr , Just port <- sockAddrPort saddr = Right $ NodeInfo nid ip port | otherwise = Left "Address family not supported." -- | Types of RPC errors. data ErrorCode -- | Some error doesn't fit in any other category. = GenericError -- | Occurs when server fail to process procedure call. | ServerError -- | Malformed packet, invalid arguments or bad token. | ProtocolError -- | Occurs when client trying to call method server don't know. | MethodUnknown deriving (Show, Read, Eq, Ord, Bounded, Typeable, Data) -- | According to the table: -- instance Enum ErrorCode where fromEnum GenericError = 201 fromEnum ServerError = 202 fromEnum ProtocolError = 203 fromEnum MethodUnknown = 204 {-# INLINE fromEnum #-} toEnum 201 = GenericError toEnum 202 = ServerError toEnum 203 = ProtocolError toEnum 204 = MethodUnknown toEnum _ = GenericError {-# INLINE toEnum #-} instance BEncode ErrorCode where toBEncode = toBEncode . fromEnum {-# INLINE toBEncode #-} fromBEncode b = toEnum <$> fromBEncode b {-# INLINE fromBEncode #-} data Error = Error { errorCode :: !ErrorCode -- ^ The type of error. , errorMessage :: !ByteString -- ^ Human-readable text message. } deriving ( Show, Eq, Ord, Typeable, Data, Read ) newtype TransactionId = TransactionId ByteString deriving (Eq, Ord, Show, BEncode) newtype Method = Method ByteString deriving (Eq, Ord, Show, BEncode) data Message a = Q { msgOrigin :: NodeId , msgID :: TransactionId , qryPayload :: a , qryMethod :: Method , qryReadOnly :: Bool } | R { msgOrigin :: NodeId , msgID :: TransactionId , rspPayload :: Either Error a , rspReflectedIP :: Maybe SockAddr } showBE bval = L8.unpack (showBEncode bval) instance BE.BEncode (Message BValue) where toBEncode m = encodeMessage m {- in case m of Q {} -> trace ("encoded(query): "++showBE r) r R {} -> trace ("encoded(response): "++showBE r) r -} fromBEncode bval = decodeMessage bval {- in case r of Left e -> trace (show e) r Right (Q {}) -> trace ("decoded(query): "++showBE bval) r Right (R {}) -> trace ("decoded(response): "++showBE bval) r -} decodeMessage :: BValue -> Either String (Message BValue) decodeMessage = fromDict $ do key <- lookAhead (field (req "y")) let _ = key :: BKey f <- case key of "q" -> do a <- field (req "a") g <- either fail return $ flip fromDict a $ do who <- field (req "id") ro <- fromMaybe False <$> optional (field (req "ro")) return $ \meth tid -> Q who tid a meth ro meth <- field (req "q") return $ g meth "r" -> do ip <- do ipstr <- optional (field (req "ip")) mapM (either fail return . decodeAddr) ipstr vals <- field (req "r") either fail return $ flip fromDict vals $ do who <- field (req "id") return $ \tid -> R who tid (Right vals) ip "e" -> do (ecode,emsg) <- field (req "e") ip <- do ipstr <- optional (field (req "ip")) mapM (either fail return . decodeAddr) ipstr -- FIXME:Spec does not give us the NodeId of the sender. -- Using 'zeroID' as place holder. -- We should ignore the msgOrigin for errors in 'updateRouting'. -- We should consider making msgOrigin a Maybe value. return $ \tid -> R zeroID tid (Left (Error ecode emsg)) ip _ -> fail $ "Mainline message is not a query, response, or an error: " ++ show key tid <- field (req "t") return $ f (tid :: TransactionId) encodeMessage :: Message BValue -> BValue encodeMessage (Q origin tid a meth ro) = case a of BDict args -> encodeQuery tid meth (BDict $ genericArgs origin ro `BE.union` args) _ -> encodeQuery tid meth a -- XXX: Not really a valid query. encodeMessage (R origin tid v ip) = case v of Right (BDict vals) -> encodeResponse tid (BDict $ genericArgs origin False `BE.union` vals) ip Left err -> encodeError tid err encodeAddr :: SockAddr -> ByteString encodeAddr (SockAddrInet port addr) = S.runPut (S.putWord32host addr >> S.putWord16be (fromIntegral port)) encodeAddr saddr@(SockAddrInet6 port _ addr _) | Just ip4 <- (fromSockAddr saddr >>= un4map) = encodeAddr (setPort port $ toSockAddr ip4) | otherwise = S.runPut (S.put addr >> S.putWord16be (fromIntegral port)) encodeAddr _ = B.empty decodeAddr :: ByteString -> Either String SockAddr decodeAddr bs = S.runGet g bs where g | (B.length bs == 6) = flip SockAddrInet <$> S.getWord32host <*> (fromIntegral <$> S.getWord16be) | otherwise = do host <- S.get -- TODO: Is this right? port <- fromIntegral <$> S.getWord16be return $ SockAddrInet6 port 0 host 0 genericArgs :: BEncode a => a -> Bool -> BDict genericArgs nodeid ro = "id" .=! nodeid .: "ro" .=? bool Nothing (Just (1 :: Int)) ro .: endDict encodeError :: BEncode a => a -> Error -> BValue encodeError tid (Error ecode emsg) = encodeAny tid "e" (ecode,emsg) id encodeResponse :: (BEncode tid, BEncode vals) => tid -> vals -> Maybe SockAddr -> BValue encodeResponse tid rvals rip = encodeAny tid "r" rvals ("ip" .=? (BString . encodeAddr <$> rip) .:) encodeQuery :: (BEncode args, BEncode tid, BEncode method) => tid -> method -> args -> BValue encodeQuery tid qmeth qargs = encodeAny tid "q" qmeth ("a" .=! qargs .:) encodeAny :: (BEncode tid, BEncode a) => tid -> BKey -> a -> (BDict -> BDict) -> BValue encodeAny tid key val aux = toDict $ aux $ key .=! val .: "t" .=! tid .: "y" .=! key .: endDict parsePacket :: ByteString -> SockAddr -> Either String (Message BValue, NodeInfo) parsePacket bs addr = do pkt <- BE.decode bs -- TODO: Error packets do not inclucde a valid msgOrigin. -- The BE.decode method is using 'zeroID' as a placeholder. ni <- nodeInfo (msgOrigin pkt) addr return (pkt, ni) encodePacket :: Message BValue -> NodeInfo -> (ByteString, SockAddr) encodePacket msg ni = ( toStrict $ BE.encode msg , nodeAddr ni ) classify :: Message BValue -> MessageClass String Method TransactionId classify (Q { msgID = tid, qryMethod = meth }) = IsQuery meth tid classify (R { msgID = tid }) = IsResponse tid encodeResponsePayload :: BEncode a => TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue encodeResponsePayload tid self dest b = R (nodeId self) tid (Right $ BE.toBEncode b) (Just $ nodeAddr dest) encodeQueryPayload :: BEncode a => Method -> Bool -> TransactionId -> NodeInfo -> NodeInfo -> a -> Message BValue encodeQueryPayload meth isReadonly tid self dest b = Q (nodeId self) tid (BE.toBEncode b) meth isReadonly errorPayload :: TransactionId -> NodeInfo -> NodeInfo -> Error -> Message a errorPayload tid self dest e = R (nodeId self) tid (Left e) (Just $ nodeAddr dest) decodePayload :: BEncode a => Message BValue -> Either String a decodePayload msg = BE.fromBEncode $ qryPayload msg type Handler = MethodHandler String TransactionId NodeInfo (Message BValue) handler :: ( BEncode a , BEncode b ) => (NodeInfo -> a -> IO b) -> Maybe Handler handler f = Just $ MethodHandler decodePayload encodeResponsePayload f handlerE :: ( BEncode a , BEncode b ) => (NodeInfo -> a -> IO (Either Error b)) -> Maybe Handler handlerE f = Just $ MethodHandler decodePayload enc f where enc tid self dest (Left e) = errorPayload tid self dest e enc tid self dest (Right b) = encodeResponsePayload tid self dest b type AnnounceSet = Set (InfoHash, PortNumber) data SwarmsDatabase = SwarmsDatabase { contactInfo :: !( TVar PeerStore ) -- ^ Published by other nodes. , sessionTokens :: !( TVar SessionTokens ) -- ^ Query session IDs. , announceInfo :: !( TVar AnnounceSet ) -- ^ To publish by this node. } newSwarmsDatabase :: IO SwarmsDatabase newSwarmsDatabase = do toks <- nullSessionTokens atomically $ SwarmsDatabase <$> newTVar def <*> newTVar toks <*> newTVar def type RoutingInfo = Info NodeInfo NodeId data Routing = Routing { tentativeId :: NodeInfo , sched4 :: !( TVar (Int.PSQ POSIXTime) ) , routing4 :: !( TVar (R.BucketList NodeInfo) ) , committee4 :: TriadCommittee NodeId SockAddr , sched6 :: !( TVar (Int.PSQ POSIXTime) ) , routing6 :: !( TVar (R.BucketList NodeInfo) ) , committee6 :: TriadCommittee NodeId SockAddr } traced :: Show tid => TableMethods t tid -> TableMethods t tid traced (TableMethods ins del lkup) = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) (\tid t -> trace ("del "++show tid) $ del tid t) (\tid t -> trace ("lookup "++show tid) $ lkup tid t) type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) newClient :: SockAddr -> IO (MainlineClient, Routing) newClient addr = do udp <- udpTransport addr nid <- NodeId <$> getRandomBytes 20 let tenative_info = NodeInfo { nodeId = nid , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr , nodePort = fromMaybe 0 $ sockAddrPort addr } addr4 <- atomically $ newTChan addr6 <- atomically $ newTChan fork $ fix $ \again -> do myThreadId >>= flip labelThread "addr6" addr <- atomically $ readTChan addr6 hPutStrLn stderr $ "External IPv6: "++show addr again routing <- atomically $ do let nobkts = R.defaultBucketCount :: Int tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tenative_info nobkts committee4 <- newTriadCommittee $ \a -> do t4 <- readTVar tbl4 case bep42 a (nodeId $ R.thisNode t4) of Just nid -> do let tbl = R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) (NodeInfo nid (fromMaybe (toEnum 0) $ fromSockAddr a) (fromMaybe 0 $ sockAddrPort a)) nobkts writeTVar tbl4 tbl writeTChan addr4 (a,map fst $ concat $ R.toList t4) Nothing -> return () committee6 <- newTriadCommittee (writeTChan addr6) -- TODO: update tbl6 sched4 <- newTVar Int.empty sched6 <- newTVar Int.empty return $ Routing tenative_info sched4 tbl4 committee4 sched6 tbl6 committee6 swarms <- newSwarmsDatabase map_var <- atomically $ newTVar (0, mempty) let net = onInbound (updateRouting outgoingClient routing) $ layerTransport parsePacket encodePacket $ udp -- Paranoid: It's safe to define /net/ and /client/ to be mutually -- recursive since 'updateRouting' does not invoke 'awaitMessage' which -- which was modified by 'onInbound'. However, I'm going to avoid the -- mutual reference just to be safe. outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } dispatch = DispatchMethods { classifyInbound = classify -- :: x -> MessageClass err meth tid , lookupHandler = handlers -- :: meth -> Maybe (MethodHandler err tid addr x) , tableMethods = mapT -- :: TransactionMethods tbl tid x } handlers :: Method -> Maybe Handler handlers ( Method "ping" ) = handler pingH handlers ( Method "find_node" ) = handler $ findNodeH routing handlers ( Method "get_peers" ) = handler $ getPeersH routing swarms handlers ( Method "announce_peer" ) = handlerE $ announceH swarms handlers ( Method meth ) = Just $ defaultHandler meth mapT = transactionMethods mapMethods gen gen :: Word16 -> (TransactionId, Word16) gen cnt = (TransactionId $ S.encode cnt, cnt+1) client = Client { clientNet = net , clientDispatcher = dispatch , clientErrorReporter = printErrors stderr , clientPending = map_var , clientAddress = \maddr -> atomically $ do let var = case flip prefer4or6 Nothing <$> maddr of Just Want_IP6 -> routing6 routing _ -> routing4 routing R.selfNode <$> readTVar var , clientResponseId = return } fork $ fix $ \again -> do myThreadId >>= flip labelThread "addr4" (addr, ns) <- atomically $ readTChan addr4 hPutStrLn stderr $ "External IPv4: "++show (addr, length ns) forM_ ns $ \n -> do hPutStrLn stderr $ "Change IP, ping: "++show n ping outgoingClient n again -- TODO: Provide some means of shutting down these two auxillary threads: refresh_thread4 <- forkPollForRefresh (15*60) (sched4 routing) (refreshBucket (nodeSearch client) (routing4 routing) (nodeId tenative_info)) refresh_thread6 <- forkPollForRefresh (15*60) (sched6 routing) (refreshBucket (nodeSearch client) (routing6 routing) (nodeId tenative_info)) return (client, routing) -- | Modifies a purely random 'NodeId' to one that is related to a given -- routable address in accordance with BEP 42. -- -- Test vectors from the spec: -- -- IP rand example node ID -- ============ ===== ========================================== -- 124.31.75.21 1 5fbfbf f10c5d6a4ec8a88e4c6ab4c28b95eee4 01 -- 21.75.31.124 86 5a3ce9 c14e7a08645677bbd1cfe7d8f956d532 56 -- 65.23.51.170 22 a5d432 20bc8f112a3d426c84764f8c2a1150e6 16 -- 84.124.73.14 65 1b0321 dd1bb1fe518101ceef99462b947a01ff 41 -- 43.213.53.83 90 e56f6c bf5b7c4be0237986d5243b87aa6d5130 5a bep42 :: SockAddr -> NodeId -> Maybe NodeId bep42 addr (NodeId r) | Just ip <- fmap S.encode (fromSockAddr addr :: Maybe IPv4) <|> fmap S.encode (fromSockAddr addr :: Maybe IPv6) = genBucketSample' retr (NodeId $ crc $ applyMask ip) (3,0x07,0) | otherwise = Nothing where ip4mask = "\x03\x0f\x3f\xff" :: ByteString ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString nbhood_select = B.last r .&. 7 retr n = pure $ B.drop (B.length r - n) r crc = S.encode . crc32c . B.pack applyMask ip = case B.zipWith (.&.) msk ip of (b:bs) -> (b .|. shiftL nbhood_select 5) : bs bs -> bs where msk | B.length ip == 4 = ip4mask | otherwise = ip6mask defaultHandler :: ByteString -> Handler defaultHandler meth = MethodHandler decodePayload errorPayload returnError where returnError :: NodeInfo -> BValue -> IO Error returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo mainlineKademlia client committee var sched = Kademlia quietInsertions mainlineSpace (vanillaIO var $ ping client) { tblTransition = \tr -> do io1 <- transitionCommittee committee tr io2 <- touchBucket mainlineSpace (15*60) var sched tr return $ do io1 >> io2 hPutStrLn stderr $ unwords [ "Buckets: " , show (transitionedTo tr) , show (transitioningNode tr) ] } mainlineSpace :: R.KademliaSpace NodeId NodeInfo mainlineSpace = R.KademliaSpace { R.kademliaLocation = nodeId , R.kademliaTestBit = testIdBit , R.kademliaXor = xor } transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) transitionCommittee committee (RoutingTransition ni Stranger) = do delVote committee (nodeId ni) return $ do hPutStrLn stderr $ "delVote "++show (nodeId ni) transitionCommittee committee _ = return $ return () updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () updateRouting client routing naddr msg = do case prefer4or6 naddr Nothing of Want_IP4 -> go (routing4 routing) (committee4 routing) (sched4 routing) Want_IP6 -> go (routing6 routing) (committee6 routing) (sched6 routing) where go tbl committee sched = do case msg of R { rspReflectedIP = Just sockaddr } -> do -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) atomically $ addVote committee (nodeId naddr) sockaddr _ -> return () insertNode (mainlineKademlia client committee tbl sched) naddr data Ping = Ping deriving Show -- Pong is the same as Ping. type Pong = Ping pattern Pong = Ping instance BEncode Ping where toBEncode Ping = toDict endDict fromBEncode _ = pure Ping data WantIP = Want_IP4 | Want_IP6 | Want_Both deriving (Eq, Enum, Ord, Show) wantList :: WantIP -> [ByteString] wantList Want_IP4 = ["ip4"] wantList Want_IP6 = ["ip6"] wantList Want_Both = ["ip4","ip6"] instance BEncode WantIP where toBEncode w = toBEncode $ wantList w fromBEncode bval = do wants <- fromBEncode bval let _ = wants :: [ByteString] case (elem "ip4" wants, elem "ip6" wants) of (True,True) -> Right Want_Both (True,False) -> Right Want_IP4 (False,True) -> Right Want_IP6 _ -> Left "Unrecognized IP type." data FindNode = FindNode NodeId (Maybe WantIP) instance BEncode FindNode where toBEncode (FindNode nid iptyp) = toDict $ target_key .=! nid .: want_key .=? iptyp .: endDict fromBEncode = fromDict $ FindNode <$>! target_key <*>? want_key data NodeFound = NodeFound { nodes4 :: [NodeInfo] , nodes6 :: [NodeInfo] } instance BEncode NodeFound where toBEncode (NodeFound ns ns6) = toDict $ nodes_key .=? (if Prelude.null ns then Nothing else Just (S.runPut (mapM_ putNodeInfo4 ns))) .: nodes6_key .=? (if Prelude.null ns6 then Nothing else Just (S.runPut (mapM_ putNodeInfo6 ns6))) .: endDict fromBEncode bval = NodeFound <$> ns4 <*> ns6 where opt ns = fromMaybe [] <$> optional ns ns4 = opt $ fromDict (binary getNodeInfo4 nodes_key) bval ns6 = opt $ fromDict (binary getNodeInfo6 nodes6_key) bval binary :: S.Get a -> BKey -> BE.Get [a] binary get k = field (req k) >>= either (fail . format) return . S.runGet (many get) where format str = "fail to deserialize " ++ show k ++ " field: " ++ str pingH :: NodeInfo -> Ping -> IO Pong pingH _ Ping = return Pong -- | True if the argument is an IPv4-mapped address with prefix ::FFFF:0:0/96 -- as defined in RFC 4291. is4mapped :: IPv6 -> Bool is4mapped ip | [0,0,0,0,0,0xffff,_,_] <- fromIPv6 ip = True | otherwise = False un4map :: IPv6 -> Maybe IPv4 un4map ip | [0,0,0,0,0,0xffff,x,y] <- fromIPv6 ip = Just $ toIPv4 $ map (.&. 0xFF) [x `shiftR` 8, x, y `shiftR` 8, y ] | otherwise = Nothing prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp ipFamily :: IP -> WantIP ipFamily ip = case ip of IPv4 _ -> Want_IP4 IPv6 a | is4mapped a -> Want_IP4 | otherwise -> Want_IP6 findNodeH :: Routing -> NodeInfo -> FindNode -> IO NodeFound findNodeH routing addr (FindNode node iptyp) = do let preferred = prefer4or6 addr iptyp ks <- bool (return []) (go $ routing4 routing) (preferred /= Want_IP6) ks6 <- bool (return []) (go $ routing6 routing) (preferred /= Want_IP4) return $ NodeFound ks ks6 where go var = R.kclosest mainlineSpace k node <$> atomically (readTVar var) k = R.defaultK data GetPeers = GetPeers InfoHash (Maybe WantIP) instance BEncode GetPeers where toBEncode (GetPeers ih iptyp) = toDict $ info_hash_key .=! ih .: want_key .=? iptyp .: endDict fromBEncode = fromDict $ GetPeers <$>! info_hash_key <*>? want_key data GotPeers = GotPeers { -- | If the queried node has no peers for the infohash, returned -- the K nodes in the queried nodes routing table closest to the -- infohash supplied in the query. peers :: [PeerAddr] , nodes :: NodeFound -- | The token value is a required argument for a future -- announce_peer query. , grantedToken :: Token } -- deriving (Show, Eq, Typeable) nodeIsIPv6 :: NodeInfo -> Bool nodeIsIPv6 (NodeInfo _ (IPv6 _) _) = True nodeIsIPv6 _ = False instance BEncode GotPeers where toBEncode GotPeers { nodes = NodeFound ns4 ns6, ..} = toDict $ nodes_key .=? (if null ns4 then Nothing else Just $ S.runPut (mapM_ putNodeInfo4 ns4)) .: nodes6_key .=? (if null ns6 then Nothing else Just $ S.runPut (mapM_ putNodeInfo4 ns6)) .: token_key .=! grantedToken .: peers_key .=! map S.encode peers .: endDict fromBEncode = fromDict $ do ns4 <- fromMaybe [] <$> optional (binary getNodeInfo4 nodes_key) -- "nodes" ns6 <- fromMaybe [] <$> optional (binary getNodeInfo6 nodes6_key) -- "nodes6" -- TODO: BEP 42... -- -- Once enforced, responses to get_peers requests whose node ID does not -- match its external IP should be considered to not contain a token and -- thus not be eligible as storage target. Implementations should take -- care that they find the closest set of nodes which return a token and -- whose IDs matches their IPs before sending a store request to those -- nodes. -- -- Sounds like something to take care of at peer-search time, so I'll -- ignore it for now. tok <- field (req token_key) -- "token" ps <- fromMaybe [] <$> optional (field (req peers_key) >>= decodePeers) -- "values" pure $ GotPeers ps (NodeFound ns4 ns6) tok where decodePeers = either fail pure . mapM S.decode getPeersH :: Routing -> SwarmsDatabase -> NodeInfo -> GetPeers -> IO GotPeers getPeersH routing (SwarmsDatabase peers toks _) naddr (GetPeers ih iptyp) = do ps <- do tm <- getTimestamp atomically $ do (ps,store') <- Peers.freshPeers ih tm <$> readTVar peers writeTVar peers store' return ps -- Filter peer results to only a single address family, IPv4 or IPv6, as -- per BEP 32. let notboth = iptyp >>= \case Want_Both -> Nothing specific -> Just specific selected = prefer4or6 naddr notboth ps' = filter ( (== selected) . ipFamily . peerHost ) ps tok <- grantToken toks naddr ns <- findNodeH routing naddr (FindNode (coerce ih) iptyp) return $ GotPeers ps' ns tok -- | Announce that the peer, controlling the querying node, is -- downloading a torrent on a port. data Announce = Announce { -- | If set, the 'port' field should be ignored and the source -- port of the UDP packet should be used as the peer's port -- instead. This is useful for peers behind a NAT that may not -- know their external port, and supporting uTP, they accept -- incoming connections on the same port as the DHT port. impliedPort :: Bool -- | infohash of the torrent; , topic :: InfoHash -- | some clients announce the friendly name of the torrent here. , announcedName :: Maybe ByteString -- | the port /this/ peer is listening; , port :: PortNumber -- TODO: optional boolean "seed" key -- | received in response to a previous get_peers query. , sessionToken :: Token } deriving (Show, Eq, Typeable) peer_ip_key = "ip" peer_id_key = "peer id" peer_port_key = "port" msg_type_key = "msg_type" piece_key = "piece" total_size_key = "total_size" node_id_key :: BKey node_id_key = "id" read_only_key :: BKey read_only_key = "ro" want_key :: BKey want_key = "want" target_key :: BKey target_key = "target" nodes_key :: BKey nodes_key = "nodes" nodes6_key :: BKey nodes6_key = "nodes6" info_hash_key :: BKey info_hash_key = "info_hash" peers_key :: BKey peers_key = "values" token_key :: BKey token_key = "token" name_key :: BKey name_key = "name" port_key :: BKey port_key = "port" implied_port_key :: BKey implied_port_key = "implied_port" instance BEncode Announce where toBEncode Announce {..} = toDict $ implied_port_key .=? flagField impliedPort .: info_hash_key .=! topic .: name_key .=? announcedName .: port_key .=! port .: token_key .=! sessionToken .: endDict where flagField flag = if flag then Just (1 :: Int) else Nothing fromBEncode = fromDict $ do Announce <$> (boolField <$> optional (field (req implied_port_key))) <*>! info_hash_key <*>? name_key <*>! port_key <*>! token_key where boolField = maybe False (/= (0 :: Int)) -- | The queried node must verify that the token was previously sent -- to the same IP address as the querying node. Then the queried node -- should store the IP address of the querying node and the supplied -- port number under the infohash in its store of peer contact -- information. data Announced = Announced deriving (Show, Eq, Typeable) instance BEncode Announced where toBEncode _ = toBEncode Ping fromBEncode _ = pure Announced announceH :: SwarmsDatabase -> NodeInfo -> Announce -> IO (Either Error Announced) announceH (SwarmsDatabase peers toks _) naddr announcement = do checkToken toks naddr (sessionToken announcement) >>= bool (Left <$> return (Error ProtocolError "invalid parameter: token")) (Right <$> go) where go = atomically $ do modifyTVar' peers $ insertPeer (topic announcement) (announcedName announcement) $ PeerAddr { peerId = Nothing -- Avoid storing IPv4-mapped addresses. , peerHost = case nodeIP naddr of IPv6 ip6 | Just ip4 <- un4map ip6 -> IPv4 ip4 a -> a , peerPort = if impliedPort announcement then nodePort naddr else port announcement } return Announced isReadonlyClient client = False -- TODO ping :: MainlineClient -> NodeInfo -> IO Bool ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr where serializer = MethodSerializer { methodTimeout = 5 , method = Method "ping" , wrapQuery = encodeQueryPayload (Method "ping") (isReadonlyClient client) , unwrapResponse = const True } -- searchQuery :: ni -> IO ([ni], [r]) getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO ([NodeInfo],[NodeInfo]) getNodes client nid addr = fromMaybe ([],[]) <$> sendQuery client serializer (FindNode nid (Just Want_Both)) addr where serializer = MethodSerializer { methodTimeout = 5 , method = Method "find_node" , wrapQuery = encodeQueryPayload (Method "find_node") (isReadonlyClient client) , unwrapResponse = \case R { rspPayload = Right bval } | Right (NodeFound ns4 ns6) <- BE.fromBEncode bval -> (ns4++ns6, ns4++ns6) _ -> ([],[]) } data TriadSlot = SlotA | SlotB | SlotC deriving (Eq,Ord,Enum,Show,Read) data TriadCommittee voter a = TriadCommittee { triadDecider :: TVar TriadSlot , triadA :: TVar (Maybe (voter,a)) , triadB :: TVar (Maybe (voter,a)) , triadC :: TVar (Maybe (voter,a)) , triadNewDecision :: a -> STM () } triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a)) triadSlot SlotA = triadA triadSlot SlotB = triadB triadSlot SlotC = triadC triadDecision :: a -> TriadCommittee voter a -> STM a triadDecision fallback triad = do slot <- readTVar (triadDecider triad) maybe fallback snd <$> readTVar (triadSlot slot triad) newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a) newTriadCommittee onChange = TriadCommittee <$> newTVar SlotA <*> newTVar Nothing <*> newTVar Nothing <*> newTVar Nothing <*> pure onChange triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM () triadCountVotes prior triad = do a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) let (slot,vote) = case catMaybes [a,b,c] of [ (x,xvote) , (y,yvote) , (z,zvote) ] -> if xvote == yvote then (x,Just xvote) else (z,Just zvote) [] -> (SlotA,Nothing) ((slot,vote):_) -> (slot, Just vote) writeTVar (triadDecider triad) slot case vote of Just v | vote /= prior -> triadNewDecision triad v _ -> return () addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM () addVote triad voter vote = do a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) let avail (_,Nothing) = True avail (_,Just x ) = (x == voter) slots = filter avail [a,b,c] forM_ (take 1 slots) $ \(slot,_) -> do prior <- do slotp <- readTVar (triadDecider triad) fmap snd <$> readTVar (triadSlot slotp triad) writeTVar (triadSlot slot triad) (Just (voter,vote)) triadCountVotes prior triad delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () delVote triad voter = do a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) let match (_,Just x ) = (x == voter) slots = filter match [a,b,c] forM_ (take 1 slots) $ \(slot,_) -> do prior <- do slotp <- readTVar (triadDecider triad) fmap snd <$> readTVar (triadSlot slotp triad) writeTVar (triadSlot slot triad) Nothing triadCountVotes prior triad nodeSearch client = Search { searchSpace = mainlineSpace , searchNodeAddress = nodeIP &&& nodePort , searchQuery = \nid ni -> do hPutStrLn stderr $ "findNodes "++show nid++" --> "++show ni handle (\(SomeException e) -> do hPutStrLn stderr $ "got "++show e -- threadDelay 1000000 return ([],[])) $ do (xs,y) <- getNodes client nid ni forM_ xs $ \x -> do hPutStrLn stderr $ "got "++show x -- threadDelay 1000000 return (xs,y) } -- | List of bootstrap nodes maintained by different bittorrent -- software authors. bootstrapNodes :: WantIP -> IO [NodeInfo] bootstrapNodes want = unsafeInterleaveIO $ do let wellknowns = [ "router.bittorrent.com:6881" -- by BitTorrent Inc. -- doesn't work at the moment (use git blame) of commit , "dht.transmissionbt.com:6881" -- by Transmission project , "router.utorrent.com:6881" ] nss <- forM wellknowns $ \hostAndPort -> do e <- resolve want hostAndPort case e of Left _ -> return [] Right sockaddr -> either (const $ return []) (return . (: [])) $ nodeInfo zeroID sockaddr return $ concat nss -- | Resolve either a numeric network address or a hostname to a -- numeric IP address of the node. resolve :: WantIP -> String -> IO (Either IOError SockAddr) resolve want hostAndPort = do let hints = defaultHints { addrSocketType = Datagram , addrFamily = case want of Want_IP4 -> AF_INET _ -> AF_INET6 } (rport,rhost) = span (/= ':') $ reverse hostAndPort (host,port) = case rhost of [] -> (hostAndPort, Nothing) (_:hs) -> (reverse hs, Just (reverse rport)) tryIOError $ do -- getAddrInfo throws exception on empty list, so this -- pattern matching never fails. info : _ <- getAddrInfo (Just hints) (Just host) port return $ addrAddress info