diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-12 13:55:14 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-12 13:55:14 +0400 |
commit | a43a810ff24bc1ab946e5459c0f914aca286c62f (patch) | |
tree | b716159332dd71661da49fe361dd7daf2052a778 /src/Network/BitTorrent/DHT/Session.hs | |
parent | e3d3fb375ca01aa844e86b8a4c5ca507919518d3 (diff) |
Unify all iterative queries
Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 96 |
1 files changed, 76 insertions, 20 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index f08d85c6..1a199595 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -35,14 +35,20 @@ module Network.BitTorrent.DHT.Session | |||
35 | -- * Messaging | 35 | -- * Messaging |
36 | -- ** Initiate | 36 | -- ** Initiate |
37 | , queryNode | 37 | , queryNode |
38 | , queryParallel | ||
38 | , (<@>) | 39 | , (<@>) |
39 | 40 | ||
40 | -- ** Accept | 41 | -- ** Accept |
41 | , NodeHandler | 42 | , NodeHandler |
42 | , nodeHandler | 43 | , nodeHandler |
43 | 44 | ||
44 | -- ** Iterate | 45 | -- ** Search |
46 | , ping | ||
47 | |||
45 | , Iteration | 48 | , Iteration |
49 | , findNodeQ | ||
50 | , getPeersQ | ||
51 | |||
46 | , Search | 52 | , Search |
47 | , search | 53 | , search |
48 | ) where | 54 | ) where |
@@ -52,6 +58,7 @@ import Prelude hiding (ioError) | |||
52 | import Control.Applicative | 58 | import Control.Applicative |
53 | import Control.Concurrent.STM | 59 | import Control.Concurrent.STM |
54 | import Control.Concurrent.Lifted hiding (yield) | 60 | import Control.Concurrent.Lifted hiding (yield) |
61 | import Control.Concurrent.Async.Lifted | ||
55 | import Control.Exception.Lifted hiding (Handler) | 62 | import Control.Exception.Lifted hiding (Handler) |
56 | import Control.Monad.Base | 63 | import Control.Monad.Base |
57 | import Control.Monad.Logger | 64 | import Control.Monad.Logger |
@@ -60,9 +67,11 @@ import Control.Monad.Trans.Control | |||
60 | import Control.Monad.Trans.Resource | 67 | import Control.Monad.Trans.Resource |
61 | import Data.Conduit | 68 | import Data.Conduit |
62 | import Data.Default | 69 | import Data.Default |
70 | import Data.Either | ||
63 | import Data.Fixed | 71 | import Data.Fixed |
64 | import Data.Hashable | 72 | import Data.Hashable |
65 | import Data.List as L | 73 | import Data.List as L |
74 | import Data.Maybe | ||
66 | import Data.Monoid | 75 | import Data.Monoid |
67 | import Data.Text as T | 76 | import Data.Text as T |
68 | import Data.Time | 77 | import Data.Time |
@@ -95,6 +104,18 @@ defaultAlpha = 3 | |||
95 | 104 | ||
96 | -- TODO add replication loop | 105 | -- TODO add replication loop |
97 | 106 | ||
107 | -- TODO do not insert infohash -> peeraddr if infohash is too far from | ||
108 | -- this node id | ||
109 | |||
110 | data Order | ||
111 | = NearFirst | ||
112 | | FarFirst | ||
113 | | Random | ||
114 | |||
115 | data Traversal | ||
116 | = Greedy -- ^ aggressive short-circuit traversal | ||
117 | | Exhaustive -- ^ | ||
118 | |||
98 | -- | Original Kamelia DHT uses term /publish/ for data replication | 119 | -- | Original Kamelia DHT uses term /publish/ for data replication |
99 | -- process. BitTorrent DHT uses term /announce/ since the purpose of | 120 | -- process. BitTorrent DHT uses term /announce/ since the purpose of |
100 | -- the DHT is peer discovery. Later in documentation, we use terms | 121 | -- the DHT is peer discovery. Later in documentation, we use terms |
@@ -253,10 +274,10 @@ runDHT handlers opts naddr action = runResourceT $ do | |||
253 | -----------------------------------------------------------------------} | 274 | -----------------------------------------------------------------------} |
254 | 275 | ||
255 | routing :: Address ip => Routing ip a -> DHT ip (Maybe a) | 276 | routing :: Address ip => Routing ip a -> DHT ip (Maybe a) |
256 | routing = runRouting ping refreshNodes getTimestamp | 277 | routing = runRouting probeNode refreshNodes getTimestamp |
257 | 278 | ||
258 | ping :: Address ip => NodeAddr ip -> DHT ip Bool | 279 | probeNode :: Address ip => NodeAddr ip -> DHT ip Bool |
259 | ping addr = do | 280 | probeNode addr = do |
260 | $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) | 281 | $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) |
261 | result <- try $ Ping <@> addr | 282 | result <- try $ Ping <@> addr |
262 | let _ = result :: Either SomeException Ping | 283 | let _ = result :: Either SomeException Ping |
@@ -384,19 +405,29 @@ getPeerList ih = do | |||
384 | 405 | ||
385 | -- | Throws exception if node is not responding. | 406 | -- | Throws exception if node is not responding. |
386 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 407 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
387 | => NodeAddr ip -> a -> DHT ip b | 408 | => NodeAddr ip -> a -> DHT ip (NodeId, b) |
388 | queryNode addr q = do | 409 | queryNode addr q = do |
389 | nid <- getNodeId | 410 | nid <- getNodeId |
390 | Response remoteId r <- query (toSockAddr addr) (Query nid q) | 411 | Response remoteId r <- query (toSockAddr addr) (Query nid q) |
391 | insertNode (NodeInfo remoteId addr) | 412 | insertNode (NodeInfo remoteId addr) |
392 | return r | 413 | return (remoteId, r) |
393 | 414 | ||
394 | -- | Infix version of 'queryNode' function. | 415 | -- | Infix version of 'queryNode' function. |
395 | (<@>) :: Address ip => KRPC (Query a) (Response b) | 416 | (<@>) :: Address ip => KRPC (Query a) (Response b) |
396 | => a -> NodeAddr ip -> DHT ip b | 417 | => a -> NodeAddr ip -> DHT ip b |
397 | (<@>) = flip queryNode | 418 | q <@> addr = snd <$> queryNode addr q |
398 | {-# INLINE (<@>) #-} | 419 | {-# INLINE (<@>) #-} |
399 | 420 | ||
421 | -- TODO: use alpha | ||
422 | -- | Failed queries are ignored. | ||
423 | queryParallel :: [DHT ip a] -> DHT ip [a] | ||
424 | queryParallel queries = do | ||
425 | alpha <- asks (optAlpha . options) | ||
426 | cleanup <$> mapConcurrently try queries | ||
427 | where | ||
428 | cleanup :: [Either QueryFailure a] -> [a] | ||
429 | cleanup = mapMaybe (either (const Nothing) Just) | ||
430 | |||
400 | type NodeHandler ip = Handler (DHT ip) | 431 | type NodeHandler ip = Handler (DHT ip) |
401 | 432 | ||
402 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | 433 | nodeHandler :: Address ip => KRPC (Query a) (Response b) |
@@ -412,16 +443,41 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | |||
412 | -- Search | 443 | -- Search |
413 | -----------------------------------------------------------------------} | 444 | -----------------------------------------------------------------------} |
414 | 445 | ||
415 | type Iteration ip i o = i ip -> DHT ip (Either [i ip] [o ip]) | 446 | ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) |
416 | type Search ip i o = Conduit [i ip] (DHT ip) [o ip] | 447 | ping addr = do |
417 | 448 | (nid, Ping) <- queryNode addr Ping | |
418 | -- TODO: use all inputs | 449 | return (NodeInfo nid addr) |
419 | search :: Address ip => Iteration ip i o -> Search ip i o | 450 | |
420 | search action = do | 451 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) |
421 | alpha <- lift $ asks (optAlpha . options) | 452 | |
422 | awaitForever $ \ inputs -> do | 453 | -- TODO match with expected node id |
423 | forM_ (L.take alpha inputs) $ \ input -> do | 454 | findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo |
424 | result <- lift $ try $ action input | 455 | findNodeQ nid NodeInfo {..} = do |
425 | case result of | 456 | NodeFound closest <- FindNode nid <@> nodeAddr |
426 | Left e -> let _ = e :: IOError in return () | 457 | return $ Right closest |
427 | Right r -> either leftover yield r | 458 | |
459 | isLeft :: Either a b -> Bool | ||
460 | isLeft (Right _) = False | ||
461 | isLeft (Left _) = True | ||
462 | |||
463 | getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr | ||
464 | getPeersQ topic NodeInfo {..} = do | ||
465 | GotPeers {..} <- GetPeers topic <@> nodeAddr | ||
466 | let dist = distance (toNodeId topic) nodeId | ||
467 | $(logInfoS) "getPeersQ" $ T.pack | ||
468 | $ "distance: " <> render (pretty dist) <> " , result: " | ||
469 | <> if isLeft peers then "NODES" else "PEERS" | ||
470 | return peers | ||
471 | |||
472 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | ||
473 | |||
474 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
475 | search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o | ||
476 | search k action = do | ||
477 | awaitForever $ \ inputs -> unless (L.null inputs) $ do | ||
478 | $(logWarnS) "search" "start query" | ||
479 | responses <- lift $ queryParallel (action <$> inputs) | ||
480 | let (nodes, results) = partitionEithers responses | ||
481 | $(logWarnS) "search" "done query" | ||
482 | leftover $ L.concat nodes | ||
483 | mapM_ yield results | ||