-- |
--   Copyright   :  (c) Sam Truzjan 2013
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--   BitTorrent uses a \"distributed sloppy hash table\" (DHT) for
--   storing peer contact information for \"trackerless\" torrents. In
--   effect, each peer becomes a tracker.
--
--   Normally you don't need to import other DHT modules.
--
--   For more info see:
--   <http://www.bittorrent.org/beps/bep_0005.html>
--
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE CPP #-}
module Network.BitTorrent.DHT
       ( -- * Distributed Hash Table
         DHT
       , Options (..)
       , fullLogging
       , dht

         -- * Bootstrapping
         -- $bootstrapping-terms
       , tNodes
       , defaultBootstrapNodes
       , resolveHostName
       , bootstrap
       , isBootstrapped

         -- * Initialization
       , snapshot

         -- * Operations
       -- , Network.BitTorrent.DHT.lookup
       , Network.BitTorrent.DHT.insert
       , Network.BitTorrent.DHT.delete

         -- * Embedding
         -- ** Session
       , LogFun
       , Node
       , defaultHandlers
       , newNode
       , closeNode

         -- ** Monad
       -- , MonadDHT (..)
       , runDHT
       ) where

import Control.Monad.Logger
import Control.Monad.Reader
import Control.Exception
import qualified Data.ByteString as BS
import Data.Conduit as C
import qualified Data.Conduit.List as C
import Data.Serialize
import Network.Socket
import Text.PrettyPrint.HughesPJClass as PP (pPrint,render)
import Data.Torrent
import Network.Address
import Network.BitTorrent.DHT.Query
import Network.BitTorrent.DHT.Session
import Network.DHT.Routing as T hiding (null)
import qualified Data.Text as Text
import Data.Typeable
import Data.Monoid
import Network.DatagramServer.Mainline (KMessageOf)
import qualified Network.DatagramServer as KRPC (listen, Protocol(..))
import Network.DatagramServer.Types
import Network.DHT.Types
import Data.Bits
import Data.Default
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import Network.KRPC.Method
import Network.BitTorrent.DHT.Query (DataHandlers)

{-----------------------------------------------------------------------
--  DHT types
-----------------------------------------------------------------------}

-- The class below is compiled out; it is kept for reference only.
#if 0
class MonadDHT m where
  liftDHT :: DHT raw dht u IPv4 a -> m a

instance MonadDHT (DHT raw dht u IPv4) where
  liftDHT = id
#endif

-- | Log filter that accepts every message, whatever its source or
-- level. Pass it to 'dht' to enable full logging.
fullLogging :: LogSource -> LogLevel -> Bool
fullLogging _source _level = True

-- | Run DHT on specified port.
dht :: ( Ord ip
       , Address ip
       , Functor dht
       , Ord (NodeId dht)
       , FiniteBits (NodeId dht)
       , Serialize (NodeId dht)
       , Show (NodeId dht)
       , SerializableTo raw (Response dht (Ping dht))
       , SerializableTo raw (Query dht (Ping dht))
       , SerializableTo raw (Response dht (NodeFound dht ip))
       , SerializableTo raw (Query dht (FindNode dht ip))
       , Ord (TransactionID dht)
       , Serialize (TransactionID dht)
       , Eq (QueryMethod dht)
       , Show (QueryMethod dht)
       , Pretty (NodeInfo dht ip u)
       , Kademlia dht
       , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
       , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
       , DataHandlers raw dht
       , WireFormat raw dht
       , Show u
       , Default u
       )
    => Options                         -- ^ normally you need to use 'Data.Default.def';
    -> NodeAddr ip                     -- ^ address to bind this node;
    -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default
    -> DHT raw dht u ip a              -- ^ actions to run: 'bootstrap', 'lookup', etc;
    -> IO a                            -- ^ result.
