summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-01-12 13:55:14 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-01-12 13:55:14 +0400
commita43a810ff24bc1ab946e5459c0f914aca286c62f (patch)
treeb716159332dd71661da49fe361dd7daf2052a778
parente3d3fb375ca01aa844e86b8a4c5ca507919518d3 (diff)
Unify all iterative queries
-rw-r--r--src/Network/BitTorrent/Core/NodeInfo.hs7
-rw-r--r--src/Network/BitTorrent/DHT.hs58
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs96
3 files changed, 99 insertions, 62 deletions
diff --git a/src/Network/BitTorrent/Core/NodeInfo.hs b/src/Network/BitTorrent/Core/NodeInfo.hs
index fe2357a4..2fd7e575 100644
--- a/src/Network/BitTorrent/Core/NodeInfo.hs
+++ b/src/Network/BitTorrent/Core/NodeInfo.hs
@@ -47,6 +47,7 @@ import Data.ByteString.Base16 as Base16
47import Data.BEncode as BE 47import Data.BEncode as BE
48import Data.Default 48import Data.Default
49import Data.Hashable 49import Data.Hashable
50import Data.Foldable
50import Data.IP 51import Data.IP
51import Data.List as L 52import Data.List as L
52import Data.Monoid 53import Data.Monoid
@@ -125,6 +126,12 @@ genNodeId = NodeId <$> getEntropy nodeIdSize
125newtype NodeDistance = NodeDistance BS.ByteString 126newtype NodeDistance = NodeDistance BS.ByteString
126 deriving (Eq, Ord) 127 deriving (Eq, Ord)
127 128
129instance Pretty NodeDistance where
130 pretty (NodeDistance bs) = foldMap bitseq $ BS.unpack bs
131 where
132 listBits w = L.map (testBit w) (L.reverse [0..bitSize w - 1])
133 bitseq = foldMap (int . fromEnum) . listBits
134
128-- | distance(A,B) = |A xor B| Smaller values are closer. 135-- | distance(A,B) = |A xor B| Smaller values are closer.
129distance :: NodeId -> NodeId -> NodeDistance 136distance :: NodeId -> NodeId -> NodeDistance
130distance (NodeId a) (NodeId b) = NodeDistance (BS.pack (BS.zipWith xor a b)) 137distance (NodeId a) (NodeId b) = NodeDistance (BS.pack (BS.zipWith xor a b))
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index 7f1fa295..7c892349 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -77,20 +77,6 @@ handlers :: Address ip => [NodeHandler ip]
77handlers = [pingH, findNodeH, getPeersH, announceH] 77handlers = [pingH, findNodeH, getPeersH, announceH]
78 78
79{----------------------------------------------------------------------- 79{-----------------------------------------------------------------------
80-- Query
81-----------------------------------------------------------------------}
82
83findNodeQ :: Address ip => NodeId -> Iteration ip NodeAddr NodeInfo
84findNodeQ nid addr = do
85 NodeFound closest <- FindNode nid <@> addr
86 return $ Right closest
87
88getPeersQ :: Address ip => InfoHash -> Iteration ip NodeInfo PeerAddr
89getPeersQ topic NodeInfo {..} = do
90 GotPeers {..} <- GetPeers topic <@> nodeAddr
91 return peers
92
93{-----------------------------------------------------------------------
94-- DHT operations 80-- DHT operations
95-----------------------------------------------------------------------} 81-----------------------------------------------------------------------}
96 82
@@ -107,48 +93,36 @@ dht = runDHT handlers
107-- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping 93-- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping
108-- process can take up to 5 minutes. 94-- process can take up to 5 minutes.
109-- 95--
110-- (TODO) This operation is asynchronous and do not block. 96-- This operation is synchronous and do block, use 'async' if needed.
111-- 97--
112bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () 98bootstrap :: Address ip => [NodeAddr ip] -> DHT ip ()
113bootstrap startNodes = do 99bootstrap startNodes = do
114 $(logInfoS) "bootstrap" "Start node bootstrapping" 100 $(logInfoS) "bootstrap" "Start node bootstrapping"
115 M.mapM_ insertClosest startNodes 101 nid <- getNodeId
116 $(logInfoS) "bootstrap" "Node bootstrapping finished" 102 aliveNodes <- queryParallel (ping <$> startNodes)
117 where 103 _ <- sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume
118 insertClosest addr = do 104 $(logInfoS) "bootstrap" "Node bootstrapping finished"
119 t <- getTable 105-- t <- getTable
120 unless (full t) $ do 106-- unless (full t) $ do
121 nid <- getNodeId 107-- nid <- getNodeId
122 result <- try $ FindNode nid <@> addr
123 case result of
124 Left e -> do
125 $(logWarnS) "bootstrap" $ T.pack $ show (e :: QueryFailure)
126
127 Right (NodeFound closest) -> do
128 $(logDebug) $ "Get a list of closest nodes: " <>
129 T.pack (PP.render (pretty closest))
130 forM_ closest $ \ info @ NodeInfo {..} -> do
131 let prettyAddr = T.pack (show (pretty nodeAddr))
132 $(logInfoS) "bootstrap" $ "table detalization" <> prettyAddr
133 insertClosest nodeAddr
134 108
135-- | Get list of peers which downloading this torrent. 109-- | Get list of peers which downloading this torrent.
136-- 110--
137-- (TODO) This operation is synchronous and do block. 111-- This operation is incremental and do block.
138-- 112--
139lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip] 113lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip]
140lookup topic = do -- TODO retry getClosestHash if bucket is empty 114lookup topic = do -- TODO retry getClosest if bucket is empty
141 closest <- lift $ getClosestHash topic 115 closest <- lift $ getClosest topic
142 sourceList [closest] $= search (getPeersQ topic) 116 sourceList [closest] $= search topic (getPeersQ topic)
143 117
144-- | Announce that /this/ peer may have some pieces of the specified 118-- | Announce that /this/ peer may have some pieces of the specified
145-- torrent. 119-- torrent.
146-- 120--
147-- (TODO) This operation is asynchronous and do not block. 121-- This operation is synchronous and do block, use 'async' if needed.
148-- 122--
149insert :: Address ip => InfoHash -> PortNumber -> DHT ip () 123insert :: Address ip => InfoHash -> PortNumber -> DHT ip ()
150insert ih port = do 124insert ih port = do
151 nodes <- getClosestHash ih 125 nodes <- getClosest ih
152 forM_ (nodeAddr <$> nodes) $ \ addr -> do 126 forM_ (nodeAddr <$> nodes) $ \ addr -> do
153-- GotPeers {..} <- GetPeers ih <@> addr 127-- GotPeers {..} <- GetPeers ih <@> addr
154-- Announced <- Announce False ih undefined grantedToken <@> addr 128-- Announced <- Announce False ih undefined grantedToken <@> addr
@@ -156,7 +130,7 @@ insert ih port = do
156 130
157-- | Stop announcing /this/ peer for the specified torrent. 131-- | Stop announcing /this/ peer for the specified torrent.
158-- 132--
159-- This operation is atomic and do not block. 133-- This operation is atomic and may block for a while.
160-- 134--
161delete :: Address ip => InfoHash -> DHT ip () 135delete :: Address ip => InfoHash -> DHT ip ()
162delete = error "DHT.delete: not implemented" \ No newline at end of file 136delete = error "DHT.delete: not implemented" \ No newline at end of file
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index f08d85c6..1a199595 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -35,14 +35,20 @@ module Network.BitTorrent.DHT.Session
35 -- * Messaging 35 -- * Messaging
36 -- ** Initiate 36 -- ** Initiate
37 , queryNode 37 , queryNode
38 , queryParallel
38 , (<@>) 39 , (<@>)
39 40
40 -- ** Accept 41 -- ** Accept
41 , NodeHandler 42 , NodeHandler
42 , nodeHandler 43 , nodeHandler
43 44
44 -- ** Iterate 45 -- ** Search
46 , ping
47
45 , Iteration 48 , Iteration
49 , findNodeQ
50 , getPeersQ
51
46 , Search 52 , Search
47 , search 53 , search
48 ) where 54 ) where
@@ -52,6 +58,7 @@ import Prelude hiding (ioError)
52import Control.Applicative 58import Control.Applicative
53import Control.Concurrent.STM 59import Control.Concurrent.STM
54import Control.Concurrent.Lifted hiding (yield) 60import Control.Concurrent.Lifted hiding (yield)
61import Control.Concurrent.Async.Lifted
55import Control.Exception.Lifted hiding (Handler) 62import Control.Exception.Lifted hiding (Handler)
56import Control.Monad.Base 63import Control.Monad.Base
57import Control.Monad.Logger 64import Control.Monad.Logger
@@ -60,9 +67,11 @@ import Control.Monad.Trans.Control
60import Control.Monad.Trans.Resource 67import Control.Monad.Trans.Resource
61import Data.Conduit 68import Data.Conduit
62import Data.Default 69import Data.Default
70import Data.Either
63import Data.Fixed 71import Data.Fixed
64import Data.Hashable 72import Data.Hashable
65import Data.List as L 73import Data.List as L
74import Data.Maybe
66import Data.Monoid 75import Data.Monoid
67import Data.Text as T 76import Data.Text as T
68import Data.Time 77import Data.Time
@@ -95,6 +104,18 @@ defaultAlpha = 3
95 104
96-- TODO add replication loop 105-- TODO add replication loop
97 106
107-- TODO do not insert infohash -> peeraddr if infohash is too far from
108-- this node id
109
110data Order
111 = NearFirst
112 | FarFirst
113 | Random
114
115data Traversal
116 = Greedy -- ^ aggressive short-circuit traversal
117 | Exhaustive -- ^
118
98-- | Original Kamelia DHT uses term /publish/ for data replication 119-- | Original Kamelia DHT uses term /publish/ for data replication
99-- process. BitTorrent DHT uses term /announce/ since the purpose of 120-- process. BitTorrent DHT uses term /announce/ since the purpose of
100-- the DHT is peer discovery. Later in documentation, we use terms 121-- the DHT is peer discovery. Later in documentation, we use terms
@@ -253,10 +274,10 @@ runDHT handlers opts naddr action = runResourceT $ do
253-----------------------------------------------------------------------} 274-----------------------------------------------------------------------}
254 275
255routing :: Address ip => Routing ip a -> DHT ip (Maybe a) 276routing :: Address ip => Routing ip a -> DHT ip (Maybe a)
256routing = runRouting ping refreshNodes getTimestamp 277routing = runRouting probeNode refreshNodes getTimestamp
257 278
258ping :: Address ip => NodeAddr ip -> DHT ip Bool 279probeNode :: Address ip => NodeAddr ip -> DHT ip Bool
259ping addr = do 280probeNode addr = do
260 $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) 281 $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr)))
261 result <- try $ Ping <@> addr 282 result <- try $ Ping <@> addr
262 let _ = result :: Either SomeException Ping 283 let _ = result :: Either SomeException Ping
@@ -384,19 +405,29 @@ getPeerList ih = do
384 405
385-- | Throws exception if node is not responding. 406-- | Throws exception if node is not responding.
386queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 407queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
387 => NodeAddr ip -> a -> DHT ip b 408 => NodeAddr ip -> a -> DHT ip (NodeId, b)
388queryNode addr q = do 409queryNode addr q = do
389 nid <- getNodeId 410 nid <- getNodeId
390 Response remoteId r <- query (toSockAddr addr) (Query nid q) 411 Response remoteId r <- query (toSockAddr addr) (Query nid q)
391 insertNode (NodeInfo remoteId addr) 412 insertNode (NodeInfo remoteId addr)
392 return r 413 return (remoteId, r)
393 414
394-- | Infix version of 'queryNode' function. 415-- | Infix version of 'queryNode' function.
395(<@>) :: Address ip => KRPC (Query a) (Response b) 416(<@>) :: Address ip => KRPC (Query a) (Response b)
396 => a -> NodeAddr ip -> DHT ip b 417 => a -> NodeAddr ip -> DHT ip b
397(<@>) = flip queryNode 418q <@> addr = snd <$> queryNode addr q
398{-# INLINE (<@>) #-} 419{-# INLINE (<@>) #-}
399 420
421-- TODO: use alpha
422-- | Failed queries are ignored.
423queryParallel :: [DHT ip a] -> DHT ip [a]
424queryParallel queries = do
425 alpha <- asks (optAlpha . options)
426 cleanup <$> mapConcurrently try queries
427 where
428 cleanup :: [Either QueryFailure a] -> [a]
429 cleanup = mapMaybe (either (const Nothing) Just)
430
400type NodeHandler ip = Handler (DHT ip) 431type NodeHandler ip = Handler (DHT ip)
401 432
402nodeHandler :: Address ip => KRPC (Query a) (Response b) 433nodeHandler :: Address ip => KRPC (Query a) (Response b)
@@ -412,16 +443,41 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
412-- Search 443-- Search
413-----------------------------------------------------------------------} 444-----------------------------------------------------------------------}
414 445
415type Iteration ip i o = i ip -> DHT ip (Either [i ip] [o ip]) 446ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip)
416type Search ip i o = Conduit [i ip] (DHT ip) [o ip] 447ping addr = do
417 448 (nid, Ping) <- queryNode addr Ping
418-- TODO: use all inputs 449 return (NodeInfo nid addr)
419search :: Address ip => Iteration ip i o -> Search ip i o 450
420search action = do 451type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
421 alpha <- lift $ asks (optAlpha . options) 452
422 awaitForever $ \ inputs -> do 453-- TODO match with expected node id
423 forM_ (L.take alpha inputs) $ \ input -> do 454findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo
424 result <- lift $ try $ action input 455findNodeQ nid NodeInfo {..} = do
425 case result of 456 NodeFound closest <- FindNode nid <@> nodeAddr
426 Left e -> let _ = e :: IOError in return () 457 return $ Right closest
427 Right r -> either leftover yield r 458
459isLeft :: Either a b -> Bool
460isLeft (Right _) = False
461isLeft (Left _) = True
462
463getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
464getPeersQ topic NodeInfo {..} = do
465 GotPeers {..} <- GetPeers topic <@> nodeAddr
466 let dist = distance (toNodeId topic) nodeId
467 $(logInfoS) "getPeersQ" $ T.pack
468 $ "distance: " <> render (pretty dist) <> " , result: "
469 <> if isLeft peers then "NODES" else "PEERS"
470 return peers
471
472type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
473
474-- TODO: use reorder and filter (Traversal option) leftovers
475search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
476search k action = do
477 awaitForever $ \ inputs -> unless (L.null inputs) $ do
478 $(logWarnS) "search" "start query"
479 responses <- lift $ queryParallel (action <$> inputs)
480 let (nodes, results) = partitionEithers responses
481 $(logWarnS) "search" "done query"
482 leftover $ L.concat nodes
483 mapM_ yield results