diff options
author | joe <joe@jerkface.net> | 2017-06-08 00:00:56 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-06-08 00:00:56 -0400 |
commit | d6fac9a8df0ce872ede54d6a71ca6d6c750eadc9 (patch) | |
tree | c4a7cd804714796bc918091ebb29f4ad4009a401 /src/Network/BitTorrent/DHT/Query.hs | |
parent | 05345c643d0bcebe17f9474d9561da6e90fff34e (diff) |
WIP: Adapting DHT to Tox network (part 5).
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 63 |
1 files changed, 35 insertions, 28 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 820db8ba..4b386cdc 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -80,7 +80,7 @@ import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | |||
80 | import Data.Time | 80 | import Data.Time |
81 | import Data.Time.Clock.POSIX | 81 | import Data.Time.Clock.POSIX |
82 | 82 | ||
83 | import Network.KRPC hiding (Options, def) | 83 | import Network.KRPC as KRPC hiding (Options, def) |
84 | import Network.KRPC.Message (ReflectedIP(..)) | 84 | import Network.KRPC.Message (ReflectedIP(..)) |
85 | import Network.KRPC.Manager (QueryFailure(..)) | 85 | import Network.KRPC.Manager (QueryFailure(..)) |
86 | import Data.Torrent | 86 | import Data.Torrent |
@@ -90,14 +90,15 @@ import Network.BitTorrent.DHT.Session | |||
90 | import Control.Concurrent.STM | 90 | import Control.Concurrent.STM |
91 | import qualified Network.BitTorrent.DHT.Search as Search | 91 | import qualified Network.BitTorrent.DHT.Search as Search |
92 | #ifdef VERSION_bencoding | 92 | #ifdef VERSION_bencoding |
93 | import Network.BitTorrent.Address | ||
94 | import Data.BEncode (BValue) | 93 | import Data.BEncode (BValue) |
95 | import Network.DHT.Mainline | 94 | import Network.DHT.Mainline hiding (NodeId) |
95 | import Network.KRPC.Message (KMessageOf) | ||
96 | #else | 96 | #else |
97 | import Network.BitTorrent.Address hiding (NodeId) | ||
98 | import Data.ByteString (ByteString) | 97 | import Data.ByteString (ByteString) |
99 | import Data.Tox | 98 | import Data.Tox |
100 | #endif | 99 | #endif |
100 | import Network.BitTorrent.Address hiding (NodeId) | ||
101 | import Network.RPC as RPC hiding (Query,Response) | ||
101 | 102 | ||
102 | {----------------------------------------------------------------------- | 103 | {----------------------------------------------------------------------- |
103 | -- Handlers | 104 | -- Handlers |
@@ -106,18 +107,17 @@ import Data.Tox | |||
106 | nodeHandler :: ( Address ip | 107 | nodeHandler :: ( Address ip |
107 | , KRPC (Query a) (Response b) | 108 | , KRPC (Query a) (Response b) |
108 | #ifdef VERSION_bencoding | 109 | #ifdef VERSION_bencoding |
109 | , Envelope (Query a) (Response b) ~ BValue ) | 110 | , KRPC.Envelope (Query a) (Response b) ~ BValue ) |
110 | #else | 111 | #else |
111 | , Envelope (Query a) (Response b) ~ ByteString ) | 112 | , KPRC.Envelope (Query a) (Response b) ~ ByteString ) |
112 | #endif | 113 | #endif |
113 | => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip | 114 | => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip |
114 | #ifdef VERSION_bencoding | ||
115 | nodeHandler action = handler $ \ sockAddr qry -> do | 115 | nodeHandler action = handler $ \ sockAddr qry -> do |
116 | #ifdef VERSION_bencoding | ||
116 | let remoteId = queringNodeId qry | 117 | let remoteId = queringNodeId qry |
117 | read_only = queryIsReadOnly qry | 118 | read_only = queryIsReadOnly qry |
118 | q = queryParams qry | 119 | q = queryParams qry |
119 | #else | 120 | #else |
120 | nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do | ||
121 | let remoteId = msgClient qry | 121 | let remoteId = msgClient qry |
122 | read_only = False | 122 | read_only = False |
123 | q = msgPayload qry | 123 | q = msgPayload qry |
@@ -125,7 +125,7 @@ nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do | |||
125 | case fromSockAddr sockAddr of | 125 | case fromSockAddr sockAddr of |
126 | Nothing -> throwIO BadAddress | 126 | Nothing -> throwIO BadAddress |
127 | Just naddr -> do | 127 | Just naddr -> do |
128 | let ni = NodeInfo remoteId naddr | 128 | let ni = NodeInfo remoteId naddr () |
129 | -- Do not route read-only nodes. (bep 43) | 129 | -- Do not route read-only nodes. (bep 43) |
130 | if read_only | 130 | if read_only |
131 | then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) | 131 | then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) |
@@ -136,8 +136,11 @@ nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do | |||
136 | 136 | ||
137 | -- | Default 'Ping' handler. | 137 | -- | Default 'Ping' handler. |
138 | pingH :: Address ip => NodeHandler ip | 138 | pingH :: Address ip => NodeHandler ip |
139 | pingH = nodeHandler $ \ _ Ping -> do | 139 | #ifdef VERSION_bencoding |
140 | return Ping | 140 | pingH = nodeHandler $ \ _ Ping -> return Ping |
141 | #else | ||
142 | pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } | ||
143 | #endif | ||
141 | 144 | ||
142 | -- | Default 'FindNode' handler. | 145 | -- | Default 'FindNode' handler. |
143 | findNodeH :: Address ip => NodeHandler ip | 146 | findNodeH :: Address ip => NodeHandler ip |
@@ -177,19 +180,23 @@ defaultHandlers = [pingH, findNodeH] | |||
177 | -- Basic queries | 180 | -- Basic queries |
178 | -----------------------------------------------------------------------} | 181 | -----------------------------------------------------------------------} |
179 | 182 | ||
180 | type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) | 183 | type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip]) |
181 | 184 | ||
182 | -- | The most basic query. May be used to check if the given node is | 185 | -- | The most basic query. May be used to check if the given node is |
183 | -- alive or get its 'NodeId'. | 186 | -- alive or get its 'NodeId'. |
184 | pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP) | 187 | pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP) |
185 | pingQ addr = do | 188 | pingQ addr = do |
189 | #ifdef VERSION_bencoding | ||
186 | (nid, Ping, mip) <- queryNode' addr Ping | 190 | (nid, Ping, mip) <- queryNode' addr Ping |
187 | return (NodeInfo nid addr, mip) | 191 | #else |
192 | (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | ||
193 | #endif | ||
194 | return (NodeInfo nid addr (), mip) | ||
188 | 195 | ||
189 | -- TODO [robustness] match range of returned node ids with the | 196 | -- TODO [robustness] match range of returned node ids with the |
190 | -- expected range and either filter bad nodes or discard response at | 197 | -- expected range and either filter bad nodes or discard response at |
191 | -- all throwing an exception | 198 | -- all throwing an exception |
192 | findNodeQ :: Address ip => TableKey key => key -> Iteration ip NodeInfo | 199 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo |
193 | findNodeQ key NodeInfo {..} = do | 200 | findNodeQ key NodeInfo {..} = do |
194 | NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr | 201 | NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr |
195 | $(logInfoS) "findNodeQ" $ "NodeFound\n" | 202 | $(logInfoS) "findNodeQ" $ "NodeFound\n" |
@@ -223,7 +230,7 @@ announceQ ih p NodeInfo {..} = do | |||
223 | -----------------------------------------------------------------------} | 230 | -----------------------------------------------------------------------} |
224 | 231 | ||
225 | 232 | ||
226 | ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [PeerAddr ip])) | 233 | ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) |
227 | ioGetPeers ih = do | 234 | ioGetPeers ih = do |
228 | session <- ask | 235 | session <- ask |
229 | return $ \ni -> runDHT session $ do | 236 | return $ \ni -> runDHT session $ do |
@@ -232,7 +239,7 @@ ioGetPeers ih = do | |||
232 | Right e -> return $ either (,[]) ([],) e | 239 | Right e -> return $ either (,[]) ([],) e |
233 | Left e -> let _ = e :: QueryFailure in return ([],[]) | 240 | Left e -> let _ = e :: QueryFailure in return ([],[]) |
234 | 241 | ||
235 | ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [NodeInfo ip])) | 242 | ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()])) |
236 | ioFindNode ih = do | 243 | ioFindNode ih = do |
237 | session <- ask | 244 | session <- ask |
238 | return $ \ni -> runDHT session $ do | 245 | return $ \ni -> runDHT session $ do |
@@ -240,7 +247,7 @@ ioFindNode ih = do | |||
240 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns | 247 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns |
241 | 248 | ||
242 | isearch :: (Ord r, Ord ip) => | 249 | isearch :: (Ord r, Ord ip) => |
243 | (InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [r]))) | 250 | (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]))) |
244 | -> InfoHash | 251 | -> InfoHash |
245 | -> DHT ip (ThreadId, Search.IterativeSearch ip r) | 252 | -> DHT ip (ThreadId, Search.IterativeSearch ip r) |
246 | isearch f ih = do | 253 | isearch f ih = do |
@@ -255,10 +262,10 @@ isearch f ih = do | |||
255 | return (a, s) | 262 | return (a, s) |
256 | 263 | ||
257 | 264 | ||
258 | type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] | 265 | type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()] |
259 | 266 | ||
260 | -- TODO: use reorder and filter (Traversal option) leftovers | 267 | -- TODO: use reorder and filter (Traversal option) leftovers |
261 | search :: k -> Iteration ip o -> Search ip o | 268 | -- search :: k -> IterationI ip o -> Search ip o |
262 | search _ action = do | 269 | search _ action = do |
263 | awaitForever $ \ batch -> unless (L.null batch) $ do | 270 | awaitForever $ \ batch -> unless (L.null batch) $ do |
264 | $(logWarnS) "search" "start query" | 271 | $(logWarnS) "search" "start query" |
@@ -285,15 +292,15 @@ probeNode addr = do | |||
285 | 292 | ||
286 | 293 | ||
287 | -- FIXME do not use getClosest sinse we should /refresh/ them | 294 | -- FIXME do not use getClosest sinse we should /refresh/ them |
288 | refreshNodes :: Address ip => NodeId -> DHT ip () -- [NodeInfo ip] | 295 | refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()] |
289 | refreshNodes nid = do | 296 | refreshNodes nid = do |
290 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | 297 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) |
291 | nodes <- getClosest nid | 298 | nodes <- getClosest nid |
292 | do | 299 | do |
293 | -- forM (L.take 1 nodes) $ \ addr -> do | 300 | -- forM (L.take 1 nodes) $ \ addr -> do |
294 | -- NodeFound ns <- FindNode nid <@> addr | 301 | -- NodeFound ns <- FindNode nid <@> addr |
295 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo ip] (DHT ip) () | 302 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () |
296 | -- Actual type: ConduitM [NodeInfo ip] [NodeInfo ip] (DHT ip) () | 303 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () |
297 | -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume | 304 | -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume |
298 | nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume | 305 | nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume |
299 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." | 306 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." |
@@ -306,7 +313,7 @@ refreshNodes nid = do | |||
306 | 313 | ||
307 | -- | This operation do not block but acquire exclusive access to | 314 | -- | This operation do not block but acquire exclusive access to |
308 | -- routing table. | 315 | -- routing table. |
309 | insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip () | 316 | insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () |
310 | insertNode info witnessed_ip0 = do | 317 | insertNode info witnessed_ip0 = do |
311 | var <- asks routingInfo | 318 | var <- asks routingInfo |
312 | tm <- getTimestamp | 319 | tm <- getTimestamp |
@@ -315,7 +322,7 @@ insertNode info witnessed_ip0 = do | |||
315 | let logMsg = "Routing table: " <> pPrint t | 322 | let logMsg = "Routing table: " <> pPrint t |
316 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | 323 | $(logDebugS) "insertNode" (T.pack (render logMsg)) |
317 | let arrival0 = TryInsert info | 324 | let arrival0 = TryInsert info |
318 | arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) | 325 | arrival4 = TryInsert (mapAddress fromAddr info) :: Event (Maybe IPv4) |
319 | $(logDebugS) "insertNode" $ T.pack (show arrival4) | 326 | $(logDebugS) "insertNode" $ T.pack (show arrival4) |
320 | maxbuckets <- asks (optBucketCount . options) | 327 | maxbuckets <- asks (optBucketCount . options) |
321 | fallbackid <- asks tentativeNodeId | 328 | fallbackid <- asks tentativeNodeId |
@@ -380,18 +387,18 @@ insertNode info witnessed_ip0 = do | |||
380 | 387 | ||
381 | -- | Throws exception if node is not responding. | 388 | -- | Throws exception if node is not responding. |
382 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 389 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
383 | => NodeAddr ip -> a -> DHT ip (NodeId, b) | 390 | => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b) |
384 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q | 391 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q |
385 | 392 | ||
386 | queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 393 | queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
387 | => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP) | 394 | => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP) |
388 | queryNode' addr q = do | 395 | queryNode' addr q = do |
389 | nid <- myNodeIdAccordingTo addr | 396 | nid <- myNodeIdAccordingTo addr |
390 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | 397 | let read_only = False -- TODO: check for NAT issues. (BEP 43) |
391 | (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) | 398 | (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) |
392 | -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) | 399 | -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) |
393 | -- <> " by " <> T.pack (show (toSockAddr addr)) | 400 | -- <> " by " <> T.pack (show (toSockAddr addr)) |
394 | _ <- insertNode (NodeInfo remoteId addr) witnessed_ip | 401 | _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip |
395 | return (remoteId, r, witnessed_ip) | 402 | return (remoteId, r, witnessed_ip) |
396 | 403 | ||
397 | -- | Infix version of 'queryNode' function. | 404 | -- | Infix version of 'queryNode' function. |