dht opts addr logfilter action =
    -- Log to stderr, keeping only the messages accepted by 'logfilter'.
    runStderrLoggingT . filterLogger logfilter . LoggingT $ \logger ->
      -- 'bracket' runs 'closeNode' on the node even if the session throws.
      bracket (newNode opts addr logger Nothing) closeNode $ \node ->
        runDHT node $ do
          handlers <- defaultHandlers logger
          mgr      <- asks manager
          -- Install the handlers on the session manager before the
          -- caller's action runs.
          liftIO $ KRPC.listen mgr handlers (KRPC.Protocol Proxy Proxy)
          action
{-# INLINE dht #-}

{-----------------------------------------------------------------------
--  Bootstrapping
-----------------------------------------------------------------------}

-- $bootstrapping-terms
--
-- [@Bootstrapping@] DHT @bootstrapping@ is the process of filling
-- the routing 'Table' with /good/ nodes.
--
-- [@Bootstrapping time@] The bootstrapping process can take up to 5
-- minutes. Bootstrapping should only happen at first application
-- startup; if possible you should use the 'snapshot' & 'restore'
-- mechanism, which should be faster.
--
-- [@Bootstrap nodes@] A DHT @bootstrap node@ is either:
--
--     * a specialized high performance node maintained by bittorrent
--     software authors\/maintainers, like those listed in
--     'defaultBootstrapNodes'. /Specialized/ means that those nodes
--     may not support 'insert' queries and are running for the sake
--     of bootstrapping only.
--
--     * an ordinary bittorrent client running a DHT node. The list of
--     such bootstrapping nodes is usually obtained from the
--     'Data.Torrent.tNodes' field or
--     'Network.BitTorrent.Exchange.Message.Port' messages.

-- Do not include the following hosts in the default bootstrap nodes list:
--
--   * "dht.aelitis.com" and "dht6.azureusplatform.com" - since the
--   Azureus client has a different (and probably incompatible) DHT
--   protocol implementation.
--
--   * "router.utorrent.com" since it is just an alias to
--   "router.bittorrent.com".
--   XXX: ignoring this advice as it resolves to a different
--        ip address for me.

-- | Bootstrap nodes maintained by different bittorrent software
-- authors. Every entry uses port 6881.
defaultBootstrapNodes :: [NodeAddr HostName]
defaultBootstrapNodes = [ NodeAddr host 6881 | host <- hosts ]
  where
    hosts =
      [ "router.bittorrent.com"  -- by BitTorrent Inc.

        -- doesn't work at the moment (use git blame) of commit
      , "dht.transmissionbt.com" -- by Transmission project

      , "router.utorrent.com"
      ]

-- TODO Multihomed hosts

-- | Resolve either a numeric network address or a hostname to a
-- numeric IP address of the node. Usually used to resolve
-- 'defaultBootstrapNodes' or 'Data.Torrent.tNodes' lists.
resolveHostName :: NodeAddr HostName -> IO (NodeAddr IPv4)
resolveHostName NodeAddr {..} = do
  -- Restrict the lookup to IPv4 datagram endpoints.
  let hints = defaultHints { addrFamily = AF_INET, addrSocketType = Datagram }
  -- getAddrInfo throws an exception instead of returning an empty list,
  -- so this pattern match never fails.
  info : _ <- getAddrInfo (Just hints) (Just nodeHost) (Just (show nodePort))
  case fromSockAddr (addrAddress info) of
    -- Not expected to happen: the hints above request AF_INET only.
    Nothing   -> error "resolveHostName: impossible: AF_INET lookup returned a non-IPv4 address"
    Just addr -> return addr

-- | One good node may be sufficient.
--
-- The first argument is an optional serialized routing table. If it
-- fails to decode, a warning is logged under the @restore@ source and
-- bootstrapping continues without restored nodes. The second argument
-- is the list of start nodes; they are only pinged when the search
-- from already known nodes returns no nodes.
--
-- This operation does block, use
-- 'Control.Concurrent.Async.Lifted.async' if needed.
bootstrap :: forall raw dht u ip.
             ( Ord ip
             , Address ip
             , Functor dht
             , Ord (NodeId dht)
             , FiniteBits (NodeId dht)
             , Serialize (NodeId dht)
             , Show (NodeId dht)
             , Pretty (NodeId dht)
             , SerializableTo raw (Response dht (Ping dht))
             , SerializableTo raw (Query dht (Ping dht))
             , SerializableTo raw (Response dht (NodeFound dht ip))
             , SerializableTo raw (Query dht (FindNode dht ip))
             , Ord (TransactionID dht)
             , Serialize (TransactionID dht)
             , Eq (QueryMethod dht)
             , Show (QueryMethod dht)
             , Pretty (NodeInfo dht ip u)
             , Kademlia dht
             , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
             , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
             , DataHandlers raw dht
             , WireFormat raw dht
             , Show u
             , Default u
             , Serialize u
             )
          => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip ()
bootstrap mbs startNodes = do
    -- Nodes recovered from the serialized table, if one was supplied.
    restored <-
        case decode <$> mbs of
            Just (Right tbl) -> return (T.toList tbl)
            Just (Left e)    -> do
                $(logWarnS) "restore" (Text.pack e)
                return []
            Nothing          -> return []

    $(logInfoS) "bootstrap" "Start node bootstrapping"
    -- NOTE(review): 'searchAll' ignores its argument; the search is driven
    -- only by our own node id. Confirm whether the alive nodes were meant
    -- to seed the search.
    -- NOTE(review): 'myNodeIdAccordingTo' is handed @error "FIXME"@; this
    -- crashes if that argument is ever forced.
    let searchAll aliveNodes = do
            nid <- myNodeIdAccordingTo (error "FIXME")
            ns <- bgsearch ioFindNodes nid
            return ( ns :: [NodeInfo dht ip u] )
    input_nodes <- (restored ++) . T.toList <$> getTable
    -- Step 1: Use iterative searches to flesh out the table..
    do
        let knowns = map (map $ nodeAddr . fst) input_nodes
        -- Below, we reverse the nodes since the table serialization puts the
        -- nearest nodes last and we want to choose a similar node id to bootstrap
        -- faster.
        (alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns))
        b <- isBootstrapped
        -- If our cached nodes are alive and our IP address did not change, it's possible
        -- we are already bootstrapped, so no need to do any searches.
        unless b $ do
            ns <- searchAll $ take 2 alive_knowns
            -- We only use the supplied bootstrap nodes when we don't know of any
            -- others to try.
            when (null ns) $ do
                -- TODO filter duplicated in startNodes list
                -- TODO retransmissions for startNodes
                (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes)
                _ <- searchAll $ take 2 aliveNodes
                return ()
    -- Step 2: Repeatedly refresh incomplete buckets until the table is full.
    maxbuckets <- asks $ optBucketCount . options
    flip fix 0 $ \loop icnt -> do
        tbl <- getTable
        let unfull = filter ((/=defaultBucketSize) . snd)
            us = zip
                    -- is_last = True for the last bucket
                    (True:repeat False)
                    -- Only non-full buckets unless it is the last one and the
                    -- maximum number of buckets has not been reached.
                    $ case reverse $ zip [0..] $ T.shape tbl of
                        p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps)
                        p:ps                         -> p:unfull ps
                        []                           -> []
        forM_ us $ \(is_last,(index,_)) -> do
            nid <- myNodeIdAccordingTo (error "FIXME")
            sample <- liftIO $ genBucketSample nid (bucketRange index is_last)
            $(logDebugS) "bootstrapping" $ "BOOTSTRAP sample"
                <> Text.pack (show (is_last,index,T.shape tbl))
                <> " " <> Text.pack (render $ pPrint sample)
            refreshNodes sample
        $(logDebugS) "bootstrapping" $ "BOOTSTRAP finished iteration "
            <> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize))
        -- Stop once there is nothing left to refresh, or after roughly
        -- 1.5 * maxbuckets iterations, whichever comes first.
        when (not (null us) && icnt < div (3*maxbuckets) 2)
            $ loop (succ icnt)
    $(logInfoS) "bootstrap" "Node bootstrapping finished"

