From 51e1c37e415e502902d58f8552ac09e379c12504 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Thu, 2 Jan 2014 23:28:05 +0400 Subject: Implement DHT lookup operation --- src/Network/BitTorrent/DHT.hs | 36 ++++++++++++++++++++++++----------- src/Network/BitTorrent/DHT/Session.hs | 6 ++++-- 2 files changed, 29 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 38de1f91..c35b5bd6 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -16,6 +16,7 @@ -- {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeOperators #-} module Network.BitTorrent.DHT ( -- * Distributed Hash Table DHT @@ -29,13 +30,16 @@ module Network.BitTorrent.DHT import Control.Applicative import Control.Concurrent.Lifted hiding (yield) import Control.Exception.Lifted -import Control.Monad +import Control.Monad as M import Control.Monad.Logger +import Control.Monad.Trans +import Data.Conduit as C +import Data.Conduit.List as C import Data.List as L import Data.Monoid import Data.Text as T import Network.Socket (PortNumber) -import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint as PP hiding ((<>), ($$)) import Text.PrettyPrint.Class import Data.Torrent.InfoHash @@ -82,6 +86,20 @@ handlers = [pingH, findNodeH, getPeersH, announceH] -- Query -----------------------------------------------------------------------} +findNodeQ :: Address ip => NodeId -> Iteration ip NodeAddr NodeInfo +findNodeQ nid addr = do + NodeFound closest <- FindNode nid <@> addr + return $ Right closest + +getPeersQ :: Address ip => InfoHash -> Iteration ip NodeInfo PeerAddr +getPeersQ topic NodeInfo {..} = do + GotPeers {..} <- GetPeers topic <@> nodeAddr + return peers + +{----------------------------------------------------------------------- +-- DHT operations +-----------------------------------------------------------------------} + -- | Run DHT on specified port. dht :: Address ip => Options -- ^ normally you need to use 'Data.Default.def'; @@ -100,7 +118,7 @@ dht = runDHT handlers bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () bootstrap startNodes = do $(logInfoS) "bootstrap" "Start node bootstrapping" - mapM_ insertClosest startNodes + M.mapM_ insertClosest startNodes $(logInfoS) "bootstrap" "Node bootstrapping finished" where insertClosest addr = do @@ -124,14 +142,10 @@ bootstrap startNodes = do -- -- (TODO) This operation is synchronous and do block. -- -lookup :: Address ip => InfoHash -> DHT ip [PeerAddr ip] -lookup topic = getClosestHash topic >>= collect - -- TODO retry getClosestHash if bucket is empty - where - collect nodes = L.concat <$> forM (nodeAddr <$> nodes) retrieve - retrieve addr = do - GotPeers {..} <- GetPeers topic <@> addr - either collect pure peers +lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip] +lookup topic = do -- TODO retry getClosestHash if bucket is empty + closest <- lift $ getClosestHash topic + sourceList [closest] $= search (getPeersQ topic) -- | Announce that /this/ peer may have some pieces of the specified -- torrent. diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index d3315a42..6c43c732 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -383,5 +383,7 @@ search action = do alpha <- lift $ asks (optAlpha . options) awaitForever $ \ inputs -> do forM_ (L.take alpha inputs) $ \ input -> do - result <- lift $ action input - either leftover yield result + result <- lift $ try $ action input + case result of + Left e -> let _ = e :: IOError in return () + Right r -> either leftover yield r -- cgit v1.2.3