-- | -- Copyright : (c) Sam Truzjan 2013 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental -- Portability : portable -- -- BitTorrent uses a \"distributed sloppy hash table\" (DHT) for -- storing peer contact information for \"trackerless\" torrents. In -- effect, each peer becomes a tracker. -- -- Normally you don't need to import other DHT modules. -- -- For more info see: -- -- {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE ScopedTypeVariables #-} module Network.BitTorrent.DHT ( -- * Distributed Hash Table DHT , Options (..) , fullLogging , dht -- * Bootstrapping -- $bootstrapping-terms , tNodes , defaultBootstrapNodes , resolveHostName , bootstrap , isBootstrapped -- * Initialization , snapshot -- * Operations , Network.BitTorrent.DHT.lookup , Network.BitTorrent.DHT.insert , Network.BitTorrent.DHT.delete -- * Embedding -- ** Session , LogFun , Node , defaultHandlers , newNode , closeNode -- ** Monad , MonadDHT (..) , runDHT ) where import Control.Monad.Logger import Control.Monad.Reader import Control.Exception import qualified Data.ByteString as BS import Data.Conduit as C import qualified Data.Conduit.List as C import Data.Serialize import Network.Socket import Text.PrettyPrint.HughesPJClass as PP (pPrint,render) import Data.Torrent import Network.Address import Network.BitTorrent.DHT.Query import Network.BitTorrent.DHT.Session import Network.DHT.Routing as T hiding (null) import qualified Data.Text as Text import Data.Typeable import Data.Monoid import Network.DatagramServer.Mainline (KMessageOf) import qualified Network.DatagramServer as KRPC (listen, Protocol(..)) {----------------------------------------------------------------------- -- DHT types -----------------------------------------------------------------------} class MonadDHT m where liftDHT :: DHT IPv4 a -> m a instance MonadDHT (DHT IPv4) where liftDHT = id -- | Convenience method. Pass this to 'dht' to enable full logging. fullLogging :: LogSource -> LogLevel -> Bool fullLogging _ _ = True -- | Run DHT on specified port. dht :: (Ord ip, Address ip) => Options -- ^ normally you need to use 'Data.Default.def'; -> NodeAddr ip -- ^ address to bind this node; -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; -> IO a -- ^ result. dht opts addr logfilter action = do runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do bracket (newNode opts addr logger Nothing) closeNode $ \ node -> runDHT node $ do hs <- defaultHandlers logger m <- asks manager liftIO $ KRPC.listen m hs (KRPC.Protocol Proxy Proxy) action {-# INLINE dht #-} {----------------------------------------------------------------------- -- Bootstrapping -----------------------------------------------------------------------} -- $bootstrapping-terms -- -- [@Bootstrapping@] DHT @bootstrapping@ is the process of filling -- routing 'Table' by /good/ nodes. -- -- [@Bootstrapping time@] Bootstrapping process can take up to 5 -- minutes. Bootstrapping should only happen at first application -- startup, if possible you should use 'snapshot' & 'restore' -- mechanism which must work faster. -- -- [@Bootstrap nodes@] DHT @bootstrap node@ is either: -- -- * a specialized high performance node maintained by bittorrent -- software authors\/maintainers, like those listed in -- 'defaultBootstrapNodes'. /Specialized/ means that those nodes -- may not support 'insert' queries and is running for the sake of -- bootstrapping only. -- -- * an ordinary bittorrent client running DHT node. The list of -- such bootstrapping nodes usually obtained from -- 'Data.Torrent.tNodes' field or -- 'Network.BitTorrent.Exchange.Message.Port' messages. -- Do not include the following hosts in the default bootstrap nodes list: -- -- * "dht.aelitis.com" and "dht6.azureusplatform.com" - since -- Azureus client have a different (and probably incompatible) DHT -- protocol implementation. -- -- * "router.utorrent.com" since it is just an alias to -- "router.bittorrent.com". -- XXX: ignoring this advise as it resolves to a different -- ip address for me. -- | List of bootstrap nodes maintained by different bittorrent -- software authors. defaultBootstrapNodes :: [NodeAddr HostName] defaultBootstrapNodes = [ NodeAddr "router.bittorrent.com" 6881 -- by BitTorrent Inc. -- doesn't work at the moment (use git blame) of commit , NodeAddr "dht.transmissionbt.com" 6881 -- by Transmission project , NodeAddr "router.utorrent.com" 6881 ] -- TODO Multihomed hosts -- | Resolve either a numeric network address or a hostname to a -- numeric IP address of the node. Usually used to resolve -- 'defaultBootstrapNodes' or 'Data.Torrent.tNodes' lists. resolveHostName :: NodeAddr HostName -> IO (NodeAddr IPv4) resolveHostName NodeAddr {..} = do let hints = defaultHints { addrFamily = AF_INET, addrSocketType = Datagram } -- getAddrInfo throws exception on empty list, so the pattern matching never fail info : _ <- getAddrInfo (Just hints) (Just nodeHost) (Just (show nodePort)) case fromSockAddr (addrAddress info) of Nothing -> error "resolveNodeAddr: impossible" Just addr -> return addr -- | One good node may be sufficient. -- -- This operation do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. bootstrap :: forall ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () bootstrap mbs startNodes = do restored <- case decode <$> mbs of Just (Right tbl) -> return (T.toList tbl) Just (Left e) -> do $(logWarnS) "restore" (Text.pack e) return [] Nothing -> return [] $(logInfoS) "bootstrap" "Start node bootstrapping" let searchAll aliveNodes = do nid <- myNodeIdAccordingTo (error "FIXME") nss <- C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume return ( nss :: [[NodeInfo KMessageOf ip ()]] ) input_nodes <- (restored ++) . T.toList <$> getTable -- Step 1: Use iterative searches to flesh out the table.. do let knowns = map (map $ nodeAddr . fst) input_nodes -- Below, we reverse the nodes since the table serialization puts the -- nearest nodes last and we want to choose a similar node id to bootstrap -- faster. (alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns)) b <- isBootstrapped -- If our cached nodes are alive and our IP address did not change, it's possible -- we are already bootsrapped, so no need to do any searches. when (not b) $ do nss <- searchAll $ take 2 alive_knowns -- We only use the supplied bootstrap nodes when we don't know of any -- others to try. when (null nss) $ do -- TODO filter duplicated in startNodes list -- TODO retransmissions for startNodes (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes) _ <- searchAll $ take 2 aliveNodes return () -- Step 2: Repeatedly refresh incomplete buckets until the table is full. maxbuckets <- asks $ optBucketCount . options flip fix 0 $ \loop icnt -> do tbl <- getTable let unfull = filter ((/=defaultBucketSize) . snd) us = zip -- is_last = True for the last bucket (True:repeat False) -- Only non-full buckets unless it is the last one and the -- maximum number of buckets has not been reached. $ case reverse $ zip [0..] $ T.shape tbl of p@(n,_):ps | n+1==maxbuckets -> unfull (p:ps) p:ps -> p:unfull ps [] -> [] forM_ us $ \(is_last,(index,_)) -> do nid <- myNodeIdAccordingTo (error "FIXME") sample <- liftIO $ genBucketSample nid (bucketRange index is_last) $(logDebugS) "bootstrapping" $ "BOOTSTRAP sample" <> Text.pack (show (is_last,index,T.shape tbl)) <> " " <> Text.pack (render $ pPrint sample) refreshNodes sample $(logDebugS) "bootstrapping" $ "BOOTSTRAP finished iteration " <> Text.pack (show (icnt,T.shape tbl,us,defaultBucketSize)) when (not (null us) && icnt < div (3*maxbuckets) 2) $ loop (succ icnt) $(logInfoS) "bootstrap" "Node bootstrapping finished" -- | Check if this node is already bootstrapped. -- @bootstrap [good_node] >> isBootstrapped@@ should always return 'True'. -- -- This operation do not block. -- isBootstrapped :: Eq ip => DHT ip Bool isBootstrapped = T.full <$> getTable {----------------------------------------------------------------------- -- Initialization -----------------------------------------------------------------------} -- | Serialize current DHT session to byte string. -- -- This is blocking operation, use -- 'Control.Concurrent.Async.Lifted.async' if needed. snapshot :: Address ip => DHT ip BS.ByteString snapshot = do tbl <- getTable return $ encode tbl {----------------------------------------------------------------------- -- Operations -----------------------------------------------------------------------} -- | Get list of peers which downloading this torrent. -- -- This operation is incremental and do block. -- lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip] lookup topic = do -- TODO retry getClosest if bucket is empty closest <- lift $ getClosest topic C.sourceList [closest] $= search topic (getPeersQ topic) -- TODO do not republish if the topic is already in announceSet -- | Announce that /this/ peer may have some pieces of the specified -- torrent. DHT will reannounce this data periodically using -- 'optReannounce' interval. -- -- This operation is synchronous and do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -- insert :: Address ip => InfoHash -> PortNumber -> DHT ip () insert ih p = do publish ih p insertTopic ih p -- | Stop announcing /this/ peer for the specified torrent. -- -- This operation is atomic and may block for a while. -- delete :: InfoHash -> PortNumber -> DHT ip () delete = deleteTopic {-# INLINE delete #-}