diff options
author | joe <joe@jerkface.net> | 2017-01-04 00:54:49 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-01-04 00:54:49 -0500 |
commit | 2fd473635dba00f7af37401058522a29460392fc (patch) | |
tree | 25e90513b252b5361201c524175851b63e12ca06 /src/Network/BitTorrent/DHT | |
parent | 19aa76afa7349cc3c91111b38ab3012f63380433 (diff) |
Made node refresh into full iterative search.
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 94 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Routing.hs | 34 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 64 |
3 files changed, 107 insertions, 85 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index d51ab505..c5fcccb4 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -9,8 +9,9 @@ | |||
9 | -- Normally, you don't need to import this module, use | 9 | -- Normally, you don't need to import this module, use |
10 | -- "Network.BitTorrent.DHT" instead. | 10 | -- "Network.BitTorrent.DHT" instead. |
11 | -- | 11 | -- |
12 | {-# LANGUAGE FlexibleContexts #-} | 12 | {-# LANGUAGE FlexibleContexts #-} |
13 | {-# LANGUAGE TemplateHaskell #-} | 13 | {-# LANGUAGE ScopedTypeVariables #-} |
14 | {-# LANGUAGE TemplateHaskell #-} | ||
14 | module Network.BitTorrent.DHT.Query | 15 | module Network.BitTorrent.DHT.Query |
15 | ( -- * Handler | 16 | ( -- * Handler |
16 | -- | To bind specific set of handlers you need to pass | 17 | -- | To bind specific set of handlers you need to pass |
@@ -38,6 +39,14 @@ module Network.BitTorrent.DHT.Query | |||
38 | , Search | 39 | , Search |
39 | , search | 40 | , search |
40 | , publish | 41 | , publish |
42 | |||
43 | -- ** Routing table | ||
44 | , insertNode | ||
45 | , refreshNodes | ||
46 | |||
47 | -- ** Messaging | ||
48 | , queryNode | ||
49 | , (<@>) | ||
41 | ) where | 50 | ) where |
42 | 51 | ||
43 | import Control.Applicative | 52 | import Control.Applicative |
@@ -54,12 +63,14 @@ import Data.Text as T | |||
54 | import Network | 63 | import Network |
55 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | 64 | import Text.PrettyPrint as PP hiding ((<>), ($$)) |
56 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | 65 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) |
66 | import Data.Time | ||
67 | import Data.Time.Clock.POSIX | ||
57 | 68 | ||
58 | import Network.KRPC hiding (Options, def) | 69 | import Network.KRPC hiding (Options, def) |
59 | import Data.Torrent | 70 | import Data.Torrent |
60 | import Network.BitTorrent.Address | 71 | import Network.BitTorrent.Address |
61 | import Network.BitTorrent.DHT.Message | 72 | import Network.BitTorrent.DHT.Message |
62 | import Network.BitTorrent.DHT.Routing | 73 | import Network.BitTorrent.DHT.Routing as R |
63 | import Network.BitTorrent.DHT.Session | 74 | import Network.BitTorrent.DHT.Session |
64 | 75 | ||
65 | {----------------------------------------------------------------------- | 76 | {----------------------------------------------------------------------- |
@@ -165,7 +176,7 @@ search _ action = do | |||
165 | $(logWarnS) "search" "start query" | 176 | $(logWarnS) "search" "start query" |
166 | responses <- lift $ queryParallel (action <$> batch) | 177 | responses <- lift $ queryParallel (action <$> batch) |
167 | let (nodes, results) = partitionEithers responses | 178 | let (nodes, results) = partitionEithers responses |
168 | $(logWarnS) "search" ("done query more:" <> T.pack (show $ L.length nodes)) | 179 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) |
169 | leftover $ L.concat nodes | 180 | leftover $ L.concat nodes |
170 | mapM_ yield results | 181 | mapM_ yield results |
171 | 182 | ||
@@ -180,3 +191,78 @@ republish :: DHT ip ThreadId | |||
180 | republish = fork $ do | 191 | republish = fork $ do |
181 | i <- asks (optReannounce . options) | 192 | i <- asks (optReannounce . options) |
182 | error "DHT.republish: not implemented" | 193 | error "DHT.republish: not implemented" |
194 | |||
195 | routing :: Address ip => Routing ip a -> DHT ip (Maybe a) | ||
196 | routing = runRouting probeNode refreshNodes getTimestamp | ||
197 | |||
198 | getTimestamp :: DHT ip Timestamp | ||
199 | getTimestamp = do | ||
200 | utcTime <- liftIO $ getCurrentTime | ||
201 | $(logDebugS) "routing.make_timestamp" (T.pack (render (pPrint utcTime))) | ||
202 | return $ utcTimeToPOSIXSeconds utcTime | ||
203 | |||
204 | |||
205 | probeNode :: Address ip => NodeAddr ip -> DHT ip Bool | ||
206 | probeNode addr = do | ||
207 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) | ||
208 | result <- try $ Ping <@> addr | ||
209 | let _ = result :: Either SomeException Ping | ||
210 | return $ either (const False) (const True) result | ||
211 | |||
212 | |||
213 | -- FIXME do not use getClosest sinse we should /refresh/ them | ||
214 | refreshNodes :: Address ip => NodeId -> DHT ip () -- [NodeInfo ip] | ||
215 | refreshNodes nid = do | ||
216 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | ||
217 | nodes <- getClosest nid | ||
218 | do | ||
219 | -- forM (L.take 1 nodes) $ \ addr -> do | ||
220 | -- NodeFound ns <- FindNode nid <@> addr | ||
221 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo ip] (DHT ip) () | ||
222 | -- Actual type: ConduitM [NodeInfo ip] [NodeInfo ip] (DHT ip) () | ||
223 | -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume | ||
224 | nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume | ||
225 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." | ||
226 | queryParallel $ flip L.map (L.concat nss) $ \n -> do | ||
227 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) | ||
228 | pingQ (nodeAddr n) | ||
229 | insertNode n | ||
230 | return () | ||
231 | return () -- $ L.concat nss | ||
232 | |||
233 | -- | This operation do not block but acquire exclusive access to | ||
234 | -- routing table. | ||
235 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId | ||
236 | insertNode info = fork $ do | ||
237 | var <- asks routingTable | ||
238 | t <- takeMVar var | ||
239 | t' <- do -- modifyMVar_ var $ \ t -> do | ||
240 | result <- routing (R.insert info t) | ||
241 | case result of | ||
242 | Nothing -> do | ||
243 | $(logDebugS) "insertNode" $ "Routing table is full: " | ||
244 | <> T.pack (show (pPrint t)) | ||
245 | return t | ||
246 | Just t' -> do | ||
247 | let logMsg = "Routing table updated: " | ||
248 | <> pPrint t <> " -> " <> pPrint t' | ||
249 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | ||
250 | return t' | ||
251 | putMVar var t' | ||
252 | |||
253 | -- | Throws exception if node is not responding. | ||
254 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | ||
255 | => NodeAddr ip -> a -> DHT ip (NodeId, b) | ||
256 | queryNode addr q = do | ||
257 | nid <- asks thisNodeId | ||
258 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | ||
259 | Response remoteId r <- query (toSockAddr addr) (Query nid read_only q) | ||
260 | insertNode (NodeInfo remoteId addr) | ||
261 | return (remoteId, r) | ||
262 | |||
263 | -- | Infix version of 'queryNode' function. | ||
264 | (<@>) :: Address ip => KRPC (Query a) (Response b) | ||
265 | => a -> NodeAddr ip -> DHT ip b | ||
266 | q <@> addr = snd <$> queryNode addr q | ||
267 | {-# INLINE (<@>) #-} | ||
268 | |||
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 8705a5a2..fee52380 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs | |||
@@ -103,14 +103,14 @@ data Routing ip result | |||
103 | | Done result | 103 | | Done result |
104 | | GetTime ( Timestamp -> Routing ip result) | 104 | | GetTime ( Timestamp -> Routing ip result) |
105 | | NeedPing (NodeAddr ip) ( Bool -> Routing ip result) | 105 | | NeedPing (NodeAddr ip) ( Bool -> Routing ip result) |
106 | | Refresh NodeId ([NodeInfo ip] -> Routing ip result) | 106 | | Refresh NodeId (Routing ip result) |
107 | 107 | ||
108 | instance Functor (Routing ip) where | 108 | instance Functor (Routing ip) where |
109 | fmap _ Full = Full | 109 | fmap _ Full = Full |
110 | fmap f (Done r) = Done ( f r) | 110 | fmap f (Done r) = Done ( f r) |
111 | fmap f (GetTime g) = GetTime (fmap f . g) | 111 | fmap f (GetTime g) = GetTime (fmap f . g) |
112 | fmap f (NeedPing addr g) = NeedPing addr (fmap f . g) | 112 | fmap f (NeedPing addr g) = NeedPing addr (fmap f . g) |
113 | fmap f (Refresh nid g) = Refresh nid (fmap f . g) | 113 | fmap f (Refresh nid g) = Refresh nid (fmap f g) |
114 | 114 | ||
115 | instance Monad (Routing ip) where | 115 | instance Monad (Routing ip) where |
116 | return = Done | 116 | return = Done |
@@ -119,7 +119,7 @@ instance Monad (Routing ip) where | |||
119 | Done r >>= m = m r | 119 | Done r >>= m = m r |
120 | GetTime f >>= m = GetTime $ \ t -> f t >>= m | 120 | GetTime f >>= m = GetTime $ \ t -> f t >>= m |
121 | NeedPing a f >>= m = NeedPing a $ \ p -> f p >>= m | 121 | NeedPing a f >>= m = NeedPing a $ \ p -> f p >>= m |
122 | Refresh n f >>= m = Refresh n $ \ i -> f i >>= m | 122 | Refresh n f >>= m = Refresh n $ f >>= m |
123 | 123 | ||
124 | instance Applicative (Routing ip) where | 124 | instance Applicative (Routing ip) where |
125 | pure = return | 125 | pure = return |
@@ -132,15 +132,15 @@ instance Alternative (Routing ip) where | |||
132 | Done a <|> _ = Done a | 132 | Done a <|> _ = Done a |
133 | GetTime f <|> m = GetTime $ \ t -> f t <|> m | 133 | GetTime f <|> m = GetTime $ \ t -> f t <|> m |
134 | NeedPing a f <|> m = NeedPing a $ \ p -> f p <|> m | 134 | NeedPing a f <|> m = NeedPing a $ \ p -> f p <|> m |
135 | Refresh n f <|> m = Refresh n $ \ i -> f i <|> m | 135 | Refresh n f <|> m = Refresh n (f <|> m) |
136 | 136 | ||
137 | -- | Run routing table operation. | 137 | -- | Run routing table operation. |
138 | runRouting :: (Monad m, Eq ip) | 138 | runRouting :: (Monad m, Eq ip) |
139 | => (NodeAddr ip -> m Bool) -- ^ ping the specific node; | 139 | => (NodeAddr ip -> m Bool) -- ^ ping the specific node; |
140 | -> (NodeId -> m [NodeInfo ip]) -- ^ get closest nodes; | 140 | -> (NodeId -> m ()) -- ^ refresh nodes; |
141 | -> m Timestamp -- ^ get current time; | 141 | -> m Timestamp -- ^ get current time; |
142 | -> Routing ip f -- ^ operation to run; | 142 | -> Routing ip f -- ^ operation to run; |
143 | -> m (Maybe f) -- ^ operation result; | 143 | -> m (Maybe f) -- ^ operation result; |
144 | runRouting ping_node find_nodes timestamper = go | 144 | runRouting ping_node find_nodes timestamper = go |
145 | where | 145 | where |
146 | go Full = return (Nothing) | 146 | go Full = return (Nothing) |
@@ -154,8 +154,8 @@ runRouting ping_node find_nodes timestamper = go | |||
154 | go (f pong) | 154 | go (f pong) |
155 | 155 | ||
156 | go (Refresh nid f) = do | 156 | go (Refresh nid f) = do |
157 | infos <- find_nodes nid | 157 | find_nodes nid |
158 | go (f infos) | 158 | go f |
159 | 159 | ||
160 | getTime :: Routing ip Timestamp | 160 | getTime :: Routing ip Timestamp |
161 | getTime = GetTime return | 161 | getTime = GetTime return |
@@ -165,8 +165,8 @@ needPing :: NodeAddr ip -> Routing ip Bool | |||
165 | needPing addr = NeedPing addr return | 165 | needPing addr = NeedPing addr return |
166 | {-# INLINE needPing #-} | 166 | {-# INLINE needPing #-} |
167 | 167 | ||
168 | refresh :: NodeId -> Routing ip [NodeInfo ip] | 168 | refresh :: NodeId -> Routing ip () |
169 | refresh nid = Refresh nid return | 169 | refresh nid = Refresh nid (Done ()) |
170 | {-# INLINE refresh #-} | 170 | {-# INLINE refresh #-} |
171 | 171 | ||
172 | {----------------------------------------------------------------------- | 172 | {----------------------------------------------------------------------- |
@@ -238,10 +238,8 @@ insertBucket curTime info bucket | |||
238 | -- Buckets that have not been changed in 15 minutes should be "refreshed." | 238 | -- Buckets that have not been changed in 15 minutes should be "refreshed." |
239 | | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket | 239 | | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket |
240 | , curTime - lastSeen > delta = do | 240 | , curTime - lastSeen > delta = do |
241 | infos <- refresh nodeId | 241 | refresh nodeId |
242 | refTime <- getTime | 242 | insertBucket curTime info bucket |
243 | let newBucket = L.foldr (\ x -> PSQ.insertWith max x refTime) bucket infos | ||
244 | insertBucket refTime info newBucket | ||
245 | 243 | ||
246 | -- If there are any questionable nodes in the bucket have not been | 244 | -- If there are any questionable nodes in the bucket have not been |
247 | -- seen in the last 15 minutes, the least recently seen node is | 245 | -- seen in the last 15 minutes, the least recently seen node is |
@@ -260,7 +258,7 @@ insertBucket curTime info bucket | |||
260 | return $ PSQ.insert info curTime bucket | 258 | return $ PSQ.insert info curTime bucket |
261 | 259 | ||
262 | -- When the bucket is full of good nodes, the new node is simply discarded. | 260 | -- When the bucket is full of good nodes, the new node is simply discarded. |
263 | | otherwise = A.empty | 261 | | otherwise = Full |
264 | 262 | ||
265 | insertNode :: Eq ip => NodeInfo ip -> Bucket ip -> ip `Routing` Bucket ip | 263 | insertNode :: Eq ip => NodeInfo ip -> Bucket ip -> ip `Routing` Bucket ip |
266 | insertNode info bucket = do | 264 | insertNode info bucket = do |
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index ffce47de..2bb3ce85 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -30,6 +30,7 @@ module Network.BitTorrent.DHT.Session | |||
30 | , Node | 30 | , Node |
31 | , options | 31 | , options |
32 | , thisNodeId | 32 | , thisNodeId |
33 | , routingTable | ||
33 | 34 | ||
34 | -- ** Initialization | 35 | -- ** Initialization |
35 | , LogFun | 36 | , LogFun |
@@ -50,7 +51,6 @@ module Network.BitTorrent.DHT.Session | |||
50 | -- ** Routing table | 51 | -- ** Routing table |
51 | , getTable | 52 | , getTable |
52 | , getClosest | 53 | , getClosest |
53 | , insertNode | ||
54 | 54 | ||
55 | -- ** Peer storage | 55 | -- ** Peer storage |
56 | , insertPeer | 56 | , insertPeer |
@@ -59,9 +59,7 @@ module Network.BitTorrent.DHT.Session | |||
59 | , deleteTopic | 59 | , deleteTopic |
60 | 60 | ||
61 | -- ** Messaging | 61 | -- ** Messaging |
62 | , queryNode | ||
63 | , queryParallel | 62 | , queryParallel |
64 | , (<@>) | ||
65 | ) where | 63 | ) where |
66 | 64 | ||
67 | import Prelude hiding (ioError) | 65 | import Prelude hiding (ioError) |
@@ -347,35 +345,9 @@ runDHT node action = runReaderT (unDHT action) node | |||
347 | -- Routing | 345 | -- Routing |
348 | -----------------------------------------------------------------------} | 346 | -----------------------------------------------------------------------} |
349 | 347 | ||
350 | routing :: Address ip => Routing ip a -> DHT ip (Maybe a) | ||
351 | routing = runRouting probeNode refreshNodes getTimestamp | ||
352 | |||
353 | probeNode :: Address ip => NodeAddr ip -> DHT ip Bool | ||
354 | probeNode addr = do | ||
355 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) | ||
356 | result <- try $ Ping <@> addr | ||
357 | let _ = result :: Either SomeException Ping | ||
358 | return $ either (const False) (const True) result | ||
359 | |||
360 | -- /pick a random ID/ in the range of the bucket and perform a | 348 | -- /pick a random ID/ in the range of the bucket and perform a |
361 | -- find_nodes search on it. | 349 | -- find_nodes search on it. |
362 | 350 | ||
363 | -- FIXME do not use getClosest sinse we should /refresh/ them | ||
364 | refreshNodes :: Address ip => NodeId -> DHT ip [NodeInfo ip] | ||
365 | refreshNodes nid = do | ||
366 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | ||
367 | nodes <- getClosest nid | ||
368 | nss <- forM (nodeAddr <$> nodes) $ \ addr -> do | ||
369 | NodeFound ns <- FindNode nid <@> addr | ||
370 | return ns | ||
371 | return $ L.concat nss | ||
372 | |||
373 | getTimestamp :: DHT ip Timestamp | ||
374 | getTimestamp = do | ||
375 | utcTime <- liftIO $ getCurrentTime | ||
376 | $(logDebugS) "routing.make_timestamp" (T.pack (render (pPrint utcTime))) | ||
377 | return $ utcTimeToPOSIXSeconds utcTime | ||
378 | |||
379 | {----------------------------------------------------------------------- | 351 | {----------------------------------------------------------------------- |
380 | -- Tokens | 352 | -- Tokens |
381 | -----------------------------------------------------------------------} | 353 | -----------------------------------------------------------------------} |
@@ -421,24 +393,6 @@ getClosest node = do | |||
421 | k <- asks (optK . options) | 393 | k <- asks (optK . options) |
422 | kclosest k node <$> getTable | 394 | kclosest k node <$> getTable |
423 | 395 | ||
424 | -- | This operation do not block but acquire exclusive access to | ||
425 | -- routing table. | ||
426 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId | ||
427 | insertNode info = fork $ do | ||
428 | var <- asks routingTable | ||
429 | modifyMVar_ var $ \ t -> do | ||
430 | result <- routing (R.insert info t) | ||
431 | case result of | ||
432 | Nothing -> do | ||
433 | $(logDebugS) "insertNode" $ "Routing table is full: " | ||
434 | <> T.pack (show (pPrint t)) | ||
435 | return t | ||
436 | Just t' -> do | ||
437 | let logMsg = "Routing table updated: " | ||
438 | <> pPrint t <> " -> " <> pPrint t' | ||
439 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | ||
440 | return t' | ||
441 | |||
442 | {----------------------------------------------------------------------- | 396 | {----------------------------------------------------------------------- |
443 | -- Peer storage | 397 | -- Peer storage |
444 | -----------------------------------------------------------------------} | 398 | -----------------------------------------------------------------------} |
@@ -487,22 +441,6 @@ deleteTopic ih p = do | |||
487 | -- Messaging | 441 | -- Messaging |
488 | -----------------------------------------------------------------------} | 442 | -----------------------------------------------------------------------} |
489 | 443 | ||
490 | -- | Throws exception if node is not responding. | ||
491 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | ||
492 | => NodeAddr ip -> a -> DHT ip (NodeId, b) | ||
493 | queryNode addr q = do | ||
494 | nid <- asks thisNodeId | ||
495 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | ||
496 | Response remoteId r <- query (toSockAddr addr) (Query nid read_only q) | ||
497 | insertNode (NodeInfo remoteId addr) | ||
498 | return (remoteId, r) | ||
499 | |||
500 | -- | Infix version of 'queryNode' function. | ||
501 | (<@>) :: Address ip => KRPC (Query a) (Response b) | ||
502 | => a -> NodeAddr ip -> DHT ip b | ||
503 | q <@> addr = snd <$> queryNode addr q | ||
504 | {-# INLINE (<@>) #-} | ||
505 | |||
506 | -- TODO: use alpha | 444 | -- TODO: use alpha |
507 | -- | Failed queries are ignored. | 445 | -- | Failed queries are ignored. |
508 | queryParallel :: [DHT ip a] -> DHT ip [a] | 446 | queryParallel :: [DHT ip a] -> DHT ip [a] |