diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/Core/NodeInfo.hs | 7 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 58 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 96 |
3 files changed, 99 insertions, 62 deletions
diff --git a/src/Network/BitTorrent/Core/NodeInfo.hs b/src/Network/BitTorrent/Core/NodeInfo.hs index fe2357a4..2fd7e575 100644 --- a/src/Network/BitTorrent/Core/NodeInfo.hs +++ b/src/Network/BitTorrent/Core/NodeInfo.hs | |||
@@ -47,6 +47,7 @@ import Data.ByteString.Base16 as Base16 | |||
47 | import Data.BEncode as BE | 47 | import Data.BEncode as BE |
48 | import Data.Default | 48 | import Data.Default |
49 | import Data.Hashable | 49 | import Data.Hashable |
50 | import Data.Foldable | ||
50 | import Data.IP | 51 | import Data.IP |
51 | import Data.List as L | 52 | import Data.List as L |
52 | import Data.Monoid | 53 | import Data.Monoid |
@@ -125,6 +126,12 @@ genNodeId = NodeId <$> getEntropy nodeIdSize | |||
125 | newtype NodeDistance = NodeDistance BS.ByteString | 126 | newtype NodeDistance = NodeDistance BS.ByteString |
126 | deriving (Eq, Ord) | 127 | deriving (Eq, Ord) |
127 | 128 | ||
129 | instance Pretty NodeDistance where | ||
130 | pretty (NodeDistance bs) = foldMap bitseq $ BS.unpack bs | ||
131 | where | ||
132 | listBits w = L.map (testBit w) (L.reverse [0..bitSize w - 1]) | ||
133 | bitseq = foldMap (int . fromEnum) . listBits | ||
134 | |||
128 | -- | distance(A,B) = |A xor B| Smaller values are closer. | 135 | -- | distance(A,B) = |A xor B| Smaller values are closer. |
129 | distance :: NodeId -> NodeId -> NodeDistance | 136 | distance :: NodeId -> NodeId -> NodeDistance |
130 | distance (NodeId a) (NodeId b) = NodeDistance (BS.pack (BS.zipWith xor a b)) | 137 | distance (NodeId a) (NodeId b) = NodeDistance (BS.pack (BS.zipWith xor a b)) |
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 7f1fa295..7c892349 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -77,20 +77,6 @@ handlers :: Address ip => [NodeHandler ip] | |||
77 | handlers = [pingH, findNodeH, getPeersH, announceH] | 77 | handlers = [pingH, findNodeH, getPeersH, announceH] |
78 | 78 | ||
79 | {----------------------------------------------------------------------- | 79 | {----------------------------------------------------------------------- |
80 | -- Query | ||
81 | -----------------------------------------------------------------------} | ||
82 | |||
83 | findNodeQ :: Address ip => NodeId -> Iteration ip NodeAddr NodeInfo | ||
84 | findNodeQ nid addr = do | ||
85 | NodeFound closest <- FindNode nid <@> addr | ||
86 | return $ Right closest | ||
87 | |||
88 | getPeersQ :: Address ip => InfoHash -> Iteration ip NodeInfo PeerAddr | ||
89 | getPeersQ topic NodeInfo {..} = do | ||
90 | GotPeers {..} <- GetPeers topic <@> nodeAddr | ||
91 | return peers | ||
92 | |||
93 | {----------------------------------------------------------------------- | ||
94 | -- DHT operations | 80 | -- DHT operations |
95 | -----------------------------------------------------------------------} | 81 | -----------------------------------------------------------------------} |
96 | 82 | ||
@@ -107,48 +93,36 @@ dht = runDHT handlers | |||
107 | -- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping | 93 | -- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping |
108 | -- process can take up to 5 minutes. | 94 | -- process can take up to 5 minutes. |
109 | -- | 95 | -- |
110 | -- (TODO) This operation is asynchronous and do not block. | 96 | -- This operation is synchronous and do block, use 'async' if needed. |
111 | -- | 97 | -- |
112 | bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () | 98 | bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () |
113 | bootstrap startNodes = do | 99 | bootstrap startNodes = do |
114 | $(logInfoS) "bootstrap" "Start node bootstrapping" | 100 | $(logInfoS) "bootstrap" "Start node bootstrapping" |
115 | M.mapM_ insertClosest startNodes | 101 | nid <- getNodeId |
116 | $(logInfoS) "bootstrap" "Node bootstrapping finished" | 102 | aliveNodes <- queryParallel (ping <$> startNodes) |
117 | where | 103 | _ <- sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume |
118 | insertClosest addr = do | 104 | $(logInfoS) "bootstrap" "Node bootstrapping finished" |
119 | t <- getTable | 105 | -- t <- getTable |
120 | unless (full t) $ do | 106 | -- unless (full t) $ do |
121 | nid <- getNodeId | 107 | -- nid <- getNodeId |
122 | result <- try $ FindNode nid <@> addr | ||
123 | case result of | ||
124 | Left e -> do | ||
125 | $(logWarnS) "bootstrap" $ T.pack $ show (e :: QueryFailure) | ||
126 | |||
127 | Right (NodeFound closest) -> do | ||
128 | $(logDebug) $ "Get a list of closest nodes: " <> | ||
129 | T.pack (PP.render (pretty closest)) | ||
130 | forM_ closest $ \ info @ NodeInfo {..} -> do | ||
131 | let prettyAddr = T.pack (show (pretty nodeAddr)) | ||
132 | $(logInfoS) "bootstrap" $ "table detalization" <> prettyAddr | ||
133 | insertClosest nodeAddr | ||
134 | 108 | ||
135 | -- | Get list of peers which downloading this torrent. | 109 | -- | Get list of peers which downloading this torrent. |
136 | -- | 110 | -- |
137 | -- (TODO) This operation is synchronous and do block. | 111 | -- This operation is incremental and do block. |
138 | -- | 112 | -- |
139 | lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip] | 113 | lookup :: Address ip => InfoHash -> DHT ip `Source` [PeerAddr ip] |
140 | lookup topic = do -- TODO retry getClosestHash if bucket is empty | 114 | lookup topic = do -- TODO retry getClosest if bucket is empty |
141 | closest <- lift $ getClosestHash topic | 115 | closest <- lift $ getClosest topic |
142 | sourceList [closest] $= search (getPeersQ topic) | 116 | sourceList [closest] $= search topic (getPeersQ topic) |
143 | 117 | ||
144 | -- | Announce that /this/ peer may have some pieces of the specified | 118 | -- | Announce that /this/ peer may have some pieces of the specified |
145 | -- torrent. | 119 | -- torrent. |
146 | -- | 120 | -- |
147 | -- (TODO) This operation is asynchronous and do not block. | 121 | -- This operation is synchronous and do block, use 'async' if needed. |
148 | -- | 122 | -- |
149 | insert :: Address ip => InfoHash -> PortNumber -> DHT ip () | 123 | insert :: Address ip => InfoHash -> PortNumber -> DHT ip () |
150 | insert ih port = do | 124 | insert ih port = do |
151 | nodes <- getClosestHash ih | 125 | nodes <- getClosest ih |
152 | forM_ (nodeAddr <$> nodes) $ \ addr -> do | 126 | forM_ (nodeAddr <$> nodes) $ \ addr -> do |
153 | -- GotPeers {..} <- GetPeers ih <@> addr | 127 | -- GotPeers {..} <- GetPeers ih <@> addr |
154 | -- Announced <- Announce False ih undefined grantedToken <@> addr | 128 | -- Announced <- Announce False ih undefined grantedToken <@> addr |
@@ -156,7 +130,7 @@ insert ih port = do | |||
156 | 130 | ||
157 | -- | Stop announcing /this/ peer for the specified torrent. | 131 | -- | Stop announcing /this/ peer for the specified torrent. |
158 | -- | 132 | -- |
159 | -- This operation is atomic and do not block. | 133 | -- This operation is atomic and may block for a while. |
160 | -- | 134 | -- |
161 | delete :: Address ip => InfoHash -> DHT ip () | 135 | delete :: Address ip => InfoHash -> DHT ip () |
162 | delete = error "DHT.delete: not implemented" \ No newline at end of file | 136 | delete = error "DHT.delete: not implemented" \ No newline at end of file |
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index f08d85c6..1a199595 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -35,14 +35,20 @@ module Network.BitTorrent.DHT.Session | |||
35 | -- * Messaging | 35 | -- * Messaging |
36 | -- ** Initiate | 36 | -- ** Initiate |
37 | , queryNode | 37 | , queryNode |
38 | , queryParallel | ||
38 | , (<@>) | 39 | , (<@>) |
39 | 40 | ||
40 | -- ** Accept | 41 | -- ** Accept |
41 | , NodeHandler | 42 | , NodeHandler |
42 | , nodeHandler | 43 | , nodeHandler |
43 | 44 | ||
44 | -- ** Iterate | 45 | -- ** Search |
46 | , ping | ||
47 | |||
45 | , Iteration | 48 | , Iteration |
49 | , findNodeQ | ||
50 | , getPeersQ | ||
51 | |||
46 | , Search | 52 | , Search |
47 | , search | 53 | , search |
48 | ) where | 54 | ) where |
@@ -52,6 +58,7 @@ import Prelude hiding (ioError) | |||
52 | import Control.Applicative | 58 | import Control.Applicative |
53 | import Control.Concurrent.STM | 59 | import Control.Concurrent.STM |
54 | import Control.Concurrent.Lifted hiding (yield) | 60 | import Control.Concurrent.Lifted hiding (yield) |
61 | import Control.Concurrent.Async.Lifted | ||
55 | import Control.Exception.Lifted hiding (Handler) | 62 | import Control.Exception.Lifted hiding (Handler) |
56 | import Control.Monad.Base | 63 | import Control.Monad.Base |
57 | import Control.Monad.Logger | 64 | import Control.Monad.Logger |
@@ -60,9 +67,11 @@ import Control.Monad.Trans.Control | |||
60 | import Control.Monad.Trans.Resource | 67 | import Control.Monad.Trans.Resource |
61 | import Data.Conduit | 68 | import Data.Conduit |
62 | import Data.Default | 69 | import Data.Default |
70 | import Data.Either | ||
63 | import Data.Fixed | 71 | import Data.Fixed |
64 | import Data.Hashable | 72 | import Data.Hashable |
65 | import Data.List as L | 73 | import Data.List as L |
74 | import Data.Maybe | ||
66 | import Data.Monoid | 75 | import Data.Monoid |
67 | import Data.Text as T | 76 | import Data.Text as T |
68 | import Data.Time | 77 | import Data.Time |
@@ -95,6 +104,18 @@ defaultAlpha = 3 | |||
95 | 104 | ||
96 | -- TODO add replication loop | 105 | -- TODO add replication loop |
97 | 106 | ||
107 | -- TODO do not insert infohash -> peeraddr if infohash is too far from | ||
108 | -- this node id | ||
109 | |||
110 | data Order | ||
111 | = NearFirst | ||
112 | | FarFirst | ||
113 | | Random | ||
114 | |||
115 | data Traversal | ||
116 | = Greedy -- ^ aggressive short-circuit traversal | ||
117 | | Exhaustive -- ^ | ||
118 | |||
98 | -- | Original Kamelia DHT uses term /publish/ for data replication | 119 | -- | Original Kamelia DHT uses term /publish/ for data replication |
99 | -- process. BitTorrent DHT uses term /announce/ since the purpose of | 120 | -- process. BitTorrent DHT uses term /announce/ since the purpose of |
100 | -- the DHT is peer discovery. Later in documentation, we use terms | 121 | -- the DHT is peer discovery. Later in documentation, we use terms |
@@ -253,10 +274,10 @@ runDHT handlers opts naddr action = runResourceT $ do | |||
253 | -----------------------------------------------------------------------} | 274 | -----------------------------------------------------------------------} |
254 | 275 | ||
255 | routing :: Address ip => Routing ip a -> DHT ip (Maybe a) | 276 | routing :: Address ip => Routing ip a -> DHT ip (Maybe a) |
256 | routing = runRouting ping refreshNodes getTimestamp | 277 | routing = runRouting probeNode refreshNodes getTimestamp |
257 | 278 | ||
258 | ping :: Address ip => NodeAddr ip -> DHT ip Bool | 279 | probeNode :: Address ip => NodeAddr ip -> DHT ip Bool |
259 | ping addr = do | 280 | probeNode addr = do |
260 | $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) | 281 | $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) |
261 | result <- try $ Ping <@> addr | 282 | result <- try $ Ping <@> addr |
262 | let _ = result :: Either SomeException Ping | 283 | let _ = result :: Either SomeException Ping |
@@ -384,19 +405,29 @@ getPeerList ih = do | |||
384 | 405 | ||
385 | -- | Throws exception if node is not responding. | 406 | -- | Throws exception if node is not responding. |
386 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 407 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
387 | => NodeAddr ip -> a -> DHT ip b | 408 | => NodeAddr ip -> a -> DHT ip (NodeId, b) |
388 | queryNode addr q = do | 409 | queryNode addr q = do |
389 | nid <- getNodeId | 410 | nid <- getNodeId |
390 | Response remoteId r <- query (toSockAddr addr) (Query nid q) | 411 | Response remoteId r <- query (toSockAddr addr) (Query nid q) |
391 | insertNode (NodeInfo remoteId addr) | 412 | insertNode (NodeInfo remoteId addr) |
392 | return r | 413 | return (remoteId, r) |
393 | 414 | ||
394 | -- | Infix version of 'queryNode' function. | 415 | -- | Infix version of 'queryNode' function. |
395 | (<@>) :: Address ip => KRPC (Query a) (Response b) | 416 | (<@>) :: Address ip => KRPC (Query a) (Response b) |
396 | => a -> NodeAddr ip -> DHT ip b | 417 | => a -> NodeAddr ip -> DHT ip b |
397 | (<@>) = flip queryNode | 418 | q <@> addr = snd <$> queryNode addr q |
398 | {-# INLINE (<@>) #-} | 419 | {-# INLINE (<@>) #-} |
399 | 420 | ||
421 | -- TODO: use alpha | ||
422 | -- | Failed queries are ignored. | ||
423 | queryParallel :: [DHT ip a] -> DHT ip [a] | ||
424 | queryParallel queries = do | ||
425 | alpha <- asks (optAlpha . options) | ||
426 | cleanup <$> mapConcurrently try queries | ||
427 | where | ||
428 | cleanup :: [Either QueryFailure a] -> [a] | ||
429 | cleanup = mapMaybe (either (const Nothing) Just) | ||
430 | |||
400 | type NodeHandler ip = Handler (DHT ip) | 431 | type NodeHandler ip = Handler (DHT ip) |
401 | 432 | ||
402 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | 433 | nodeHandler :: Address ip => KRPC (Query a) (Response b) |
@@ -412,16 +443,41 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | |||
412 | -- Search | 443 | -- Search |
413 | -----------------------------------------------------------------------} | 444 | -----------------------------------------------------------------------} |
414 | 445 | ||
415 | type Iteration ip i o = i ip -> DHT ip (Either [i ip] [o ip]) | 446 | ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) |
416 | type Search ip i o = Conduit [i ip] (DHT ip) [o ip] | 447 | ping addr = do |
417 | 448 | (nid, Ping) <- queryNode addr Ping | |
418 | -- TODO: use all inputs | 449 | return (NodeInfo nid addr) |
419 | search :: Address ip => Iteration ip i o -> Search ip i o | 450 | |
420 | search action = do | 451 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) |
421 | alpha <- lift $ asks (optAlpha . options) | 452 | |
422 | awaitForever $ \ inputs -> do | 453 | -- TODO match with expected node id |
423 | forM_ (L.take alpha inputs) $ \ input -> do | 454 | findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo |
424 | result <- lift $ try $ action input | 455 | findNodeQ nid NodeInfo {..} = do |
425 | case result of | 456 | NodeFound closest <- FindNode nid <@> nodeAddr |
426 | Left e -> let _ = e :: IOError in return () | 457 | return $ Right closest |
427 | Right r -> either leftover yield r | 458 | |
459 | isLeft :: Either a b -> Bool | ||
460 | isLeft (Right _) = False | ||
461 | isLeft (Left _) = True | ||
462 | |||
463 | getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr | ||
464 | getPeersQ topic NodeInfo {..} = do | ||
465 | GotPeers {..} <- GetPeers topic <@> nodeAddr | ||
466 | let dist = distance (toNodeId topic) nodeId | ||
467 | $(logInfoS) "getPeersQ" $ T.pack | ||
468 | $ "distance: " <> render (pretty dist) <> " , result: " | ||
469 | <> if isLeft peers then "NODES" else "PEERS" | ||
470 | return peers | ||
471 | |||
472 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | ||
473 | |||
474 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
475 | search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o | ||
476 | search k action = do | ||
477 | awaitForever $ \ inputs -> unless (L.null inputs) $ do | ||
478 | $(logWarnS) "search" "start query" | ||
479 | responses <- lift $ queryParallel (action <$> inputs) | ||
480 | let (nodes, results) = partitionEithers responses | ||
481 | $(logWarnS) "search" "done query" | ||
482 | leftover $ L.concat nodes | ||
483 | mapM_ yield results | ||