From 7e61a78b975e586cde5c7f2729e5943d7a44699a Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Thu, 23 Jan 2014 02:47:42 +0400 Subject: Add class MonadDHT --- src/Network/BitTorrent/DHT.hs | 32 ++++++++++++++++++- src/Network/BitTorrent/DHT/Session.hs | 59 ++++++++++++++++++++++------------- 2 files changed, 69 insertions(+), 22 deletions(-) (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 77bb9da9..a97ebcf7 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -20,6 +20,7 @@ module Network.BitTorrent.DHT ( -- * Distributed Hash Table DHT + , MonadDHT (..) , dht -- * Initialization @@ -31,6 +32,14 @@ module Network.BitTorrent.DHT , Network.BitTorrent.DHT.lookup , Network.BitTorrent.DHT.insert , Network.BitTorrent.DHT.delete + + -- * Internal + -- | Can be used to implement instance of 'MonadDHT'. + , LogFun + , Node + , handlers + , startNode + , runDHT ) where import Control.Applicative @@ -45,6 +54,15 @@ import Data.Torrent.InfoHash import Network.BitTorrent.Core import Network.BitTorrent.DHT.Session +{----------------------------------------------------------------------- +-- DHT types +-----------------------------------------------------------------------} + +class MonadDHT m where + liftDHT :: DHT IPv4 a -> m a + +instance MonadDHT (DHT IPv4) where + liftDHT = id -- | Run DHT on specified port. dht :: Address ip @@ -52,9 +70,17 @@ dht :: Address ip -> NodeAddr ip -- ^ address to bind this node; -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; -> IO a -- ^ result. -dht = runDHT handlers +dht opts addr action = do + runResourceT $ do + runStderrLoggingT $ LoggingT $ \ logger -> do + node <- startNode handlers opts addr logger + runDHT node action {-# INLINE dht #-} +{----------------------------------------------------------------------- +-- Initialization +-----------------------------------------------------------------------} + -- | One good node may be sufficient. The list of bootstrapping nodes -- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping -- process can take up to 5 minutes. @@ -87,6 +113,10 @@ restore = error "DHT.restore: not implemented" snapshot :: DHT ip ByteString snapshot = error "DHT.snapshot: not implemented" +{----------------------------------------------------------------------- +-- Operations +-----------------------------------------------------------------------} + -- | Get list of peers which downloading this torrent. -- -- This operation is incremental and do block. diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 9455c465..50ca6db3 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -1,3 +1,10 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} @@ -14,6 +21,11 @@ module Network.BitTorrent.DHT.Session , defaultK , Options (..) + -- * Node + , LogFun + , Node + , startNode + -- * Session , DHT , runDHT @@ -233,7 +245,7 @@ invalidateTokens curTime ts @ SessionTokens {..} type AnnounceSet = Set (InfoHash, PortNumber) -- | Logger function. -type Logger = Loc -> LogSource -> LogLevel -> LogStr -> IO () +type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () -- | DHT session keep track state of /this/ node. data Node ip = Node @@ -244,7 +256,7 @@ data Node ip = Node , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs. - , loggerFun :: !Logger + , loggerFun :: !LogFun } -- | DHT keep track current session and proper resource allocation for @@ -274,28 +286,35 @@ instance MonadLogger (DHT ip) where logger <- asks loggerFun liftIO $ logger loc src lvl (toLogStr msg) +type NodeHandler ip = Handler (DHT ip) + -- | Run DHT session. Some resources like listener thread may live for -- some short period of time right after this DHT session closed. -runDHT :: forall ip a. Address ip - => [Handler (DHT ip)] -- ^ handlers to run on accepted queries; - -> Options -- ^ various dht options; - -> NodeAddr ip -- ^ node address to bind; - -> DHT ip a -- ^ DHT action to run; - -> IO a -- ^ result. -runDHT hs opts naddr action = runResourceT $ do - runStderrLoggingT $ LoggingT $ \ logger -> do - let rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } - let nodeAddr = toSockAddr naddr - (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager +startNode :: Address ip + => [NodeHandler ip] -- ^ handlers to run on accepted queries; + -> Options -- ^ various dht options; + -> NodeAddr ip -- ^ node address to bind; + -> LogFun -- ^ + -> ResIO (Node ip) -- ^ a new DHT node running at given address. +startNode hs opts naddr logger = do +-- (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager + m <- liftIO $ newManager rpcOpts nodeAddr hs myId <- liftIO genNodeId node <- liftIO $ Node opts myId m - <$> newMVar (nullTable myId (optBucketCount opts)) - <*> newTVarIO def - <*> newTVarIO S.empty - <*> (newTVarIO =<< nullSessionTokens) - <*> pure logger - runReaderT (unDHT (listen >> action)) node + <$> newMVar (nullTable myId (optBucketCount opts)) + <*> newTVarIO def + <*> newTVarIO S.empty + <*> (newTVarIO =<< nullSessionTokens) + <*> pure logger + runReaderT (unDHT listen) node + return node + where + rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } + nodeAddr = toSockAddr naddr +runDHT :: Node ip -> DHT ip a -> ResIO a +runDHT node action = runReaderT (unDHT action) node +{-# INLINE runDHT #-} askOption :: (Options -> a) -> DHT ip a askOption f = asks (f . options) @@ -475,8 +494,6 @@ ping addr = do (nid, Ping) <- queryNode addr Ping return (NodeInfo nid addr) -type NodeHandler ip = Handler (DHT ip) - nodeHandler :: Address ip => KRPC (Query a) (Response b) => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do -- cgit v1.2.3