From a43a810ff24bc1ab946e5459c0f914aca286c62f Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 12 Jan 2014 13:55:14 +0400 Subject: Unify all iterative queries --- src/Network/BitTorrent/DHT/Session.hs | 96 +++++++++++++++++++++++++++-------- 1 file changed, 76 insertions(+), 20 deletions(-) (limited to 'src/Network/BitTorrent/DHT/Session.hs') diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index f08d85c6..1a199595 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -35,14 +35,20 @@ module Network.BitTorrent.DHT.Session -- * Messaging -- ** Initiate , queryNode + , queryParallel , (<@>) -- ** Accept , NodeHandler , nodeHandler - -- ** Iterate + -- ** Search + , ping + , Iteration + , findNodeQ + , getPeersQ + , Search , search ) where @@ -52,6 +58,7 @@ import Prelude hiding (ioError) import Control.Applicative import Control.Concurrent.STM import Control.Concurrent.Lifted hiding (yield) +import Control.Concurrent.Async.Lifted import Control.Exception.Lifted hiding (Handler) import Control.Monad.Base import Control.Monad.Logger @@ -60,9 +67,11 @@ import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Conduit import Data.Default +import Data.Either import Data.Fixed import Data.Hashable import Data.List as L +import Data.Maybe import Data.Monoid import Data.Text as T import Data.Time @@ -95,6 +104,18 @@ defaultAlpha = 3 -- TODO add replication loop +-- TODO do not insert infohash -> peeraddr if infohash is too far from +-- this node id + +data Order + = NearFirst + | FarFirst + | Random + +data Traversal + = Greedy -- ^ aggressive short-circuit traversal + | Exhaustive -- ^ + -- | Original Kamelia DHT uses term /publish/ for data replication -- process. BitTorrent DHT uses term /announce/ since the purpose of -- the DHT is peer discovery. Later in documentation, we use terms @@ -253,10 +274,10 @@ runDHT handlers opts naddr action = runResourceT $ do -----------------------------------------------------------------------} routing :: Address ip => Routing ip a -> DHT ip (Maybe a) -routing = runRouting ping refreshNodes getTimestamp +routing = runRouting probeNode refreshNodes getTimestamp -ping :: Address ip => NodeAddr ip -> DHT ip Bool -ping addr = do +probeNode :: Address ip => NodeAddr ip -> DHT ip Bool +probeNode addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) result <- try $ Ping <@> addr let _ = result :: Either SomeException Ping @@ -384,19 +405,29 @@ getPeerList ih = do -- | Throws exception if node is not responding. queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip b + => NodeAddr ip -> a -> DHT ip (NodeId, b) queryNode addr q = do nid <- getNodeId Response remoteId r <- query (toSockAddr addr) (Query nid q) insertNode (NodeInfo remoteId addr) - return r + return (remoteId, r) -- | Infix version of 'queryNode' function. (<@>) :: Address ip => KRPC (Query a) (Response b) => a -> NodeAddr ip -> DHT ip b -(<@>) = flip queryNode +q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-} +-- TODO: use alpha +-- | Failed queries are ignored. +queryParallel :: [DHT ip a] -> DHT ip [a] +queryParallel queries = do + alpha <- asks (optAlpha . options) + cleanup <$> mapConcurrently try queries + where + cleanup :: [Either QueryFailure a] -> [a] + cleanup = mapMaybe (either (const Nothing) Just) + type NodeHandler ip = Handler (DHT ip) nodeHandler :: Address ip => KRPC (Query a) (Response b) @@ -412,16 +443,41 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do -- Search -----------------------------------------------------------------------} -type Iteration ip i o = i ip -> DHT ip (Either [i ip] [o ip]) -type Search ip i o = Conduit [i ip] (DHT ip) [o ip] - --- TODO: use all inputs -search :: Address ip => Iteration ip i o -> Search ip i o -search action = do - alpha <- lift $ asks (optAlpha . options) - awaitForever $ \ inputs -> do - forM_ (L.take alpha inputs) $ \ input -> do - result <- lift $ try $ action input - case result of - Left e -> let _ = e :: IOError in return () - Right r -> either leftover yield r +ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) +ping addr = do + (nid, Ping) <- queryNode addr Ping + return (NodeInfo nid addr) + +type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) + +-- TODO match with expected node id +findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo +findNodeQ nid NodeInfo {..} = do + NodeFound closest <- FindNode nid <@> nodeAddr + return $ Right closest + +isLeft :: Either a b -> Bool +isLeft (Right _) = False +isLeft (Left _) = True + +getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr +getPeersQ topic NodeInfo {..} = do + GotPeers {..} <- GetPeers topic <@> nodeAddr + let dist = distance (toNodeId topic) nodeId + $(logInfoS) "getPeersQ" $ T.pack + $ "distance: " <> render (pretty dist) <> " , result: " + <> if isLeft peers then "NODES" else "PEERS" + return peers + +type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] + +-- TODO: use reorder and filter (Traversal option) leftovers +search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o +search k action = do + awaitForever $ \ inputs -> unless (L.null inputs) $ do + $(logWarnS) "search" "start query" + responses <- lift $ queryParallel (action <$> inputs) + let (nodes, results) = partitionEithers responses + $(logWarnS) "search" "done query" + leftover $ L.concat nodes + mapM_ yield results -- cgit v1.2.3