summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bittorrent.cabal1
-rw-r--r--examples/MkTorrent.hs2
-rw-r--r--src/Network/BitTorrent/DHT.hs1
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs141
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs126
5 files changed, 147 insertions, 124 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index 8cf045e7..c9e8c83c 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -64,6 +64,7 @@ library
64 Network.BitTorrent.Core.PeerAddr 64 Network.BitTorrent.Core.PeerAddr
65 Network.BitTorrent.DHT 65 Network.BitTorrent.DHT
66 Network.BitTorrent.DHT.Message 66 Network.BitTorrent.DHT.Message
67 Network.BitTorrent.DHT.Query
67 Network.BitTorrent.DHT.Routing 68 Network.BitTorrent.DHT.Routing
68 Network.BitTorrent.DHT.Session 69 Network.BitTorrent.DHT.Session
69 Network.BitTorrent.DHT.Token 70 Network.BitTorrent.DHT.Token
diff --git a/examples/MkTorrent.hs b/examples/MkTorrent.hs
index 4b6439e3..7fce5e16 100644
--- a/examples/MkTorrent.hs
+++ b/examples/MkTorrent.hs
@@ -38,7 +38,7 @@ import Data.Torrent.Piece
38import Data.Torrent.Layout 38import Data.Torrent.Layout
39import Data.Torrent.Magnet hiding (Magnet) 39import Data.Torrent.Magnet hiding (Magnet)
40import Network.BitTorrent.Core 40import Network.BitTorrent.Core
41import Network.BitTorrent.DHT.Session hiding (Options) 41import Network.BitTorrent.DHT.Session hiding (Options, options)
42import Network.BitTorrent.DHT as DHT hiding (Options) 42import Network.BitTorrent.DHT as DHT hiding (Options)
43import Network.BitTorrent.Exchange.Message 43import Network.BitTorrent.Exchange.Message
44import Network.BitTorrent.Exchange.Wire hiding (Options) 44import Network.BitTorrent.Exchange.Wire hiding (Options)
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index 6b57e451..b5dc868d 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -62,6 +62,7 @@ import Network.Socket
62import Data.Torrent (tNodes) 62import Data.Torrent (tNodes)
63import Data.Torrent.InfoHash 63import Data.Torrent.InfoHash
64import Network.BitTorrent.Core 64import Network.BitTorrent.Core
65import Network.BitTorrent.DHT.Query
65import Network.BitTorrent.DHT.Session 66import Network.BitTorrent.DHT.Session
66import Network.BitTorrent.DHT.Routing as T 67import Network.BitTorrent.DHT.Routing as T
67 68
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
new file mode 100644
index 00000000..abe1ef5f
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -0,0 +1,141 @@
1{-# LANGUAGE FlexibleContexts #-}
2{-# LANGUAGE TemplateHaskell #-}
3module Network.BitTorrent.DHT.Query
4 ( -- * Handler
5 pingH
6 , findNodeH
7 , getPeersH
8 , announceH
9 , handlers
10
11 -- * Search
12 -- ** Step
13 , Iteration
14 , findNodeQ
15 , getPeersQ
16 , announceQ
17
18 -- ** Traversal
19 , Search
20 , search
21 , publish
22 ) where
23
24import Control.Applicative
25import Control.Concurrent.Lifted hiding (yield)
26import Control.Exception.Lifted hiding (Handler)
27import Control.Monad.Reader
28import Control.Monad.Logger
29import Data.Conduit
30import Data.Conduit.List as C hiding (mapMaybe, mapM_)
31import Data.Either
32import Data.List as L
33import Data.Monoid
34import Data.Text as T
35import Network
36import Text.PrettyPrint as PP hiding ((<>), ($$))
37import Text.PrettyPrint.Class
38
39import Network.KRPC hiding (Options, def)
40import Data.Torrent.InfoHash
41import Network.BitTorrent.Core
42import Network.BitTorrent.DHT.Message
43import Network.BitTorrent.DHT.Routing
44import Network.BitTorrent.DHT.Session
45
46{-----------------------------------------------------------------------
47-- Handlers
48-----------------------------------------------------------------------}
49
50nodeHandler :: Address ip => KRPC (Query a) (Response b)
51 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
52nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
53 case fromSockAddr sockAddr of
54 Nothing -> throwIO BadAddress
55 Just naddr -> do
56 insertNode (NodeInfo remoteId naddr)
57 Response <$> getNodeId <*> action naddr q
58
59pingH :: Address ip => NodeHandler ip
60pingH = nodeHandler $ \ _ Ping -> do
61 return Ping
62
63findNodeH :: Address ip => NodeHandler ip
64findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
65 NodeFound <$> getClosest nid
66
67getPeersH :: Address ip => NodeHandler ip
68getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
69 GotPeers <$> getPeerList ih <*> grantToken naddr
70
71announceH :: Address ip => NodeHandler ip
72announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do
73 checkToken naddr sessionToken
74 let annPort = if impliedPort then nodePort else port
75 let peerAddr = PeerAddr Nothing nodeHost annPort
76 insertPeer topic peerAddr
77 return Announced
78
79handlers :: Address ip => [NodeHandler ip]
80handlers = [pingH, findNodeH, getPeersH, announceH]
81
82{-----------------------------------------------------------------------
83-- Search
84-----------------------------------------------------------------------}
85
86type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
87
88-- TODO match with expected node id
89findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo
90findNodeQ nid NodeInfo {..} = do
91 NodeFound closest <- FindNode nid <@> nodeAddr
92 return $ Right closest
93
94isLeft :: Either a b -> Bool
95isLeft (Right _) = False
96isLeft (Left _) = True
97
98getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
99getPeersQ topic NodeInfo {..} = do
100 GotPeers {..} <- GetPeers topic <@> nodeAddr
101 let dist = distance (toNodeId topic) nodeId
102 $(logInfoS) "getPeersQ" $ T.pack
103 $ "distance: " <> render (pretty dist) <> " , result: "
104 <> if isLeft peers then "NODES" else "PEERS"
105 return peers
106
107announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
108announceQ ih p NodeInfo {..} = do
109 GotPeers {..} <- GetPeers ih <@> nodeAddr
110 case peers of
111 Left ns
112 | False -> undefined -- TODO check if we can announce
113 | otherwise -> return (Left ns)
114 Right ps -> do -- TODO *probably* add to peer cache
115 Announced <- Announce False ih p grantedToken <@> nodeAddr
116 return (Right [nodeAddr])
117
118type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
119
120-- TODO: use reorder and filter (Traversal option) leftovers
121search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
122search k action = do
123 awaitForever $ \ batch -> unless (L.null batch) $ do
124 $(logWarnS) "search" "start query"
125 responses <- lift $ queryParallel (action <$> batch)
126 let (nodes, results) = partitionEithers responses
127 $(logWarnS) "search" "done query"
128 leftover $ L.concat nodes
129 mapM_ yield results
130
131publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
132publish ih p = do
133 nodes <- getClosest ih
134 r <- asks (optReplication . options)
135 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
136 return ()
137
138republish :: DHT ip ThreadId
139republish = fork $ do
140 i <- askOption optReannounce
141 error "DHT.republish: not implemented"
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 50ca6db3..01950c10 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -22,8 +22,10 @@ module Network.BitTorrent.DHT.Session
22 , Options (..) 22 , Options (..)
23 23
24 -- * Node 24 -- * Node
25 , LogFun
26 , Node 25 , Node
26 , options
27 , LogFun
28 , NodeHandler
27 , startNode 29 , startNode
28 30
29 -- * Session 31 -- * Session
@@ -48,32 +50,10 @@ module Network.BitTorrent.DHT.Session
48 , deleteTopic 50 , deleteTopic
49 51
50 -- * Messaging 52 -- * Messaging
51 -- ** Initiate
52 , queryNode 53 , queryNode
53 , queryParallel 54 , queryParallel
54 , (<@>) 55 , (<@>)
55 , ping 56 , ping
56
57 -- ** Accept
58 , NodeHandler
59 , nodeHandler
60 , pingH
61 , findNodeH
62 , getPeersH
63 , announceH
64 , handlers
65
66 -- * Search
67 -- ** Step
68 , Iteration
69 , findNodeQ
70 , getPeersQ
71 , announceQ
72
73 -- ** Traversal
74 , Search
75 , search
76 , publish
77 ) where 57 ) where
78 58
79import Prelude hiding (ioError) 59import Prelude hiding (ioError)
@@ -88,10 +68,7 @@ import Control.Monad.Logger
88import Control.Monad.Reader 68import Control.Monad.Reader
89import Control.Monad.Trans.Control 69import Control.Monad.Trans.Control
90import Control.Monad.Trans.Resource 70import Control.Monad.Trans.Resource
91import Data.Conduit
92import Data.Conduit.List as C hiding (mapMaybe, mapM_)
93import Data.Default 71import Data.Default
94import Data.Either
95import Data.Fixed 72import Data.Fixed
96import Data.Hashable 73import Data.Hashable
97import Data.List as L 74import Data.List as L
@@ -493,100 +470,3 @@ ping :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip)
493ping addr = do 470ping addr = do
494 (nid, Ping) <- queryNode addr Ping 471 (nid, Ping) <- queryNode addr Ping
495 return (NodeInfo nid addr) 472 return (NodeInfo nid addr)
496
497nodeHandler :: Address ip => KRPC (Query a) (Response b)
498 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
499nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
500 case fromSockAddr sockAddr of
501 Nothing -> throwIO BadAddress
502 Just naddr -> do
503 insertNode (NodeInfo remoteId naddr)
504 Response <$> getNodeId <*> action naddr q
505
506{-----------------------------------------------------------------------
507-- Search
508-----------------------------------------------------------------------}
509
510type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
511
512-- TODO match with expected node id
513findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo
514findNodeQ nid NodeInfo {..} = do
515 NodeFound closest <- FindNode nid <@> nodeAddr
516 return $ Right closest
517
518isLeft :: Either a b -> Bool
519isLeft (Right _) = False
520isLeft (Left _) = True
521
522getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
523getPeersQ topic NodeInfo {..} = do
524 GotPeers {..} <- GetPeers topic <@> nodeAddr
525 let dist = distance (toNodeId topic) nodeId
526 $(logInfoS) "getPeersQ" $ T.pack
527 $ "distance: " <> render (pretty dist) <> " , result: "
528 <> if isLeft peers then "NODES" else "PEERS"
529 return peers
530
531announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
532announceQ ih p NodeInfo {..} = do
533 GotPeers {..} <- GetPeers ih <@> nodeAddr
534 case peers of
535 Left ns
536 | False -> undefined -- TODO check if we can announce
537 | otherwise -> return (Left ns)
538 Right ps -> do -- TODO *probably* add to peer cache
539 Announced <- Announce False ih p grantedToken <@> nodeAddr
540 return (Right [nodeAddr])
541
542type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
543
544-- TODO: use reorder and filter (Traversal option) leftovers
545search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
546search k action = do
547 awaitForever $ \ batch -> unless (L.null batch) $ do
548 $(logWarnS) "search" "start query"
549 responses <- lift $ queryParallel (action <$> batch)
550 let (nodes, results) = partitionEithers responses
551 $(logWarnS) "search" "done query"
552 leftover $ L.concat nodes
553 mapM_ yield results
554
555publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
556publish ih p = do
557 nodes <- getClosest ih
558 r <- asks (optReplication . options)
559 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
560 return ()
561
562republish :: DHT ip ThreadId
563republish = fork $ do
564 i <- askOption optReannounce
565 error "DHT.republish: not implemented"
566
567{-----------------------------------------------------------------------
568-- Handlers
569-----------------------------------------------------------------------}
570
571pingH :: Address ip => NodeHandler ip
572pingH = nodeHandler $ \ _ Ping -> do
573 return Ping
574
575findNodeH :: Address ip => NodeHandler ip
576findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
577 NodeFound <$> getClosest nid
578
579getPeersH :: Address ip => NodeHandler ip
580getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
581 GotPeers <$> getPeerList ih <*> grantToken naddr
582
583announceH :: Address ip => NodeHandler ip
584announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do
585 checkToken naddr sessionToken
586 let annPort = if impliedPort then nodePort else port
587 let peerAddr = PeerAddr Nothing nodeHost annPort
588 insertPeer topic peerAddr
589 return Announced
590
591handlers :: Address ip => [NodeHandler ip]
592handlers = [pingH, findNodeH, getPeersH, announceH]