summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-01-13 11:13:42 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-01-13 11:13:42 +0400
commit84486e1bed75ae3ece0217fb330c6ca648931bb3 (patch)
tree69fe3bf2a0832ef6a2c3a7ac93ec829454e6d8d4 /src/Network
parenta43a810ff24bc1ab946e5459c0f914aca286c62f (diff)
Refactor DHT module
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/DHT.hs61
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs86
2 files changed, 84 insertions, 63 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index 7c892349..71803ccf 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -28,58 +28,17 @@ module Network.BitTorrent.DHT
28 ) where 28 ) where
29 29
30import Control.Applicative 30import Control.Applicative
31import Control.Exception.Lifted
32import Control.Monad as M
33import Control.Monad.Logger 31import Control.Monad.Logger
34import Control.Monad.Trans 32import Control.Monad.Trans
35import Data.Conduit as C 33import Data.Conduit as C
36import Data.Conduit.List as C 34import Data.Conduit.List as C
37import Data.List as L
38import Data.Monoid
39import Data.Text as T
40import Network.Socket (PortNumber) 35import Network.Socket (PortNumber)
41import Text.PrettyPrint as PP hiding ((<>), ($$))
42import Text.PrettyPrint.Class
43 36
44import Data.Torrent.InfoHash 37import Data.Torrent.InfoHash
45import Network.KRPC (QueryFailure)
46import Network.BitTorrent.Core 38import Network.BitTorrent.Core
47import Network.BitTorrent.DHT.Message
48import Network.BitTorrent.DHT.Routing
49import Network.BitTorrent.DHT.Session 39import Network.BitTorrent.DHT.Session
50 40
51 41
52{-----------------------------------------------------------------------
53-- Handlers
54-----------------------------------------------------------------------}
55
56pingH :: Address ip => NodeHandler ip
57pingH = nodeHandler $ \ _ Ping -> do
58 return Ping
59
60findNodeH :: Address ip => NodeHandler ip
61findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
62 NodeFound <$> getClosest nid
63
64getPeersH :: Address ip => NodeHandler ip
65getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
66 GotPeers <$> getPeerList ih <*> grantToken naddr
67
68announceH :: Address ip => NodeHandler ip
69announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do
70 checkToken naddr sessionToken
71 let annPort = if impliedPort then nodePort else port
72 let peerAddr = PeerAddr Nothing nodeHost annPort
73 insertPeer topic peerAddr
74 return Announced
75
76handlers :: Address ip => [NodeHandler ip]
77handlers = [pingH, findNodeH, getPeersH, announceH]
78
79{-----------------------------------------------------------------------
80-- DHT operations
81-----------------------------------------------------------------------}
82
83-- | Run DHT on specified port. <add note about resources> 42-- | Run DHT on specified port. <add note about resources>
84dht :: Address ip 43dht :: Address ip
85 => Options -- ^ normally you need to use 'Data.Default.def'; 44 => Options -- ^ normally you need to use 'Data.Default.def';
@@ -93,7 +52,8 @@ dht = runDHT handlers
93-- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping 52-- usually obtained from 'Data.Torrent.tNodes' field. Bootstrapping
94-- process can take up to 5 minutes. 53-- process can take up to 5 minutes.
95-- 54--
96-- This operation is synchronous and do block, use 'async' if needed. 55-- This operation is synchronous and do block, use
56-- 'Control.Concurrent.Async.Lifted.async' if needed.
97-- 57--
98bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () 58bootstrap :: Address ip => [NodeAddr ip] -> DHT ip ()
99bootstrap startNodes = do 59bootstrap startNodes = do
@@ -118,19 +78,18 @@ lookup topic = do -- TODO retry getClosest if bucket is empty
118-- | Announce that /this/ peer may have some pieces of the specified 78-- | Announce that /this/ peer may have some pieces of the specified
119-- torrent. 79-- torrent.
120-- 80--
121-- This operation is synchronous and do block, use 'async' if needed. 81-- This operation is synchronous and do block, use
82-- 'Control.Concurrent.Async.Lifted.async' if needed.
122-- 83--
123insert :: Address ip => InfoHash -> PortNumber -> DHT ip () 84insert :: Address ip => InfoHash -> PortNumber -> DHT ip ()
124insert ih port = do 85insert ih p = do
125 nodes <- getClosest ih 86 publish ih p
126 forM_ (nodeAddr <$> nodes) $ \ addr -> do 87 insertTopic ih p
127-- GotPeers {..} <- GetPeers ih <@> addr
128-- Announced <- Announce False ih undefined grantedToken <@> addr
129 return ()
130 88
131-- | Stop announcing /this/ peer for the specified torrent. 89-- | Stop announcing /this/ peer for the specified torrent.
132-- 90--
133-- This operation is atomic and may block for a while. 91-- This operation is atomic and may block for a while.
134-- 92--
135delete :: Address ip => InfoHash -> DHT ip () 93delete :: Address ip => InfoHash -> PortNumber -> DHT ip ()
136delete = error "DHT.delete: not implemented" \ No newline at end of file 94delete = deleteTopic
95{-# INLINE delete #-} \ No newline at end of file
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 1a199595..e26cbad1 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -31,26 +31,36 @@ module Network.BitTorrent.DHT.Session
31 -- * Peer storage 31 -- * Peer storage
32 , insertPeer 32 , insertPeer
33 , getPeerList 33 , getPeerList
34 , insertTopic
35 , deleteTopic
34 36
35 -- * Messaging 37 -- * Messaging
36 -- ** Initiate 38 -- ** Initiate
37 , queryNode 39 , queryNode
38 , queryParallel 40 , queryParallel
39 , (<@>) 41 , (<@>)
42 , ping
40 43
41 -- ** Accept 44 -- ** Accept
42 , NodeHandler 45 , NodeHandler
43 , nodeHandler 46 , nodeHandler
44 47 , pingH
45 -- ** Search 48 , findNodeH
46 , ping 49 , getPeersH
47 50 , announceH
51 , handlers
52
53 -- * Search
54 -- ** Step
48 , Iteration 55 , Iteration
49 , findNodeQ 56 , findNodeQ
50 , getPeersQ 57 , getPeersQ
58 , announceQ
51 59
60 -- ** Traversal
52 , Search 61 , Search
53 , search 62 , search
63 , publish
54 ) where 64 ) where
55 65
56import Prelude hiding (ioError) 66import Prelude hiding (ioError)
@@ -66,6 +76,7 @@ import Control.Monad.Reader
66import Control.Monad.Trans.Control 76import Control.Monad.Trans.Control
67import Control.Monad.Trans.Resource 77import Control.Monad.Trans.Resource
68import Data.Conduit 78import Data.Conduit
79import Data.Conduit.List as C hiding (mapMaybe, mapM_)
69import Data.Default 80import Data.Default
70import Data.Either 81import Data.Either
71import Data.Fixed 82import Data.Fixed
@@ -76,9 +87,10 @@ import Data.Monoid
76import Data.Text as T 87import Data.Text as T
77import Data.Time 88import Data.Time
78import Data.Time.Clock.POSIX 89import Data.Time.Clock.POSIX
90import Network (PortNumber)
79import System.Log.FastLogger 91import System.Log.FastLogger
80import System.Random (randomIO) 92import System.Random (randomIO)
81import Text.PrettyPrint as PP hiding ((<>)) 93import Text.PrettyPrint as PP hiding ((<>), ($$))
82import Text.PrettyPrint.Class 94import Text.PrettyPrint.Class
83 95
84import Data.Torrent.InfoHash 96import Data.Torrent.InfoHash
@@ -399,6 +411,12 @@ getPeerList ih = do
399 then Left <$> getClosest ih 411 then Left <$> getClosest ih
400 else return (Right ps) 412 else return (Right ps)
401 413
414insertTopic :: InfoHash -> PortNumber -> DHT ip ()
415insertTopic = undefined
416
417deleteTopic :: InfoHash -> PortNumber -> DHT ip ()
418deleteTopic = undefined
419
402{----------------------------------------------------------------------- 420{-----------------------------------------------------------------------
403-- Messaging 421-- Messaging
404-----------------------------------------------------------------------} 422-----------------------------------------------------------------------}
@@ -428,6 +446,11 @@ queryParallel queries = do
428 cleanup :: [Either QueryFailure a] -> [a] 446 cleanup :: [Either QueryFailure a] -> [a]
429 cleanup = mapMaybe (either (const Nothing) Just) 447 cleanup = mapMaybe (either (const Nothing) Just)
430 448
449ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip)
450ping addr = do
451 (nid, Ping) <- queryNode addr Ping
452 return (NodeInfo nid addr)
453
431type NodeHandler ip = Handler (DHT ip) 454type NodeHandler ip = Handler (DHT ip)
432 455
433nodeHandler :: Address ip => KRPC (Query a) (Response b) 456nodeHandler :: Address ip => KRPC (Query a) (Response b)
@@ -443,11 +466,6 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
443-- Search 466-- Search
444-----------------------------------------------------------------------} 467-----------------------------------------------------------------------}
445 468
446ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip)
447ping addr = do
448 (nid, Ping) <- queryNode addr Ping
449 return (NodeInfo nid addr)
450
451type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) 469type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
452 470
453-- TODO match with expected node id 471-- TODO match with expected node id
@@ -469,15 +487,59 @@ getPeersQ topic NodeInfo {..} = do
469 <> if isLeft peers then "NODES" else "PEERS" 487 <> if isLeft peers then "NODES" else "PEERS"
470 return peers 488 return peers
471 489
490announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
491announceQ ih p NodeInfo {..} = do
492 GotPeers {..} <- GetPeers ih <@> nodeAddr
493 case peers of
494 Left ns
495 | False -> undefined -- TODO check if we can announce
496 | otherwise -> return (Left ns)
497 Right ps -> do -- TODO *probably* add to peer cache
498 Announced <- Announce False ih p grantedToken <@> nodeAddr
499 return (Right [nodeAddr])
500
472type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] 501type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
473 502
474-- TODO: use reorder and filter (Traversal option) leftovers 503-- TODO: use reorder and filter (Traversal option) leftovers
475search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o 504search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
476search k action = do 505search k action = do
477 awaitForever $ \ inputs -> unless (L.null inputs) $ do 506 awaitForever $ \ batch -> unless (L.null batch) $ do
478 $(logWarnS) "search" "start query" 507 $(logWarnS) "search" "start query"
479 responses <- lift $ queryParallel (action <$> inputs) 508 responses <- lift $ queryParallel (action <$> batch)
480 let (nodes, results) = partitionEithers responses 509 let (nodes, results) = partitionEithers responses
481 $(logWarnS) "search" "done query" 510 $(logWarnS) "search" "done query"
482 leftover $ L.concat nodes 511 leftover $ L.concat nodes
483 mapM_ yield results 512 mapM_ yield results
513
514publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
515publish ih port = do
516 nodes <- getClosest ih
517 _ <- sourceList [nodes] $= search ih (announceQ ih port) $$ C.take 20
518 return ()
519
520{-----------------------------------------------------------------------
521-- Handlers
522-----------------------------------------------------------------------}
523
524pingH :: Address ip => NodeHandler ip
525pingH = nodeHandler $ \ _ Ping -> do
526 return Ping
527
528findNodeH :: Address ip => NodeHandler ip
529findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
530 NodeFound <$> getClosest nid
531
532getPeersH :: Address ip => NodeHandler ip
533getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
534 GotPeers <$> getPeerList ih <*> grantToken naddr
535
536announceH :: Address ip => NodeHandler ip
537announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do
538 checkToken naddr sessionToken
539 let annPort = if impliedPort then nodePort else port
540 let peerAddr = PeerAddr Nothing nodeHost annPort
541 insertPeer topic peerAddr
542 return Announced
543
544handlers :: Address ip => [NodeHandler ip]
545handlers = [pingH, findNodeH, getPeersH, announceH]