From a43a810ff24bc1ab946e5459c0f914aca286c62f Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 12 Jan 2014 13:55:14 +0400 Subject: Unify all iterative queries --- src/Network/BitTorrent/Core/NodeInfo.hs | 7 +++ src/Network/BitTorrent/DHT.hs | 58 ++++++-------------- src/Network/BitTorrent/DHT/Session.hs | 96 ++++++++++++++++++++++++++------- 3 files changed, 99 insertions(+), 62 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/Core/NodeInfo.hs b/src/Network/BitTorrent/Core/NodeInfo.hs index fe2357a4..2fd7e575 100644 --- a/src/Network/BitTorrent/Core/NodeInfo.hs +++ b/src/Network/BitTorrent/Core/NodeInfo.hs @@ -47,6 +47,7 @@ import Data.ByteString.Base16 as Base16 import Data.BEncode as BE import Data.Default import Data.Hashable +import Data.Foldable import Data.IP import Data.List as L import Data.Monoid @@ -125,6 +126,12 @@ genNodeId = NodeId <$> getEntropy nodeIdSize newtype NodeDistance = NodeDistance BS.ByteString deriving (Eq, Ord) +instance Pretty NodeDistance where + pretty (NodeDistance bs) = foldMap bitseq $ BS.unpack bs + where + listBits w = L.map (testBit w) (L.reverse [0..bitSize w - 1]) + bitseq = foldMap (int . fromEnum) . listBits + -- | distance(A,B) = |A xor B| Smaller values are closer. distance :: NodeId -> NodeId -> NodeDistance distance (NodeId a) (NodeId b) = NodeDistance (BS.pack (BS.zipWith xor a b)) diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 7f1fa295..7c892349 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -76,20 +76,6 @@ announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do handlers :: Address ip => [NodeHandler ip] handlers = [pingH, findNodeH, getPeersH, announceH] -{----------------------------------------------------------------------- --- Query ------------------------------------------------------------------------} - -findNodeQ :: Address ip => NodeId -> Iteration ip NodeAddr NodeInfo -findNodeQ nid addr = do - NodeFound closest <- FindNode nid <@> addr - return $ Right closest - -getPeersQ :: Address ip => InfoHash -> Iteration ip NodeInfo PeerAddr -getPeersQ topic NodeInfo {..} = do - GotPeers {..} <- GetPeers topic <@> nodeAddr - return peers - {----------------------------------------------------------------------- -- DHT operations -----------------------------------------------------------------------} @@ -107,48 +93,36 @@ dht = runDHT handlers -- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping -- process can take up to 5 minutes. -- --- (TODO) This operation is asynchronous and do not block. +-- This operation is synchronous and do block, use 'async' if needed. -- bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () bootstrap startNodes = do - $(logInfoS) "bootstrap" "Start node bootstrapping" - M.mapM_ insertClosest startNodes - $(logInfoS) "bootstrap" "Node bootstrapping finished" - where - insertClosest addr = do - t <- getTable - unless (full t) $ do - nid <- getNodeId - result <- try $ FindNode nid <@> addr - case result of - Left e -> do - $(logWarnS) "bootstrap" $ T.pack $ show (e :: QueryFailure) - - Right (NodeFound closest) -> do - $(logDebug) $ "Get a list of closest nodes: " <> - T.pack (PP.render (pretty closest)) - forM_ closest $ \ info @ NodeInfo {..} -> do - let prettyAddr = T.pack (show (pretty nodeAddr)) - $(logInfoS) "bootstrap" $ "table detalization" <> prettyAddr - insertClosest nodeAddr + $(logInfoS) "bootstrap" "Start node bootstrapping" + nid <- getNodeId + aliveNodes <- queryParallel (ping <$> startNodes) + _ <- sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume + $(logInfoS) "bootstrap" "Node bootstrapping finished" +-- t <- getTable +-- unless (full t) $ do +-- nid <- getNodeId -- | Get list of peers which downloading this torrent. -- --- (TODO) This operation is synchronous and do block. +-- This operation is incremental and do block. -- lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip] -lookup topic = do -- TODO retry getClosestHash if bucket is empty - closest <- lift $ getClosestHash topic - sourceList [closest] $= search (getPeersQ topic) +lookup topic = do -- TODO retry getClosest if bucket is empty + closest <- lift $ getClosest topic + sourceList [closest] $= search topic (getPeersQ topic) -- | Announce that /this/ peer may have some pieces of the specified -- torrent. -- --- (TODO) This operation is asynchronous and do not block. +-- This operation is synchronous and do block, use 'async' if needed. -- insert :: Address ip => InfoHash -> PortNumber -> DHT ip () insert ih port = do - nodes <- getClosestHash ih + nodes <- getClosest ih forM_ (nodeAddr <$> nodes) $ \ addr -> do -- GotPeers {..} <- GetPeers ih <@> addr -- Announced <- Announce False ih undefined grantedToken <@> addr @@ -156,7 +130,7 @@ insert ih port = do -- | Stop announcing /this/ peer for the specified torrent. -- --- This operation is atomic and do not block. +-- This operation is atomic and may block for a while. -- delete :: Address ip => InfoHash -> DHT ip () delete = error "DHT.delete: not implemented" \ No newline at end of file diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index f08d85c6..1a199595 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -35,14 +35,20 @@ module Network.BitTorrent.DHT.Session -- * Messaging -- ** Initiate , queryNode + , queryParallel , (<@>) -- ** Accept , NodeHandler , nodeHandler - -- ** Iterate + -- ** Search + , ping + , Iteration + , findNodeQ + , getPeersQ + , Search , search ) where @@ -52,6 +58,7 @@ import Prelude hiding (ioError) import Control.Applicative import Control.Concurrent.STM import Control.Concurrent.Lifted hiding (yield) +import Control.Concurrent.Async.Lifted import Control.Exception.Lifted hiding (Handler) import Control.Monad.Base import Control.Monad.Logger @@ -60,9 +67,11 @@ import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Conduit import Data.Default +import Data.Either import Data.Fixed import Data.Hashable import Data.List as L +import Data.Maybe import Data.Monoid import Data.Text as T import Data.Time @@ -95,6 +104,18 @@ defaultAlpha = 3 -- TODO add replication loop +-- TODO do not insert infohash -> peeraddr if infohash is too far from +-- this node id + +data Order + = NearFirst + | FarFirst + | Random + +data Traversal + = Greedy -- ^ aggressive short-circuit traversal + | Exhaustive -- ^ + -- | Original Kamelia DHT uses term /publish/ for data replication -- process. BitTorrent DHT uses term /announce/ since the purpose of -- the DHT is peer discovery. Later in documentation, we use terms @@ -253,10 +274,10 @@ runDHT handlers opts naddr action = runResourceT $ do -----------------------------------------------------------------------} routing :: Address ip => Routing ip a -> DHT ip (Maybe a) -routing = runRouting ping refreshNodes getTimestamp +routing = runRouting probeNode refreshNodes getTimestamp -ping :: Address ip => NodeAddr ip -> DHT ip Bool -ping addr = do +probeNode :: Address ip => NodeAddr ip -> DHT ip Bool +probeNode addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) result <- try $ Ping <@> addr let _ = result :: Either SomeException Ping @@ -384,19 +405,29 @@ getPeerList ih = do -- | Throws exception if node is not responding. queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) - => NodeAddr ip -> a -> DHT ip b + => NodeAddr ip -> a -> DHT ip (NodeId, b) queryNode addr q = do nid <- getNodeId Response remoteId r <- query (toSockAddr addr) (Query nid q) insertNode (NodeInfo remoteId addr) - return r + return (remoteId, r) -- | Infix version of 'queryNode' function. (<@>) :: Address ip => KRPC (Query a) (Response b) => a -> NodeAddr ip -> DHT ip b -(<@>) = flip queryNode +q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-} +-- TODO: use alpha +-- | Failed queries are ignored. +queryParallel :: [DHT ip a] -> DHT ip [a] +queryParallel queries = do + alpha <- asks (optAlpha . options) + cleanup <$> mapConcurrently try queries + where + cleanup :: [Either QueryFailure a] -> [a] + cleanup = mapMaybe (either (const Nothing) Just) + type NodeHandler ip = Handler (DHT ip) nodeHandler :: Address ip => KRPC (Query a) (Response b) @@ -412,16 +443,41 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do -- Search -----------------------------------------------------------------------} -type Iteration ip i o = i ip -> DHT ip (Either [i ip] [o ip]) -type Search ip i o = Conduit [i ip] (DHT ip) [o ip] - --- TODO: use all inputs -search :: Address ip => Iteration ip i o -> Search ip i o -search action = do - alpha <- lift $ asks (optAlpha . options) - awaitForever $ \ inputs -> do - forM_ (L.take alpha inputs) $ \ input -> do - result <- lift $ try $ action input - case result of - Left e -> let _ = e :: IOError in return () - Right r -> either leftover yield r +ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) +ping addr = do + (nid, Ping) <- queryNode addr Ping + return (NodeInfo nid addr) + +type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) + +-- TODO match with expected node id +findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo +findNodeQ nid NodeInfo {..} = do + NodeFound closest <- FindNode nid <@> nodeAddr + return $ Right closest + +isLeft :: Either a b -> Bool +isLeft (Right _) = False +isLeft (Left _) = True + +getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr +getPeersQ topic NodeInfo {..} = do + GotPeers {..} <- GetPeers topic <@> nodeAddr + let dist = distance (toNodeId topic) nodeId + $(logInfoS) "getPeersQ" $ T.pack + $ "distance: " <> render (pretty dist) <> " , result: " + <> if isLeft peers then "NODES" else "PEERS" + return peers + +type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] + +-- TODO: use reorder and filter (Traversal option) leftovers +search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o +search k action = do + awaitForever $ \ inputs -> unless (L.null inputs) $ do + $(logWarnS) "search" "start query" + responses <- lift $ queryParallel (action <$> inputs) + let (nodes, results) = partitionEithers responses + $(logWarnS) "search" "done query" + leftover $ L.concat nodes + mapM_ yield results -- cgit v1.2.3