summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Session.hs
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-01-12 13:55:14 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-01-12 13:55:14 +0400
commita43a810ff24bc1ab946e5459c0f914aca286c62f (patch)
treeb716159332dd71661da49fe361dd7daf2052a778 /src/Network/BitTorrent/DHT/Session.hs
parente3d3fb375ca01aa844e86b8a4c5ca507919518d3 (diff)
Unify all iterative queries
Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs96
1 files changed, 76 insertions, 20 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index f08d85c6..1a199595 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -35,14 +35,20 @@ module Network.BitTorrent.DHT.Session
35 -- * Messaging 35 -- * Messaging
36 -- ** Initiate 36 -- ** Initiate
37 , queryNode 37 , queryNode
38 , queryParallel
38 , (<@>) 39 , (<@>)
39 40
40 -- ** Accept 41 -- ** Accept
41 , NodeHandler 42 , NodeHandler
42 , nodeHandler 43 , nodeHandler
43 44
44 -- ** Iterate 45 -- ** Search
46 , ping
47
45 , Iteration 48 , Iteration
49 , findNodeQ
50 , getPeersQ
51
46 , Search 52 , Search
47 , search 53 , search
48 ) where 54 ) where
@@ -52,6 +58,7 @@ import Prelude hiding (ioError)
52import Control.Applicative 58import Control.Applicative
53import Control.Concurrent.STM 59import Control.Concurrent.STM
54import Control.Concurrent.Lifted hiding (yield) 60import Control.Concurrent.Lifted hiding (yield)
61import Control.Concurrent.Async.Lifted
55import Control.Exception.Lifted hiding (Handler) 62import Control.Exception.Lifted hiding (Handler)
56import Control.Monad.Base 63import Control.Monad.Base
57import Control.Monad.Logger 64import Control.Monad.Logger
@@ -60,9 +67,11 @@ import Control.Monad.Trans.Control
60import Control.Monad.Trans.Resource 67import Control.Monad.Trans.Resource
61import Data.Conduit 68import Data.Conduit
62import Data.Default 69import Data.Default
70import Data.Either
63import Data.Fixed 71import Data.Fixed
64import Data.Hashable 72import Data.Hashable
65import Data.List as L 73import Data.List as L
74import Data.Maybe
66import Data.Monoid 75import Data.Monoid
67import Data.Text as T 76import Data.Text as T
68import Data.Time 77import Data.Time
@@ -95,6 +104,18 @@ defaultAlpha = 3
95 104
96-- TODO add replication loop 105-- TODO add replication loop
97 106
107-- TODO do not insert infohash -> peeraddr if infohash is too far from
108-- this node id
109
110data Order
111 = NearFirst
112 | FarFirst
113 | Random
114
115data Traversal
116 = Greedy -- ^ aggressive short-circuit traversal
117 | Exhaustive -- ^
118
98-- | Original Kamelia DHT uses term /publish/ for data replication 119-- | Original Kamelia DHT uses term /publish/ for data replication
99-- process. BitTorrent DHT uses term /announce/ since the purpose of 120-- process. BitTorrent DHT uses term /announce/ since the purpose of
100-- the DHT is peer discovery. Later in documentation, we use terms 121-- the DHT is peer discovery. Later in documentation, we use terms
@@ -253,10 +274,10 @@ runDHT handlers opts naddr action = runResourceT $ do
253-----------------------------------------------------------------------} 274-----------------------------------------------------------------------}
254 275
255routing :: Address ip => Routing ip a -> DHT ip (Maybe a) 276routing :: Address ip => Routing ip a -> DHT ip (Maybe a)
256routing = runRouting ping refreshNodes getTimestamp 277routing = runRouting probeNode refreshNodes getTimestamp
257 278
258ping :: Address ip => NodeAddr ip -> DHT ip Bool 279probeNode :: Address ip => NodeAddr ip -> DHT ip Bool
259ping addr = do 280probeNode addr = do
260 $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) 281 $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr)))
261 result <- try $ Ping <@> addr 282 result <- try $ Ping <@> addr
262 let _ = result :: Either SomeException Ping 283 let _ = result :: Either SomeException Ping
@@ -384,19 +405,29 @@ getPeerList ih = do
384 405
385-- | Throws exception if node is not responding. 406-- | Throws exception if node is not responding.
386queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 407queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
387 => NodeAddr ip -> a -> DHT ip b 408 => NodeAddr ip -> a -> DHT ip (NodeId, b)
388queryNode addr q = do 409queryNode addr q = do
389 nid <- getNodeId 410 nid <- getNodeId
390 Response remoteId r <- query (toSockAddr addr) (Query nid q) 411 Response remoteId r <- query (toSockAddr addr) (Query nid q)
391 insertNode (NodeInfo remoteId addr) 412 insertNode (NodeInfo remoteId addr)
392 return r 413 return (remoteId, r)
393 414
394-- | Infix version of 'queryNode' function. 415-- | Infix version of 'queryNode' function.
395(<@>) :: Address ip => KRPC (Query a) (Response b) 416(<@>) :: Address ip => KRPC (Query a) (Response b)
396 => a -> NodeAddr ip -> DHT ip b 417 => a -> NodeAddr ip -> DHT ip b
397(<@>) = flip queryNode 418q <@> addr = snd <$> queryNode addr q
398{-# INLINE (<@>) #-} 419{-# INLINE (<@>) #-}
399 420
421-- TODO: use alpha
422-- | Failed queries are ignored.
423queryParallel :: [DHT ip a] -> DHT ip [a]
424queryParallel queries = do
425 alpha <- asks (optAlpha . options)
426 cleanup <$> mapConcurrently try queries
427 where
428 cleanup :: [Either QueryFailure a] -> [a]
429 cleanup = mapMaybe (either (const Nothing) Just)
430
400type NodeHandler ip = Handler (DHT ip) 431type NodeHandler ip = Handler (DHT ip)
401 432
402nodeHandler :: Address ip => KRPC (Query a) (Response b) 433nodeHandler :: Address ip => KRPC (Query a) (Response b)
@@ -412,16 +443,41 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
412-- Search 443-- Search
413-----------------------------------------------------------------------} 444-----------------------------------------------------------------------}
414 445
415type Iteration ip i o = i ip -> DHT ip (Either [i ip] [o ip]) 446ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip)
416type Search ip i o = Conduit [i ip] (DHT ip) [o ip] 447ping addr = do
417 448 (nid, Ping) <- queryNode addr Ping
418-- TODO: use all inputs 449 return (NodeInfo nid addr)
419search :: Address ip => Iteration ip i o -> Search ip i o 450
420search action = do 451type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
421 alpha <- lift $ asks (optAlpha . options) 452
422 awaitForever $ \ inputs -> do 453-- TODO match with expected node id
423 forM_ (L.take alpha inputs) $ \ input -> do 454findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo
424 result <- lift $ try $ action input 455findNodeQ nid NodeInfo {..} = do
425 case result of 456 NodeFound closest <- FindNode nid <@> nodeAddr
426 Left e -> let _ = e :: IOError in return () 457 return $ Right closest
427 Right r -> either leftover yield r 458
459isLeft :: Either a b -> Bool
460isLeft (Right _) = False
461isLeft (Left _) = True
462
463getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
464getPeersQ topic NodeInfo {..} = do
465 GotPeers {..} <- GetPeers topic <@> nodeAddr
466 let dist = distance (toNodeId topic) nodeId
467 $(logInfoS) "getPeersQ" $ T.pack
468 $ "distance: " <> render (pretty dist) <> " , result: "
469 <> if isLeft peers then "NODES" else "PEERS"
470 return peers
471
472type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
473
474-- TODO: use reorder and filter (Traversal option) leftovers
475search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
476search k action = do
477 awaitForever $ \ inputs -> unless (L.null inputs) $ do
478 $(logWarnS) "search" "start query"
479 responses <- lift $ queryParallel (action <$> inputs)
480 let (nodes, results) = partitionEithers responses
481 $(logWarnS) "search" "done query"
482 leftover $ L.concat nodes
483 mapM_ yield results