summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Query.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-08 00:00:56 -0400
committerjoe <joe@jerkface.net>2017-06-08 00:00:56 -0400
commitd6fac9a8df0ce872ede54d6a71ca6d6c750eadc9 (patch)
treec4a7cd804714796bc918091ebb29f4ad4009a401 /src/Network/BitTorrent/DHT/Query.hs
parent05345c643d0bcebe17f9474d9561da6e90fff34e (diff)
WIP: Adapting DHT to Tox network (part 5).
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs63
1 files changed, 35 insertions, 28 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 820db8ba..4b386cdc 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -80,7 +80,7 @@ import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
80import Data.Time 80import Data.Time
81import Data.Time.Clock.POSIX 81import Data.Time.Clock.POSIX
82 82
83import Network.KRPC hiding (Options, def) 83import Network.KRPC as KRPC hiding (Options, def)
84import Network.KRPC.Message (ReflectedIP(..)) 84import Network.KRPC.Message (ReflectedIP(..))
85import Network.KRPC.Manager (QueryFailure(..)) 85import Network.KRPC.Manager (QueryFailure(..))
86import Data.Torrent 86import Data.Torrent
@@ -90,14 +90,15 @@ import Network.BitTorrent.DHT.Session
90import Control.Concurrent.STM 90import Control.Concurrent.STM
91import qualified Network.BitTorrent.DHT.Search as Search 91import qualified Network.BitTorrent.DHT.Search as Search
92#ifdef VERSION_bencoding 92#ifdef VERSION_bencoding
93import Network.BitTorrent.Address
94import Data.BEncode (BValue) 93import Data.BEncode (BValue)
95import Network.DHT.Mainline 94import Network.DHT.Mainline hiding (NodeId)
95import Network.KRPC.Message (KMessageOf)
96#else 96#else
97import Network.BitTorrent.Address hiding (NodeId)
98import Data.ByteString (ByteString) 97import Data.ByteString (ByteString)
99import Data.Tox 98import Data.Tox
100#endif 99#endif
100import Network.BitTorrent.Address hiding (NodeId)
101import Network.RPC as RPC hiding (Query,Response)
101 102
102{----------------------------------------------------------------------- 103{-----------------------------------------------------------------------
103-- Handlers 104-- Handlers
@@ -106,18 +107,17 @@ import Data.Tox
106nodeHandler :: ( Address ip 107nodeHandler :: ( Address ip
107 , KRPC (Query a) (Response b) 108 , KRPC (Query a) (Response b)
108#ifdef VERSION_bencoding 109#ifdef VERSION_bencoding
109 , Envelope (Query a) (Response b) ~ BValue ) 110 , KRPC.Envelope (Query a) (Response b) ~ BValue )
110#else 111#else
111 , Envelope (Query a) (Response b) ~ ByteString ) 112 , KPRC.Envelope (Query a) (Response b) ~ ByteString )
112#endif 113#endif
113 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip 114 => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip
114#ifdef VERSION_bencoding
115nodeHandler action = handler $ \ sockAddr qry -> do 115nodeHandler action = handler $ \ sockAddr qry -> do
116#ifdef VERSION_bencoding
116 let remoteId = queringNodeId qry 117 let remoteId = queringNodeId qry
117 read_only = queryIsReadOnly qry 118 read_only = queryIsReadOnly qry
118 q = queryParams qry 119 q = queryParams qry
119#else 120#else
120nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do
121 let remoteId = msgClient qry 121 let remoteId = msgClient qry
122 read_only = False 122 read_only = False
123 q = msgPayload qry 123 q = msgPayload qry
@@ -125,7 +125,7 @@ nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do
125 case fromSockAddr sockAddr of 125 case fromSockAddr sockAddr of
126 Nothing -> throwIO BadAddress 126 Nothing -> throwIO BadAddress
127 Just naddr -> do 127 Just naddr -> do
128 let ni = NodeInfo remoteId naddr 128 let ni = NodeInfo remoteId naddr ()
129 -- Do not route read-only nodes. (bep 43) 129 -- Do not route read-only nodes. (bep 43)
130 if read_only 130 if read_only
131 then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) 131 then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
@@ -136,8 +136,11 @@ nodeHandler action = handler (error "TODO TOX Messaging") $ \ sockAddr qry -> do
136 136
137-- | Default 'Ping' handler. 137-- | Default 'Ping' handler.
138pingH :: Address ip => NodeHandler ip 138pingH :: Address ip => NodeHandler ip
139pingH = nodeHandler $ \ _ Ping -> do 139#ifdef VERSION_bencoding
140 return Ping 140pingH = nodeHandler $ \ _ Ping -> return Ping
141#else
142pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
143#endif
141 144
142-- | Default 'FindNode' handler. 145-- | Default 'FindNode' handler.
143findNodeH :: Address ip => NodeHandler ip 146findNodeH :: Address ip => NodeHandler ip
@@ -177,19 +180,23 @@ defaultHandlers = [pingH, findNodeH]
177-- Basic queries 180-- Basic queries
178-----------------------------------------------------------------------} 181-----------------------------------------------------------------------}
179 182
180type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) 183type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip])
181 184
182-- | The most basic query. May be used to check if the given node is 185-- | The most basic query. May be used to check if the given node is
183-- alive or get its 'NodeId'. 186-- alive or get its 'NodeId'.
184pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP) 187pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP)
185pingQ addr = do 188pingQ addr = do
189#ifdef VERSION_bencoding
186 (nid, Ping, mip) <- queryNode' addr Ping 190 (nid, Ping, mip) <- queryNode' addr Ping
187 return (NodeInfo nid addr, mip) 191#else
192 (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
193#endif
194 return (NodeInfo nid addr (), mip)
188 195
189-- TODO [robustness] match range of returned node ids with the 196-- TODO [robustness] match range of returned node ids with the
190-- expected range and either filter bad nodes or discard response at 197-- expected range and either filter bad nodes or discard response at
191-- all throwing an exception 198-- all throwing an exception
192findNodeQ :: Address ip => TableKey key => key -> Iteration ip NodeInfo 199-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
193findNodeQ key NodeInfo {..} = do 200findNodeQ key NodeInfo {..} = do
194 NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr 201 NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr
195 $(logInfoS) "findNodeQ" $ "NodeFound\n" 202 $(logInfoS) "findNodeQ" $ "NodeFound\n"
@@ -223,7 +230,7 @@ announceQ ih p NodeInfo {..} = do
223-----------------------------------------------------------------------} 230-----------------------------------------------------------------------}
224 231
225 232
226ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [PeerAddr ip])) 233ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
227ioGetPeers ih = do 234ioGetPeers ih = do
228 session <- ask 235 session <- ask
229 return $ \ni -> runDHT session $ do 236 return $ \ni -> runDHT session $ do
@@ -232,7 +239,7 @@ ioGetPeers ih = do
232 Right e -> return $ either (,[]) ([],) e 239 Right e -> return $ either (,[]) ([],) e
233 Left e -> let _ = e :: QueryFailure in return ([],[]) 240 Left e -> let _ = e :: QueryFailure in return ([],[])
234 241
235ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [NodeInfo ip])) 242ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()]))
236ioFindNode ih = do 243ioFindNode ih = do
237 session <- ask 244 session <- ask
238 return $ \ni -> runDHT session $ do 245 return $ \ni -> runDHT session $ do
@@ -240,7 +247,7 @@ ioFindNode ih = do
240 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns 247 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns
241 248
242isearch :: (Ord r, Ord ip) => 249isearch :: (Ord r, Ord ip) =>
243 (InfoHash -> DHT ip (NodeInfo ip -> IO ([NodeInfo ip], [r]))) 250 (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r])))
244 -> InfoHash 251 -> InfoHash
245 -> DHT ip (ThreadId, Search.IterativeSearch ip r) 252 -> DHT ip (ThreadId, Search.IterativeSearch ip r)
246isearch f ih = do 253isearch f ih = do
@@ -255,10 +262,10 @@ isearch f ih = do
255 return (a, s) 262 return (a, s)
256 263
257 264
258type Search ip o = Conduit [NodeInfo ip] (DHT ip) [o ip] 265type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()]
259 266
260-- TODO: use reorder and filter (Traversal option) leftovers 267-- TODO: use reorder and filter (Traversal option) leftovers
261search :: k -> Iteration ip o -> Search ip o 268-- search :: k -> IterationI ip o -> Search ip o
262search _ action = do 269search _ action = do
263 awaitForever $ \ batch -> unless (L.null batch) $ do 270 awaitForever $ \ batch -> unless (L.null batch) $ do
264 $(logWarnS) "search" "start query" 271 $(logWarnS) "search" "start query"
@@ -285,15 +292,15 @@ probeNode addr = do
285 292
286 293
287-- FIXME do not use getClosest sinse we should /refresh/ them 294-- FIXME do not use getClosest sinse we should /refresh/ them
288refreshNodes :: Address ip => NodeId -> DHT ip () -- [NodeInfo ip] 295refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()]
289refreshNodes nid = do 296refreshNodes nid = do
290 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) 297 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
291 nodes <- getClosest nid 298 nodes <- getClosest nid
292 do 299 do
293 -- forM (L.take 1 nodes) $ \ addr -> do 300 -- forM (L.take 1 nodes) $ \ addr -> do
294 -- NodeFound ns <- FindNode nid <@> addr 301 -- NodeFound ns <- FindNode nid <@> addr
295 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo ip] (DHT ip) () 302 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) ()
296 -- Actual type: ConduitM [NodeInfo ip] [NodeInfo ip] (DHT ip) () 303 -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) ()
297 -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume 304 -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume
298 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume 305 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume
299 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." 306 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes."
@@ -306,7 +313,7 @@ refreshNodes nid = do
306 313
307-- | This operation do not block but acquire exclusive access to 314-- | This operation do not block but acquire exclusive access to
308-- routing table. 315-- routing table.
309insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip () 316insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip ()
310insertNode info witnessed_ip0 = do 317insertNode info witnessed_ip0 = do
311 var <- asks routingInfo 318 var <- asks routingInfo
312 tm <- getTimestamp 319 tm <- getTimestamp
@@ -315,7 +322,7 @@ insertNode info witnessed_ip0 = do
315 let logMsg = "Routing table: " <> pPrint t 322 let logMsg = "Routing table: " <> pPrint t
316 $(logDebugS) "insertNode" (T.pack (render logMsg)) 323 $(logDebugS) "insertNode" (T.pack (render logMsg))
317 let arrival0 = TryInsert info 324 let arrival0 = TryInsert info
318 arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) 325 arrival4 = TryInsert (mapAddress fromAddr info) :: Event (Maybe IPv4)
319 $(logDebugS) "insertNode" $ T.pack (show arrival4) 326 $(logDebugS) "insertNode" $ T.pack (show arrival4)
320 maxbuckets <- asks (optBucketCount . options) 327 maxbuckets <- asks (optBucketCount . options)
321 fallbackid <- asks tentativeNodeId 328 fallbackid <- asks tentativeNodeId
@@ -380,18 +387,18 @@ insertNode info witnessed_ip0 = do
380 387
381-- | Throws exception if node is not responding. 388-- | Throws exception if node is not responding.
382queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 389queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
383 => NodeAddr ip -> a -> DHT ip (NodeId, b) 390 => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b)
384queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q 391queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
385 392
386queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) 393queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b)
387 => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP) 394 => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP)
388queryNode' addr q = do 395queryNode' addr q = do
389 nid <- myNodeIdAccordingTo addr 396 nid <- myNodeIdAccordingTo addr
390 let read_only = False -- TODO: check for NAT issues. (BEP 43) 397 let read_only = False -- TODO: check for NAT issues. (BEP 43)
391 (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) 398 (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q)
392 -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) 399 -- $(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
393 -- <> " by " <> T.pack (show (toSockAddr addr)) 400 -- <> " by " <> T.pack (show (toSockAddr addr))
394 _ <- insertNode (NodeInfo remoteId addr) witnessed_ip 401 _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip
395 return (remoteId, r, witnessed_ip) 402 return (remoteId, r, witnessed_ip)
396 403
397-- | Infix version of 'queryNode' function. 404-- | Infix version of 'queryNode' function.