diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs new file mode 100644 index 00000000..abe1ef5f --- /dev/null +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -0,0 +1,141 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | {-# LANGUAGE TemplateHaskell #-} | ||
3 | module Network.BitTorrent.DHT.Query | ||
4 | ( -- * Handler | ||
5 | pingH | ||
6 | , findNodeH | ||
7 | , getPeersH | ||
8 | , announceH | ||
9 | , handlers | ||
10 | |||
11 | -- * Search | ||
12 | -- ** Step | ||
13 | , Iteration | ||
14 | , findNodeQ | ||
15 | , getPeersQ | ||
16 | , announceQ | ||
17 | |||
18 | -- ** Traversal | ||
19 | , Search | ||
20 | , search | ||
21 | , publish | ||
22 | ) where | ||
23 | |||
24 | import Control.Applicative | ||
25 | import Control.Concurrent.Lifted hiding (yield) | ||
26 | import Control.Exception.Lifted hiding (Handler) | ||
27 | import Control.Monad.Reader | ||
28 | import Control.Monad.Logger | ||
29 | import Data.Conduit | ||
30 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | ||
31 | import Data.Either | ||
32 | import Data.List as L | ||
33 | import Data.Monoid | ||
34 | import Data.Text as T | ||
35 | import Network | ||
36 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
37 | import Text.PrettyPrint.Class | ||
38 | |||
39 | import Network.KRPC hiding (Options, def) | ||
40 | import Data.Torrent.InfoHash | ||
41 | import Network.BitTorrent.Core | ||
42 | import Network.BitTorrent.DHT.Message | ||
43 | import Network.BitTorrent.DHT.Routing | ||
44 | import Network.BitTorrent.DHT.Session | ||
45 | |||
46 | {----------------------------------------------------------------------- | ||
47 | -- Handlers | ||
48 | -----------------------------------------------------------------------} | ||
49 | |||
50 | nodeHandler :: Address ip => KRPC (Query a) (Response b) | ||
51 | => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip | ||
52 | nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do | ||
53 | case fromSockAddr sockAddr of | ||
54 | Nothing -> throwIO BadAddress | ||
55 | Just naddr -> do | ||
56 | insertNode (NodeInfo remoteId naddr) | ||
57 | Response <$> getNodeId <*> action naddr q | ||
58 | |||
59 | pingH :: Address ip => NodeHandler ip | ||
60 | pingH = nodeHandler $ \ _ Ping -> do | ||
61 | return Ping | ||
62 | |||
63 | findNodeH :: Address ip => NodeHandler ip | ||
64 | findNodeH = nodeHandler $ \ _ (FindNode nid) -> do | ||
65 | NodeFound <$> getClosest nid | ||
66 | |||
67 | getPeersH :: Address ip => NodeHandler ip | ||
68 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do | ||
69 | GotPeers <$> getPeerList ih <*> grantToken naddr | ||
70 | |||
71 | announceH :: Address ip => NodeHandler ip | ||
72 | announceH = nodeHandler $ \ naddr @ NodeAddr {..} (Announce {..}) -> do | ||
73 | checkToken naddr sessionToken | ||
74 | let annPort = if impliedPort then nodePort else port | ||
75 | let peerAddr = PeerAddr Nothing nodeHost annPort | ||
76 | insertPeer topic peerAddr | ||
77 | return Announced | ||
78 | |||
79 | handlers :: Address ip => [NodeHandler ip] | ||
80 | handlers = [pingH, findNodeH, getPeersH, announceH] | ||
81 | |||
82 | {----------------------------------------------------------------------- | ||
83 | -- Search | ||
84 | -----------------------------------------------------------------------} | ||
85 | |||
86 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) | ||
87 | |||
88 | -- TODO match with expected node id | ||
89 | findNodeQ :: Address ip => NodeId -> Iteration ip NodeInfo | ||
90 | findNodeQ nid NodeInfo {..} = do | ||
91 | NodeFound closest <- FindNode nid <@> nodeAddr | ||
92 | return $ Right closest | ||
93 | |||
94 | isLeft :: Either a b -> Bool | ||
95 | isLeft (Right _) = False | ||
96 | isLeft (Left _) = True | ||
97 | |||
98 | getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr | ||
99 | getPeersQ topic NodeInfo {..} = do | ||
100 | GotPeers {..} <- GetPeers topic <@> nodeAddr | ||
101 | let dist = distance (toNodeId topic) nodeId | ||
102 | $(logInfoS) "getPeersQ" $ T.pack | ||
103 | $ "distance: " <> render (pretty dist) <> " , result: " | ||
104 | <> if isLeft peers then "NODES" else "PEERS" | ||
105 | return peers | ||
106 | |||
107 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr | ||
108 | announceQ ih p NodeInfo {..} = do | ||
109 | GotPeers {..} <- GetPeers ih <@> nodeAddr | ||
110 | case peers of | ||
111 | Left ns | ||
112 | | False -> undefined -- TODO check if we can announce | ||
113 | | otherwise -> return (Left ns) | ||
114 | Right ps -> do -- TODO *probably* add to peer cache | ||
115 | Announced <- Announce False ih p grantedToken <@> nodeAddr | ||
116 | return (Right [nodeAddr]) | ||
117 | |||
118 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | ||
119 | |||
120 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
121 | search :: TableKey k => Address ip => k -> Iteration ip o -> Search ip o | ||
122 | search k action = do | ||
123 | awaitForever $ \ batch -> unless (L.null batch) $ do | ||
124 | $(logWarnS) "search" "start query" | ||
125 | responses <- lift $ queryParallel (action <$> batch) | ||
126 | let (nodes, results) = partitionEithers responses | ||
127 | $(logWarnS) "search" "done query" | ||
128 | leftover $ L.concat nodes | ||
129 | mapM_ yield results | ||
130 | |||
131 | publish :: Address ip => InfoHash -> PortNumber -> DHT ip () | ||
132 | publish ih p = do | ||
133 | nodes <- getClosest ih | ||
134 | r <- asks (optReplication . options) | ||
135 | _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
136 | return () | ||
137 | |||
138 | republish :: DHT ip ThreadId | ||
139 | republish = fork $ do | ||
140 | i <- askOption optReannounce | ||
141 | error "DHT.republish: not implemented" | ||