-- | Check if this node is already bootstrapped.
--   @bootstrap [good_node] >> isBootstrapped@ should always return 'True'.
--
--   This operation does not block.
--
isBootstrapped :: Eq ip => DHT raw dht u ip Bool
isBootstrapped = fmap T.full getTable

{-----------------------------------------------------------------------
-- Initialization
-----------------------------------------------------------------------}

-- | Serialize the routing table of the current DHT session to a byte
-- string.
--
--   This is a blocking operation, use
--   'Control.Concurrent.Async.Lifted.async' if needed.
snapshot :: ( Address ip
            , Ord (NodeId dht)
            , Serialize u
            , Serialize (NodeId dht)
            )
         => DHT raw dht u ip BS.ByteString
snapshot = encode <$> getTable

{-----------------------------------------------------------------------
--  Operations
-----------------------------------------------------------------------}

#if 0

-- | Get list of peers which downloading this torrent.
--
--   This operation is incremental and do block.
--
lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip]
lookup topic = do      -- TODO retry getClosest if bucket is empty
  closest <- lift $ getClosest topic
  C.sourceList [closest] $= search topic (getPeersQ topic)

#endif

-- TODO do not republish if the topic is already in announceSet

-- | Announce that /this/ peer may have some pieces of the specified
-- torrent. DHT will reannounce this data periodically using the
-- 'optReannounce' interval.
--
--   This operation is synchronous and does block, use
--   'Control.Concurrent.Async.Lifted.async' if needed.
--
insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip ()
insert ih p = publish ih p >> insertTopic ih p

-- | Stop announcing /this/ peer for the specified torrent.
--
--   This operation is atomic and may block for a while.
--
delete :: InfoHash -> PortNumber -> DHT raw dht u ip ()
delete = deleteTopic
{-# INLINE delete #-}