From 84486e1bed75ae3ece0217fb330c6ca648931bb3 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Mon, 13 Jan 2014 11:13:42 +0400 Subject: Refactor DHT module --- src/Network/BitTorrent/DHT.hs | 61 ++++--------------------- src/Network/BitTorrent/DHT/Session.hs | 86 ++++++++++++++++++++++++++++++----- 2 files changed, 84 insertions(+), 63 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 7c892349..71803ccf 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -28,58 +28,17 @@ module Network.BitTorrent.DHT ) where import Control.Applicative -import Control.Exception.Lifted -import Control.Monad as M import Control.Monad.Logger import Control.Monad.Trans import Data.Conduit as C import Data.Conduit.List as C -import Data.List as L -import Data.Monoid -import Data.Text as T import Network.Socket (PortNumber) -import Text.PrettyPrint as PP hiding ((<>), ($$)) -import Text.PrettyPrint.Class import Data.Torrent.InfoHash -import Network.KRPC (QueryFailure) import Network.BitTorrent.Core -import Network.BitTorrent.DHT.Message -import Network.BitTorrent.DHT.Routing import Network.BitTorrent.DHT.Session -{----------------------------------------------------------------------- --- Handlers ------------------------------------------------------------------------} - -pingH :: Address ip => NodeHandler ip -pingH = nodeHandler $ \ _ Ping -> do - return Ping - -findNodeH :: Address ip => NodeHandler ip -findNodeH = nodeHandler $ \ _ (FindNode nid) -> do - NodeFound <$> getClosest nid - -getPeersH :: Address ip => NodeHandler ip -getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do - GotPeers <$> getPeerList ih <*> grantToken naddr - -announceH :: Address ip => NodeHandler ip -announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do - checkToken naddr sessionToken - let annPort = if impliedPort then nodePort else port - let peerAddr = PeerAddr Nothing nodeHost annPort - insertPeer topic peerAddr - return Announced - -handlers :: Address ip => [NodeHandler ip] -handlers = [pingH, findNodeH, getPeersH, announceH] - -{----------------------------------------------------------------------- --- DHT operations ------------------------------------------------------------------------} - -- | Run DHT on specified port. dht :: Address ip => Options -- ^ normally you need to use 'Data.Default.def'; @@ -93,7 +52,8 @@ dht = runDHT handlers -- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping -- process can take up to 5 minutes. -- --- This operation is synchronous and do block, use 'async' if needed. +-- This operation is synchronous and do block, use +-- 'Control.Concurrent.Async.Lifted.async' if needed. -- bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () bootstrap startNodes = do @@ -118,19 +78,18 @@ lookup topic = do -- TODO retry getClosest if bucket is empty -- | Announce that /this/ peer may have some pieces of the specified -- torrent. -- --- This operation is synchronous and do block, use 'async' if needed. +-- This operation is synchronous and do block, use +-- 'Control.Concurrent.Async.Lifted.async' if needed. -- insert :: Address ip => InfoHash -> PortNumber -> DHT ip () -insert ih port = do - nodes <- getClosest ih - forM_ (nodeAddr <$> nodes) $ \ addr -> do --- GotPeers {..} <- GetPeers ih <@> addr --- Announced <- Announce False ih undefined grantedToken <@> addr - return () +insert ih p = do + publish ih p + insertTopic ih p -- | Stop announcing /this/ peer for the specified torrent. -- -- This operation is atomic and may block for a while. -- -delete :: Address ip => InfoHash -> DHT ip () -delete = error "DHT.delete: not implemented" \ No newline at end of file +delete :: Address ip => InfoHash -> PortNumber -> DHT ip () +delete = deleteTopic +{-# INLINE delete #-} \ No newline at end of file diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 1a199595..e26cbad1 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -31,26 +31,36 @@ module Network.BitTorrent.DHT.Session -- * Peer storage , insertPeer , getPeerList + , insertTopic + , deleteTopic -- * Messaging -- ** Initiate , queryNode , queryParallel , (<@>) + , ping -- ** Accept , NodeHandler , nodeHandler - - -- ** Search - , ping - + , pingH + , findNodeH + , getPeersH + , announceH + , handlers + + -- * Search + -- ** Step , Iteration , findNodeQ , getPeersQ + , announceQ + -- ** Traversal , Search , search + , publish ) where import Prelude hiding (ioError) @@ -66,6 +76,7 @@ import Control.Monad.Reader import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Conduit +import Data.Conduit.List as C hiding (mapMaybe, mapM_) import Data.Default import Data.Either import Data.Fixed @@ -76,9 +87,10 @@ import Data.Monoid import Data.Text as T import Data.Time import Data.Time.Clock.POSIX +import Network (PortNumber) import System.Log.FastLogger import System.Random (randomIO) -import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint as PP hiding ((<>), ($$)) import Text.PrettyPrint.Class import Data.Torrent.InfoHash @@ -399,6 +411,12 @@ getPeerList ih = do then Left <$> getClosest ih else return (Right ps) +insertTopic :: InfoHash -> PortNumber -> DHT ip () +insertTopic = undefined + +deleteTopic :: InfoHash -> PortNumber -> DHT ip () +deleteTopic = undefined + {----------------------------------------------------------------------- -- Messaging -----------------------------------------------------------------------} @@ -428,6 +446,11 @@ queryParallel queries = do cleanup :: [Either QueryFailure a] -> [a] cleanup = mapMaybe (either (const Nothing) Just) +ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) +ping addr = do + (nid, Ping) <- queryNode addr Ping + return (NodeInfo nid addr) + type NodeHandler ip = Handler (DHT ip) nodeHandler :: Address ip => KRPC (Query a) (Response b) @@ -443,11 +466,6 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do -- Search -----------------------------------------------------------------------} -ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) -ping addr = do - (nid, Ping) <- queryNode addr Ping - return (NodeInfo nid addr) - type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) -- TODO match with expected node id @@ -469,15 +487,59 @@ getPeersQ topic NodeInfo {..} = do <> if isLeft peers then "NODES" else "PEERS" return peers +announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr +announceQ ih p NodeInfo {..} = do + GotPeers {..} <- GetPeers ih <@> nodeAddr + case peers of + Left ns + | False -> undefined -- TODO check if we can announce + | otherwise -> return (Left ns) + Right ps -> do -- TODO *probably* add to peer cache + Announced <- Announce False ih p grantedToken <@> nodeAddr + return (Right [nodeAddr]) + type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] -- TODO: use reorder and filter (Traversal option) leftovers search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o search k action = do - awaitForever $ \ inputs -> unless (L.null inputs) $ do + awaitForever $ \ batch -> unless (L.null batch) $ do $(logWarnS) "search" "start query" - responses <- lift $ queryParallel (action <$> inputs) + responses <- lift $ queryParallel (action <$> batch) let (nodes, results) = partitionEithers responses $(logWarnS) "search" "done query" leftover $ L.concat nodes mapM_ yield results + +publish :: Address ip => InfoHash -> PortNumber -> DHT ip () +publish ih port = do + nodes <- getClosest ih + _ <- sourceList [nodes] $= search ih (announceQ ih port) $$ C.take 20 + return () + +{----------------------------------------------------------------------- +-- Handlers +-----------------------------------------------------------------------} + +pingH :: Address ip => NodeHandler ip +pingH = nodeHandler $ \ _ Ping -> do + return Ping + +findNodeH :: Address ip => NodeHandler ip +findNodeH = nodeHandler $ \ _ (FindNode nid) -> do + NodeFound <$> getClosest nid + +getPeersH :: Address ip => NodeHandler ip +getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do + GotPeers <$> getPeerList ih <*> grantToken naddr + +announceH :: Address ip => NodeHandler ip +announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do + checkToken naddr sessionToken + let annPort = if impliedPort then nodePort else port + let peerAddr = PeerAddr Nothing nodeHost annPort + insertPeer topic peerAddr + return Announced + +handlers :: Address ip => [NodeHandler ip] +handlers = [pingH, findNodeH, getPeersH, announceH] -- cgit v1.2.3