diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-13 11:13:42 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-13 11:13:42 +0400 |
commit | 84486e1bed75ae3ece0217fb330c6ca648931bb3 (patch) | |
tree | 69fe3bf2a0832ef6a2c3a7ac93ec829454e6d8d4 /src/Network/BitTorrent | |
parent | a43a810ff24bc1ab946e5459c0f914aca286c62f (diff) |
Refactor DHT module
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 61 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 86 |
2 files changed, 84 insertions, 63 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 7c892349..71803ccf 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -28,58 +28,17 @@ module Network.BitTorrent.DHT | |||
28 | ) where | 28 | ) where |
29 | 29 | ||
30 | import Control.Applicative | 30 | import Control.Applicative |
31 | import Control.Exception.Lifted | ||
32 | import Control.Monad as M | ||
33 | import Control.Monad.Logger | 31 | import Control.Monad.Logger |
34 | import Control.Monad.Trans | 32 | import Control.Monad.Trans |
35 | import Data.Conduit as C | 33 | import Data.Conduit as C |
36 | import Data.Conduit.List as C | 34 | import Data.Conduit.List as C |
37 | import Data.List as L | ||
38 | import Data.Monoid | ||
39 | import Data.Text as T | ||
40 | import Network.Socket (PortNumber) | 35 | import Network.Socket (PortNumber) |
41 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
42 | import Text.PrettyPrint.Class | ||
43 | 36 | ||
44 | import Data.Torrent.InfoHash | 37 | import Data.Torrent.InfoHash |
45 | import Network.KRPC (QueryFailure) | ||
46 | import Network.BitTorrent.Core | 38 | import Network.BitTorrent.Core |
47 | import Network.BitTorrent.DHT.Message | ||
48 | import Network.BitTorrent.DHT.Routing | ||
49 | import Network.BitTorrent.DHT.Session | 39 | import Network.BitTorrent.DHT.Session |
50 | 40 | ||
51 | 41 | ||
52 | {----------------------------------------------------------------------- | ||
53 | -- Handlers | ||
54 | -----------------------------------------------------------------------} | ||
55 | |||
56 | pingH :: Address ip => NodeHandler ip | ||
57 | pingH = nodeHandler $ \ _ Ping -> do | ||
58 | return Ping | ||
59 | |||
60 | findNodeH :: Address ip => NodeHandler ip | ||
61 | findNodeH = nodeHandler $ \ _ (FindNode nid) -> do | ||
62 | NodeFound <$> getClosest nid | ||
63 | |||
64 | getPeersH :: Address ip => NodeHandler ip | ||
65 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do | ||
66 | GotPeers <$> getPeerList ih <*> grantToken naddr | ||
67 | |||
68 | announceH :: Address ip => NodeHandler ip | ||
69 | announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do | ||
70 | checkToken naddr sessionToken | ||
71 | let annPort = if impliedPort then nodePort else port | ||
72 | let peerAddr = PeerAddr Nothing nodeHost annPort | ||
73 | insertPeer topic peerAddr | ||
74 | return Announced | ||
75 | |||
76 | handlers :: Address ip => [NodeHandler ip] | ||
77 | handlers = [pingH, findNodeH, getPeersH, announceH] | ||
78 | |||
79 | {----------------------------------------------------------------------- | ||
80 | -- DHT operations | ||
81 | -----------------------------------------------------------------------} | ||
82 | |||
83 | -- | Run DHT on specified port. <add note about resources> | 42 | -- | Run DHT on specified port. <add note about resources> |
84 | dht :: Address ip | 43 | dht :: Address ip |
85 | => Options -- ^ normally you need to use 'Data.Default.def'; | 44 | => Options -- ^ normally you need to use 'Data.Default.def'; |
@@ -93,7 +52,8 @@ dht = runDHT handlers | |||
93 | -- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping | 52 | -- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping |
94 | -- process can take up to 5 minutes. | 53 | -- process can take up to 5 minutes. |
95 | -- | 54 | -- |
96 | -- This operation is synchronous and do block, use 'async' if needed. | 55 | -- This operation is synchronous and do block, use |
56 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | ||
97 | -- | 57 | -- |
98 | bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () | 58 | bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () |
99 | bootstrap startNodes = do | 59 | bootstrap startNodes = do |
@@ -118,19 +78,18 @@ lookup topic = do -- TODO retry getClosest if bucket is empty | |||
118 | -- | Announce that /this/ peer may have some pieces of the specified | 78 | -- | Announce that /this/ peer may have some pieces of the specified |
119 | -- torrent. | 79 | -- torrent. |
120 | -- | 80 | -- |
121 | -- This operation is synchronous and do block, use 'async' if needed. | 81 | -- This operation is synchronous and do block, use |
82 | -- 'Control.Concurrent.Async.Lifted.async' if needed. | ||
122 | -- | 83 | -- |
123 | insert :: Address ip => InfoHash -> PortNumber -> DHT ip () | 84 | insert :: Address ip => InfoHash -> PortNumber -> DHT ip () |
124 | insert ih port = do | 85 | insert ih p = do |
125 | nodes <- getClosest ih | 86 | publish ih p |
126 | forM_ (nodeAddr <$> nodes) $ \ addr -> do | 87 | insertTopic ih p |
127 | -- GotPeers {..} <- GetPeers ih <@> addr | ||
128 | -- Announced <- Announce False ih undefined grantedToken <@> addr | ||
129 | return () | ||
130 | 88 | ||
131 | -- | Stop announcing /this/ peer for the specified torrent. | 89 | -- | Stop announcing /this/ peer for the specified torrent. |
132 | -- | 90 | -- |
133 | -- This operation is atomic and may block for a while. | 91 | -- This operation is atomic and may block for a while. |
134 | -- | 92 | -- |
135 | delete :: Address ip => InfoHash -> DHT ip () | 93 | delete :: Address ip => InfoHash -> PortNumber -> DHT ip () |
136 | delete = error "DHT.delete: not implemented" \ No newline at end of file | 94 | delete = deleteTopic |
95 | {-# INLINE delete #-} \ No newline at end of file | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 1a199595..e26cbad1 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -31,26 +31,36 @@ module Network.BitTorrent.DHT.Session | |||
31 | -- * Peer storage | 31 | -- * Peer storage |
32 | , insertPeer | 32 | , insertPeer |
33 | , getPeerList | 33 | , getPeerList |
34 | , insertTopic | ||
35 | , deleteTopic | ||
34 | 36 | ||
35 | -- * Messaging | 37 | -- * Messaging |
36 | -- ** Initiate | 38 | -- ** Initiate |
37 | , queryNode | 39 | , queryNode |
38 | , queryParallel | 40 | , queryParallel |
39 | , (<@>) | 41 | , (<@>) |
42 | , ping | ||
40 | 43 | ||
41 | -- ** Accept | 44 | -- ** Accept |
42 | , NodeHandler | 45 | , NodeHandler |
43 | , nodeHandler | 46 | , nodeHandler |
44 | 47 | , pingH | |
45 | -- ** Search | 48 | , findNodeH |
46 | , ping | 49 | , getPeersH |
47 | 50 | , announceH | |
51 | , handlers | ||
52 | |||
53 | -- * Search | ||
54 | -- ** Step | ||
48 | , Iteration | 55 | , Iteration |
49 | , findNodeQ | 56 | , findNodeQ |
50 | , getPeersQ | 57 | , getPeersQ |
58 | , announceQ | ||
51 | 59 | ||
60 | -- ** Traversal | ||
52 | , Search | 61 | , Search |
53 | , search | 62 | , search |
63 | , publish | ||
54 | ) where | 64 | ) where |
55 | 65 | ||
56 | import Prelude hiding (ioError) | 66 | import Prelude hiding (ioError) |
@@ -66,6 +76,7 @@ import Control.Monad.Reader | |||
66 | import Control.Monad.Trans.Control | 76 | import Control.Monad.Trans.Control |
67 | import Control.Monad.Trans.Resource | 77 | import Control.Monad.Trans.Resource |
68 | import Data.Conduit | 78 | import Data.Conduit |
79 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | ||
69 | import Data.Default | 80 | import Data.Default |
70 | import Data.Either | 81 | import Data.Either |
71 | import Data.Fixed | 82 | import Data.Fixed |
@@ -76,9 +87,10 @@ import Data.Monoid | |||
76 | import Data.Text as T | 87 | import Data.Text as T |
77 | import Data.Time | 88 | import Data.Time |
78 | import Data.Time.Clock.POSIX | 89 | import Data.Time.Clock.POSIX |
90 | import Network (PortNumber) | ||
79 | import System.Log.FastLogger | 91 | import System.Log.FastLogger |
80 | import System.Random (randomIO) | 92 | import System.Random (randomIO) |
81 | import Text.PrettyPrint as PP hiding ((<>)) | 93 | import Text.PrettyPrint as PP hiding ((<>), ($$)) |
82 | import Text.PrettyPrint.Class | 94 | import Text.PrettyPrint.Class |
83 | 95 | ||
84 | import Data.Torrent.InfoHash | 96 | import Data.Torrent.InfoHash |
@@ -399,6 +411,12 @@ getPeerList ih = do | |||
399 | then Left <$> getClosest ih | 411 | then Left <$> getClosest ih |
400 | else return (Right ps) | 412 | else return (Right ps) |
401 | 413 | ||
414 | insertTopic :: InfoHash -> PortNumber -> DHT ip () | ||
415 | insertTopic = undefined | ||
416 | |||
417 | deleteTopic :: InfoHash -> PortNumber -> DHT ip () | ||
418 | deleteTopic = undefined | ||
419 | |||
402 | {----------------------------------------------------------------------- | 420 | {----------------------------------------------------------------------- |
403 | -- Messaging | 421 | -- Messaging |
404 | -----------------------------------------------------------------------} | 422 | -----------------------------------------------------------------------} |
@@ -428,6 +446,11 @@ queryParallel queries = do | |||
428 | cleanup :: [Either QueryFailure a] -> [a] | 446 | cleanup :: [Either QueryFailure a] -> [a] |
429 | cleanup = mapMaybe (either (const Nothing) Just) | 447 | cleanup = mapMaybe (either (const Nothing) Just) |
430 | 448 | ||
449 | ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) | ||
450 | ping addr = do | ||
451 | (nid, Ping) <- queryNode addr Ping | ||
452 | return (NodeInfo nid addr) | ||
453 | |||
431 | type NodeHandler ip = Handler (DHT ip) | 454 | type NodeHandler ip = Handler (DHT ip) |
432 | 455 | ||
433 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | 456 | nodeHandler :: Address ip => KRPC (Query a) (Response b) |
@@ -443,11 +466,6 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | |||
443 | -- Search | 466 | -- Search |
444 | -----------------------------------------------------------------------} | 467 | -----------------------------------------------------------------------} |
445 | 468 | ||
446 | ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) | ||
447 | ping addr = do | ||
448 | (nid, Ping) <- queryNode addr Ping | ||
449 | return (NodeInfo nid addr) | ||
450 | |||
451 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) | 469 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) |
452 | 470 | ||
453 | -- TODO match with expected node id | 471 | -- TODO match with expected node id |
@@ -469,15 +487,59 @@ getPeersQ topic NodeInfo {..} = do | |||
469 | <> if isLeft peers then "NODES" else "PEERS" | 487 | <> if isLeft peers then "NODES" else "PEERS" |
470 | return peers | 488 | return peers |
471 | 489 | ||
490 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr | ||
491 | announceQ ih p NodeInfo {..} = do | ||
492 | GotPeers {..} <- GetPeers ih <@> nodeAddr | ||
493 | case peers of | ||
494 | Left ns | ||
495 | | False -> undefined -- TODO check if we can announce | ||
496 | | otherwise -> return (Left ns) | ||
497 | Right ps -> do -- TODO *probably* add to peer cache | ||
498 | Announced <- Announce False ih p grantedToken <@> nodeAddr | ||
499 | return (Right [nodeAddr]) | ||
500 | |||
472 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | 501 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] |
473 | 502 | ||
474 | -- TODO: use reorder and filter (Traversal option) leftovers | 503 | -- TODO: use reorder and filter (Traversal option) leftovers |
475 | search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o | 504 | search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o |
476 | search k action = do | 505 | search k action = do |
477 | awaitForever $ \ inputs -> unless (L.null inputs) $ do | 506 | awaitForever $ \ batch -> unless (L.null batch) $ do |
478 | $(logWarnS) "search" "start query" | 507 | $(logWarnS) "search" "start query" |
479 | responses <- lift $ queryParallel (action <$> inputs) | 508 | responses <- lift $ queryParallel (action <$> batch) |
480 | let (nodes, results) = partitionEithers responses | 509 | let (nodes, results) = partitionEithers responses |
481 | $(logWarnS) "search" "done query" | 510 | $(logWarnS) "search" "done query" |
482 | leftover $ L.concat nodes | 511 | leftover $ L.concat nodes |
483 | mapM_ yield results | 512 | mapM_ yield results |
513 | |||
514 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () | ||
515 | publish ih port = do | ||
516 | nodes <- getClosest ih | ||
517 | _ <- sourceList [nodes] $= search ih (announceQ ih port) $$ C.take 20 | ||
518 | return () | ||
519 | |||
520 | {----------------------------------------------------------------------- | ||
521 | -- Handlers | ||
522 | -----------------------------------------------------------------------} | ||
523 | |||
524 | pingH :: Address ip => NodeHandler ip | ||
525 | pingH = nodeHandler $ \ _ Ping -> do | ||
526 | return Ping | ||
527 | |||
528 | findNodeH :: Address ip => NodeHandler ip | ||
529 | findNodeH = nodeHandler $ \ _ (FindNode nid) -> do | ||
530 | NodeFound <$> getClosest nid | ||
531 | |||
532 | getPeersH :: Address ip => NodeHandler ip | ||
533 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do | ||
534 | GotPeers <$> getPeerList ih <*> grantToken naddr | ||
535 | |||
536 | announceH :: Address ip => NodeHandler ip | ||
537 | announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do | ||
538 | checkToken naddr sessionToken | ||
539 | let annPort = if impliedPort then nodePort else port | ||
540 | let peerAddr = PeerAddr Nothing nodeHost annPort | ||
541 | insertPeer topic peerAddr | ||
542 | return Announced | ||
543 | |||
544 | handlers :: Address ip => [NodeHandler ip] | ||
545 | handlers = [pingH, findNodeH, getPeersH, announceH] | ||