-- | -- Copyright : (c) Sam Truzjan 2013-2014 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental -- Portability : portable -- -- This module defines internal state of a node instance. You can -- have multiple nodes per application but usually you don't have -- to. Normally, you don't need to import this module, use -- "Network.BitTorrent.DHT" instead. -- {-# LANGUAGE CPP #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TemplateHaskell #-} module Network.BitTorrent.DHT.Session ( -- * Options -- | Use @optFooBar def@ to get default 'Alpha' or 'K'. Alpha , K , Options (..) -- * Session , Node , options , tentativeNodeId , myNodeIdAccordingTo , myNodeIdAccordingTo1 , routingInfo , routableAddress , getTimestamp , SessionTokens , sessionTokens , contactInfo , PeerStore , manager -- ** Initialization , LogFun , logt , NodeHandler , newNode , closeNode -- * DHT -- | Use @asks options@ to get options passed to 'startNode' -- or @asks thisNodeId@ to get id of locally running node. , DHT , runDHT -- ** Tokens , grantToken , checkToken -- ** Routing table , getTable , getClosest , getClosest1 #ifdef VERSION_bencoding -- ** Peer storage , insertPeer , getPeerList , getPeerList1 , insertTopic , deleteTopic , getSwarms , savePeerStore , mergeSavedPeers , allPeers #endif -- ** Messaging , queryParallel ) where import Prelude hiding (ioError) import Control.Concurrent.STM #ifdef THREAD_DEBUG import Control.Concurrent.Async.Lifted.Instrument #else import Control.Concurrent.Async.Lifted #endif import Control.Exception.Lifted hiding (Handler) import Control.Monad.Base import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Typeable import Data.String import Data.Bits import Data.ByteString import Data.Conduit.Lazy import Data.Default import Data.Fixed import Data.Hashable import Data.List as L import Data.Maybe import Data.Monoid import Data.Set as S import Data.Time import Network (PortNumber) import System.Random (randomIO) import Data.Time.Clock.POSIX import Data.Text as Text import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Data.Serialize as S import Data.Torrent as Torrent import Network.DatagramServer as KRPC hiding (Options, def) import qualified Network.DatagramServer as KRPC (def) #ifdef VERSION_bencoding import Data.BEncode (BValue) import Network.DatagramServer.Mainline (KMessageOf) #else import Network.DatagramServer.Tox as Tox #endif import Network.Address import Network.BitTorrent.DHT.ContactInfo (PeerStore) import qualified Network.BitTorrent.DHT.ContactInfo as P import Network.DHT.Mainline import Network.DHT.Routing as R import Network.BitTorrent.DHT.Token as T import GHC.Stack as GHC {----------------------------------------------------------------------- -- Options -----------------------------------------------------------------------} -- | Node lookups can proceed asynchronously. type Alpha = Int -- NOTE: libtorrent uses 5, azureus uses 10 -- | The quantity of simultaneous lookups is typically three. defaultAlpha :: Alpha defaultAlpha = 3 -- TODO add replication loop -- TODO do not insert infohash -> peeraddr if infohash is too far from -- this node id {- data Order = NearFirst | FarFirst | Random data Traversal = Greedy -- ^ aggressive short-circuit traversal | Exhaustive -- ^ -} -- | Original Kamelia DHT uses term /publish/ for data replication -- process. BitTorrent DHT uses term /announce/ since the purpose of -- the DHT is peer discovery. Later in documentation, we use terms -- /publish/ and /announce/ interchangible. data Options = Options { -- | The degree of parallelism in 'find_node' queries. More -- parallism lead to faster bootstrapping and lookup operations, -- but also increase resource usage. -- -- Normally this parameter should not exceed 'optK'. optAlpha :: {-# UNPACK #-} !Alpha -- | /K/ parameter - number of nodes to return in 'find_node' -- responses. , optK :: {-# UNPACK #-} !K -- | Number of buckets to maintain. This parameter depends on -- amount of nodes in the DHT network. , optBucketCount :: {-# UNPACK #-} !BucketCount -- | RPC timeout. , optTimeout :: !NominalDiffTime -- | /R/ parameter - how many target nodes the 'announce' query -- should affect. -- -- A large replica set compensates for inconsistent routing and -- reduces the need to frequently republish data for -- persistence. This comes at an increased cost for -- 'Network.BitTorrent.DHT.insert' in terms of time, nodes -- contacted, and storage. , optReplication :: {-# UNPACK #-} !NodeCount -- | How often this node should republish (or reannounce) its -- data. -- -- Large replica set ('optReplication') should require -- smaller reannounce intervals ('optReannounce'). , optReannounce :: !NominalDiffTime -- | The time it takes for data to expire in the -- network. Publisher of the data should republish (or -- reannounce) data to keep it in the network. -- -- The /data expired timeout/ should be more than 'optReannounce' -- interval. , optDataExpired :: !NominalDiffTime } deriving (Show, Eq) -- | Optimal options for bittorrent client. For short-lifetime -- utilities you most likely need to tune 'optAlpha' and -- 'optBucketCount'. instance Default Options where def = Options { optAlpha = defaultAlpha , optK = defaultK -- see Fig.2 from "BitTorrent Mainline DHT Measurement" paper. , optBucketCount = defaultBucketCount -- see Fig.4 from "Profiling a Million User DHT" paper. , optTimeout = 5 -- seconds , optReplication = 20 -- nodes , optReannounce = 15 * 60 , optDataExpired = 60 * 60 } seconds :: NominalDiffTime -> Int seconds dt = fromEnum (realToFrac dt :: Uni) {----------------------------------------------------------------------- -- Tokens policy -----------------------------------------------------------------------} data SessionTokens = SessionTokens { tokenMap :: !TokenMap , lastUpdate :: !UTCTime , maxInterval :: !NominalDiffTime } nullSessionTokens :: IO SessionTokens nullSessionTokens = SessionTokens <$> (tokens <$> liftIO randomIO) <*> liftIO getCurrentTime <*> pure defaultUpdateInterval -- TODO invalidate *twice* if needed invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens invalidateTokens curTime ts @ SessionTokens {..} | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens { tokenMap = update tokenMap , lastUpdate = curTime , maxInterval = maxInterval } | otherwise = ts {----------------------------------------------------------------------- -- Session -----------------------------------------------------------------------} -- | A set of torrents this peer intends to share. type AnnounceSet = Set (InfoHash, PortNumber) -- | Logger function. type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () -- | DHT session keep track state of /this/ node. data Node raw dht u ip = Node { -- | Session configuration; options :: !Options -- | Pseudo-unique self-assigned session identifier. This value is -- constant during DHT session and (optionally) between sessions. , tentativeNodeId :: !(NodeId dht) , resources :: !InternalState , manager :: !(Manager raw dht) -- ^ RPC manager; , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. , loggerFun :: !LogFun } -- | DHT keep track current session and proper resource allocation for -- safe multithreading. newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } deriving ( Functor, Applicative, Monad, MonadIO , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow ) #if MIN_VERSION_monad_control(1,0,0) newtype DHTStM raw dht u ip a = StM { unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif instance MonadBaseControl IO (DHT raw dht u ip) where #if MIN_VERSION_monad_control(1,0,0) type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a #else newtype StM (DHT raw dht u ip) a = StM { unSt :: StM (ReaderT (Node raw dht u ip) IO) a } #endif liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> cc $ \ (DHT m) -> StM <$> cc' m {-# INLINE liftBaseWith #-} restoreM = DHT . restoreM . unSt {-# INLINE restoreM #-} -- | Check is it is possible to run 'queryNode' or handle pending -- query from remote node. instance MonadActive (DHT raw dht u ip) where monadActive = getManager >>= liftIO . isActive {-# INLINE monadActive #-} -- | All allocated resources will be closed at 'closeNode'. instance MonadResource (DHT raw dht u ip) where liftResourceT m = do s <- asks resources liftIO $ runInternalState m s -- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where getManager :: DHT raw dht u ip (Manager raw dht) getManager = asks manager instance MonadLogger (DHT raw dht u ip) where monadLoggerLog loc src lvl msg = do logger <- asks loggerFun liftIO $ logger loc src lvl (toLogStr msg) #ifdef VERSION_bencoding type NodeHandler = Handler IO KMessageOf BValue #else type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString #endif logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () logt lf c m txt = lf (locFromCS callStack) (fromString m) (lvl c) (fromString $ Text.unpack txt) where lvl 'D' = LevelDebug lvl 'I' = LevelInfo lvl 'W' = LevelWarn lvl 'E' = LevelError lvl ch = LevelOther $ Text.cons ch Text.empty mkLoggerLoc :: GHC.SrcLoc -> Loc mkLoggerLoc loc = Loc { loc_filename = GHC.srcLocFile loc , loc_package = GHC.srcLocPackage loc , loc_module = GHC.srcLocModule loc , loc_start = ( GHC.srcLocStartLine loc , GHC.srcLocStartCol loc) , loc_end = ( GHC.srcLocEndLine loc , GHC.srcLocEndCol loc) } locFromCS :: GHC.CallStack -> Loc locFromCS cs = case getCallStack cs of ((_, loc):_) -> mkLoggerLoc loc _ -> Loc "" "" "" (0,0) (0,0) -- | Run DHT session. You /must/ properly close session using -- 'closeNode' function, otherwise socket or other scarce resources may -- leak. newNode :: ( Address ip , FiniteBits (NodeId dht) , Serialize (NodeId dht) ) => -- [NodeHandler] -- ^ handlers to run on accepted queries; Options -- ^ various dht options; -> NodeAddr ip -- ^ node address to bind; -> LogFun -- ^ invoked on log messages; -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. newNode opts naddr logger mbid = do s <- createInternalState runInternalState initNode s `onException` closeInternalState s where rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } nodeAddr = toSockAddr naddr initNode = do s <- getInternalState (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr []) closeManager liftIO $ do myId <- maybe genNodeId return mbid node <- Node opts myId s m <$> atomically (newTVar Nothing) <*> newTVarIO def <*> newTVarIO S.empty <*> (newTVarIO =<< nullSessionTokens) <*> pure logger return node -- | Some resources like listener thread may live for -- some short period of time right after this DHT session closed. closeNode :: Node raw dht u ip -> IO () closeNode Node {..} = closeInternalState resources -- | Run DHT operation on the given session. runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a runDHT node action = runReaderT (unDHT action) node {-# INLINE runDHT #-} {----------------------------------------------------------------------- -- Routing -----------------------------------------------------------------------} -- /pick a random ID/ in the range of the bucket and perform a -- find_nodes search on it. {----------------------------------------------------------------------- -- Tokens -----------------------------------------------------------------------} tryUpdateSecret :: TVar SessionTokens -> IO () tryUpdateSecret toks = do curTime <- liftIO getCurrentTime liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime) grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token grantToken sessionTokens addr = do tryUpdateSecret sessionTokens toks <- readTVarIO sessionTokens return $ T.lookup addr $ tokenMap toks -- | Throws 'HandlerError' if the token is invalid or already -- expired. See 'TokenMap' for details. checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool checkToken sessionTokens addr questionableToken = do tryUpdateSecret sessionTokens toks <- readTVarIO sessionTokens return $ T.member addr questionableToken (tokenMap toks) {----------------------------------------------------------------------- -- Routing table -----------------------------------------------------------------------} -- | This nodes externally routable address reported by remote peers. routableAddress :: DHT raw dht u ip (Maybe SockAddr) routableAddress = do info <- asks routingInfo >>= liftIO . atomically . readTVar return $ myAddress <$> info -- | The current NodeId that the given remote node should know us by. myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) myNodeIdAccordingTo _ = do info <- asks routingInfo >>= liftIO . atomically . readTVar maybe (asks tentativeNodeId) (return . myNodeId) info myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) myNodeIdAccordingTo1 = do var <- asks routingInfo tid <- asks tentativeNodeId return $ \ _ -> do info <- atomically $ readTVar var return $ maybe tid myNodeId info -- | Get current routing table. Normally you don't need to use this -- function, but it can be usefull for debugging and profiling purposes. getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) getTable = do Node { tentativeNodeId = myId , routingInfo = var , options = opts } <- ask let nil = nullTable myId (optBucketCount opts) liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] getSwarms = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.knownSwarms store savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString savePeerStore = do var <- asks contactInfo peers <- liftIO $ atomically $ readTVar var return $ S.encode peers mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () mergeSavedPeers bs = do var <- asks contactInfo case S.decode bs of Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies) Left _ -> return () allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] allPeers ih = do store <- asks contactInfo >>= liftIO . atomically . readTVar return $ P.lookup ih store -- | Find a set of closest nodes from routing table of this node. (in -- no particular order) -- -- This operation used for 'find_nodes' query. -- getClosest :: ( Eq ip , Ord (NodeId dht) , FiniteBits (NodeId dht) , TableKey dht k ) => k -> DHT raw dht u ip [NodeInfo dht ip u] getClosest node = do k <- asks (optK . options) kclosest k node <$> getTable getClosest1 :: ( Eq ip , Ord (NodeId dht) , FiniteBits (NodeId dht) , TableKey dht k ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) getClosest1 = do k <- asks (optK . options) nobkts <- asks (optBucketCount . options) myid <- asks tentativeNodeId var <- asks routingInfo return $ \node -> do nfo <- atomically $ readTVar var let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo return $ kclosest k node tbl {----------------------------------------------------------------------- -- Peer storage -----------------------------------------------------------------------} refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO () refreshContacts var = -- TODO limit dht peer store in size (probably by removing oldest peers) return () -- | Insert peer to peer store. Used to handle announce requests. insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO () insertPeer var ih name addr = do refreshContacts var atomically $ modifyTVar' var (P.insertPeer ih name addr) -- | Get peer set for specific swarm. lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip] lookupPeers var ih = do refreshContacts var tm <- getTimestamp atomically $ do (ps,store') <- P.freshPeers ih tm <$> readTVar var writeTVar var store' return ps getTimestamp :: IO Timestamp getTimestamp = do utcTime <- getCurrentTime -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) return $ utcTimeToPOSIXSeconds utcTime #ifdef VERSION_bencoding -- | Prepare result for 'get_peers' query. -- -- This operation use 'getClosest' as failback so it may block. -- getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) getPeerList ih = do var <- asks contactInfo ps <- liftIO $ lookupPeers var ih if L.null ps then Left <$> getClosest ih else return (Right ps) getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) getPeerList1 = do var <- asks contactInfo getclosest <- getClosest1 return $ \ih -> do ps <- lookupPeers var ih if L.null ps then Left <$> getclosest ih else return (Right ps) insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () insertTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () deleteTopic ih p = do var <- asks announceInfo liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) #endif {----------------------------------------------------------------------- -- Messaging -----------------------------------------------------------------------} -- | Failed queries are ignored. queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] queryParallel queries = do -- TODO: use alpha -- alpha <- asks (optAlpha . options) cleanup <$> mapConcurrently try queries where cleanup :: [Either QueryFailure a] -> [a] cleanup = mapMaybe (either (const Nothing) Just)