summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Query.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs141
1 files changed, 141 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
new file mode 100644
index 00000000..abe1ef5f
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -0,0 +1,141 @@
1{-# LANGUAGE FlexibleContexts #-}
2{-# LANGUAGE TemplateHaskell #-}
3module Network.BitTorrent.DHT.Query
4 ( -- * Handler
5 pingH
6 , findNodeH
7 , getPeersH
8 , announceH
9 , handlers
10
11 -- * Search
12 -- ** Step
13 , Iteration
14 , findNodeQ
15 , getPeersQ
16 , announceQ
17
18 -- ** Traversal
19 , Search
20 , search
21 , publish
22 ) where
23
24import Control.Applicative
25import Control.Concurrent.Lifted hiding (yield)
26import Control.Exception.Lifted hiding (Handler)
27import Control.Monad.Reader
28import Control.Monad.Logger
29import Data.Conduit
30import Data.Conduit.List as C hiding (mapMaybe, mapM_)
31import Data.Either
32import Data.List as L
33import Data.Monoid
34import Data.Text as T
35import Network
36import Text.PrettyPrint as PP hiding ((<>), ($$))
37import Text.PrettyPrint.Class
38
39import Network.KRPC hiding (Options, def)
40import Data.Torrent.InfoHash
41import Network.BitTorrent.Core
42import Network.BitTorrent.DHT.Message
43import Network.BitTorrent.DHT.Routing
44import Network.BitTorrent.DHT.Session
45
46{-----------------------------------------------------------------------
47-- Handlers
48-----------------------------------------------------------------------}
49
50nodeHandler :: Address ip => KRPC (Query a) (Response b)
51 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
52nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do
53 case fromSockAddr sockAddr of
54 Nothing -> throwIO BadAddress
55 Just naddr -> do
56 insertNode (NodeInfo remoteId naddr)
57 Response <$> getNodeId <*> action naddr q
58
59pingH :: Address ip => NodeHandler ip
60pingH = nodeHandler $ \ _ Ping -> do
61 return Ping
62
63findNodeH :: Address ip => NodeHandler ip
64findNodeH = nodeHandler $ \ _ (FindNode nid) -> do
65 NodeFound <$> getClosest nid
66
67getPeersH :: Address ip => NodeHandler ip
68getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do
69 GotPeers <$> getPeerList ih <*> grantToken naddr
70
71announceH :: Address ip => NodeHandler ip
72announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do
73 checkToken naddr sessionToken
74 let annPort = if impliedPort then nodePort else port
75 let peerAddr = PeerAddr Nothing nodeHost annPort
76 insertPeer topic peerAddr
77 return Announced
78
79handlers :: Address ip => [NodeHandler ip]
80handlers = [pingH, findNodeH, getPeersH, announceH]
81
82{-----------------------------------------------------------------------
83-- Search
84-----------------------------------------------------------------------}
85
86type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip])
87
88-- TODO match with expected node id
89findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo
90findNodeQ nid NodeInfo {..} = do
91 NodeFound closest <- FindNode nid <@> nodeAddr
92 return $ Right closest
93
94isLeft :: Either a b -> Bool
95isLeft (Right _) = False
96isLeft (Left _) = True
97
98getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr
99getPeersQ topic NodeInfo {..} = do
100 GotPeers {..} <- GetPeers topic <@> nodeAddr
101 let dist = distance (toNodeId topic) nodeId
102 $(logInfoS) "getPeersQ" $ T.pack
103 $ "distance: " <> render (pretty dist) <> " , result: "
104 <> if isLeft peers then "NODES" else "PEERS"
105 return peers
106
107announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr
108announceQ ih p NodeInfo {..} = do
109 GotPeers {..} <- GetPeers ih <@> nodeAddr
110 case peers of
111 Left ns
112 | False -> undefined -- TODO check if we can announce
113 | otherwise -> return (Left ns)
114 Right ps -> do -- TODO *probably* add to peer cache
115 Announced <- Announce False ih p grantedToken <@> nodeAddr
116 return (Right [nodeAddr])
117
118type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip]
119
120-- TODO: use reorder and filter (Traversal option) leftovers
121search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o
122search k action = do
123 awaitForever $ \ batch -> unless (L.null batch) $ do
124 $(logWarnS) "search" "start query"
125 responses <- lift $ queryParallel (action <$> batch)
126 let (nodes, results) = partitionEithers responses
127 $(logWarnS) "search" "done query"
128 leftover $ L.concat nodes
129 mapM_ yield results
130
131publish :: Address ip => InfoHash -> PortNumber -> DHT ip ()
132publish ih p = do
133 nodes <- getClosest ih
134 r <- asks (optReplication . options)
135 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
136 return ()
137
138republish :: DHT ip ThreadId
139republish = fork $ do
140 i <- askOption optReannounce
141 error "DHT.republish: not implemented"