summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-01-04 00:54:49 -0500
committerjoe <joe@jerkface.net>2017-01-04 00:54:49 -0500
commit2fd473635dba00f7af37401058522a29460392fc (patch)
tree25e90513b252b5361201c524175851b63e12ca06 /src/Network
parent19aa76afa7349cc3c91111b38ab3012f63380433 (diff)
Made node refresh into full iterative search.
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs94
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs34
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs64
3 files changed, 107 insertions, 85 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index d51ab505..c5fcccb4 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -9,8 +9,9 @@
9-- Normally, you don't need to import this module, use 9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead. 10-- "Network.BitTorrent.DHT" instead.
11-- 11--
12{-# LANGUAGE FlexibleContexts #-} 12{-# LANGUAGE FlexibleContexts #-}
13{-# LANGUAGE TemplateHaskell #-} 13{-# LANGUAGE ScopedTypeVariables #-}
14{-# LANGUAGE TemplateHaskell #-}
14module Network.BitTorrent.DHT.Query 15module Network.BitTorrent.DHT.Query
15 ( -- * Handler 16 ( -- * Handler
16 -- | To bind specific set of handlers you need to pass 17 -- | To bind specific set of handlers you need to pass
@@ -38,6 +39,14 @@ module Network.BitTorrent.DHT.Query
38 , Search 39 , Search
39 , search 40 , search
40 , publish 41 , publish
42
43 -- ** Routing table
44 , insertNode
45 , refreshNodes
46
47 -- ** Messaging
48 , queryNode
49 , (<@>)
41 ) where 50 ) where
42 51
43import Control.Applicative 52import Control.Applicative
@@ -54,12 +63,14 @@ import Data.Text as T
54import Network 63import Network
55import Text.PrettyPrint as PP hiding ((<>), ($$)) 64import Text.PrettyPrint as PP hiding ((<>), ($$))
56import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) 65import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
66import Data.Time
67import Data.Time.Clock.POSIX
57 68
58import Network.KRPC hiding (Options, def) 69import Network.KRPC hiding (Options, def)
59import Data.Torrent 70import Data.Torrent
60import Network.BitTorrent.Address 71import Network.BitTorrent.Address
61import Network.BitTorrent.DHT.Message 72import Network.BitTorrent.DHT.Message
62import Network.BitTorrent.DHT.Routing 73import Network.BitTorrent.DHT.Routing as R
63import Network.BitTorrent.DHT.Session 74import Network.BitTorrent.DHT.Session
64 75
65{----------------------------------------------------------------------- 76{-----------------------------------------------------------------------
@@ -165,7 +176,7 @@ search _ action = do
165 $(logWarnS) "search" "start query" 176 $(logWarnS) "search" "start query"
166 responses <- lift $ queryParallel (action <$> batch) 177 responses <- lift $ queryParallel (action <$> batch)
167 let (nodes, results) = partitionEithers responses 178 let (nodes, results) = partitionEithers responses
168 $(logWarnS) "search" ("done query more:" <> T.pack (show $ L.length nodes)) 179 $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
169 leftover $ L.concat nodes 180 leftover $ L.concat nodes
170 mapM_ yield results 181 mapM_ yield results
171 182
@@ -180,3 +191,78 @@ republish :: DHT ip ThreadId
180republish = fork $ do 191republish = fork $ do
181 i <- asks (optReannounce . options) 192 i <- asks (optReannounce . options)
182 error "DHT.republish: not implemented" 193 error "DHT.republish: not implemented"
194
195routing :: Address ip => Routing ip a -> DHT ip (Maybe a)
196routing = runRouting probeNode refreshNodes getTimestamp
197
198getTimestamp :: DHT ip Timestamp
199getTimestamp = do
200 utcTime <- liftIO $ getCurrentTime
201 $(logDebugS) "routing.make_timestamp" (T.pack (render (pPrint utcTime)))
202 return $ utcTimeToPOSIXSeconds utcTime
203
204
205probeNode :: Address ip => NodeAddr ip -> DHT ip Bool
206probeNode addr = do
207 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
208 result <- try $ Ping <@> addr
209 let _ = result :: Either SomeException Ping
210 return $ either (const False) (const True) result
211
212
213-- FIXME do not use getClosest sinse we should /refresh/ them
214refreshNodes :: Address ip => NodeId -> DHT ip () -- [NodeInfo ip]
215refreshNodes nid = do
216 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
217 nodes <- getClosest nid
218 do
219 -- forM (L.take 1 nodes) $ \ addr -> do
220 -- NodeFound ns <- FindNode nid <@> addr
221 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo ip] (DHT ip) ()
222 -- Actual type: ConduitM [NodeInfo ip] [NodeInfo ip] (DHT ip) ()
223 -- nss <- sourceList [[addr]] $= search nid (findNodeQ nid) $$ C.consume
224 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume
225 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes."
226 queryParallel $ flip L.map (L.concat nss) $ \n -> do
227 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
228 pingQ (nodeAddr n)
229 insertNode n
230 return ()
231 return () -- $ L.concat nss
232
233-- | This operation do not block but acquire exclusive access to
234-- routing table.
235insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId
236insertNode info = fork $ do
237 var <- asks routingTable
238 t <- takeMVar var
239 t' <- do -- modifyMVar_ var $ \ t -> do
240 result <- routing (R.insert info t)
241 case result of
242 Nothing -> do
243 $(logDebugS) "insertNode" $ "Routing table is full: "
244 <> T.pack (show (pPrint t))
245 return t
246 Just t' -> do
247 let logMsg = "Routing table updated: "
248 <> pPrint t <> " -> " <> pPrint t'
249 $(logDebugS) "insertNode" (T.pack (render logMsg))
250 return t'
251 putMVar var t'
252
253-- | Throws exception if node is not responding.
254queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
255 => NodeAddr ip -> a -> DHT ip (NodeId, b)
256queryNode addr q = do
257 nid <- asks thisNodeId
258 let read_only = False -- TODO: check for NAT issues. (BEP 43)
259 Response remoteId r <- query (toSockAddr addr) (Query nid read_only q)
260 insertNode (NodeInfo remoteId addr)
261 return (remoteId, r)
262
263-- | Infix version of 'queryNode' function.
264(<@>) :: Address ip => KRPC (Query a) (Response b)
265 => a -> NodeAddr ip -> DHT ip b
266q <@> addr = snd <$> queryNode addr q
267{-# INLINE (<@>) #-}
268
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 8705a5a2..fee52380 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -103,14 +103,14 @@ data Routing ip result
103 | Done result 103 | Done result
104 | GetTime ( Timestamp -> Routing ip result) 104 | GetTime ( Timestamp -> Routing ip result)
105 | NeedPing (NodeAddr ip) ( Bool -> Routing ip result) 105 | NeedPing (NodeAddr ip) ( Bool -> Routing ip result)
106 | Refresh NodeId ([NodeInfo ip] -> Routing ip result) 106 | Refresh NodeId (Routing ip result)
107 107
108instance Functor (Routing ip) where 108instance Functor (Routing ip) where
109 fmap _ Full = Full 109 fmap _ Full = Full
110 fmap f (Done r) = Done ( f r) 110 fmap f (Done r) = Done ( f r)
111 fmap f (GetTime g) = GetTime (fmap f . g) 111 fmap f (GetTime g) = GetTime (fmap f . g)
112 fmap f (NeedPing addr g) = NeedPing addr (fmap f . g) 112 fmap f (NeedPing addr g) = NeedPing addr (fmap f . g)
113 fmap f (Refresh nid g) = Refresh nid (fmap f . g) 113 fmap f (Refresh nid g) = Refresh nid (fmap f g)
114 114
115instance Monad (Routing ip) where 115instance Monad (Routing ip) where
116 return = Done 116 return = Done
@@ -119,7 +119,7 @@ instance Monad (Routing ip) where
119 Done r >>= m = m r 119 Done r >>= m = m r
120 GetTime f >>= m = GetTime $ \ t -> f t >>= m 120 GetTime f >>= m = GetTime $ \ t -> f t >>= m
121 NeedPing a f >>= m = NeedPing a $ \ p -> f p >>= m 121 NeedPing a f >>= m = NeedPing a $ \ p -> f p >>= m
122 Refresh n f >>= m = Refresh n $ \ i -> f i >>= m 122 Refresh n f >>= m = Refresh n $ f >>= m
123 123
124instance Applicative (Routing ip) where 124instance Applicative (Routing ip) where
125 pure = return 125 pure = return
@@ -132,15 +132,15 @@ instance Alternative (Routing ip) where
132 Done a <|> _ = Done a 132 Done a <|> _ = Done a
133 GetTime f <|> m = GetTime $ \ t -> f t <|> m 133 GetTime f <|> m = GetTime $ \ t -> f t <|> m
134 NeedPing a f <|> m = NeedPing a $ \ p -> f p <|> m 134 NeedPing a f <|> m = NeedPing a $ \ p -> f p <|> m
135 Refresh n f <|> m = Refresh n $ \ i -> f i <|> m 135 Refresh n f <|> m = Refresh n (f <|> m)
136 136
137-- | Run routing table operation. 137-- | Run routing table operation.
138runRouting :: (Monad m, Eq ip) 138runRouting :: (Monad m, Eq ip)
139 => (NodeAddr ip -> m Bool) -- ^ ping the specific node; 139 => (NodeAddr ip -> m Bool) -- ^ ping the specific node;
140 -> (NodeId -> m [NodeInfo ip]) -- ^ get closest nodes; 140 -> (NodeId -> m ()) -- ^ refresh nodes;
141 -> m Timestamp -- ^ get current time; 141 -> m Timestamp -- ^ get current time;
142 -> Routing ip f -- ^ operation to run; 142 -> Routing ip f -- ^ operation to run;
143 -> m (Maybe f) -- ^ operation result; 143 -> m (Maybe f) -- ^ operation result;
144runRouting ping_node find_nodes timestamper = go 144runRouting ping_node find_nodes timestamper = go
145 where 145 where
146 go Full = return (Nothing) 146 go Full = return (Nothing)
@@ -154,8 +154,8 @@ runRouting ping_node find_nodes timestamper = go
154 go (f pong) 154 go (f pong)
155 155
156 go (Refresh nid f) = do 156 go (Refresh nid f) = do
157 infos <- find_nodes nid 157 find_nodes nid
158 go (f infos) 158 go f
159 159
160getTime :: Routing ip Timestamp 160getTime :: Routing ip Timestamp
161getTime = GetTime return 161getTime = GetTime return
@@ -165,8 +165,8 @@ needPing :: NodeAddr ip -> Routing ip Bool
165needPing addr = NeedPing addr return 165needPing addr = NeedPing addr return
166{-# INLINE needPing #-} 166{-# INLINE needPing #-}
167 167
168refresh :: NodeId -> Routing ip [NodeInfo ip] 168refresh :: NodeId -> Routing ip ()
169refresh nid = Refresh nid return 169refresh nid = Refresh nid (Done ())
170{-# INLINE refresh #-} 170{-# INLINE refresh #-}
171 171
172{----------------------------------------------------------------------- 172{-----------------------------------------------------------------------
@@ -238,10 +238,8 @@ insertBucket curTime info bucket
238 -- Buckets that have not been changed in 15 minutes should be "refreshed." 238 -- Buckets that have not been changed in 15 minutes should be "refreshed."
239 | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket 239 | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket
240 , curTime - lastSeen > delta = do 240 , curTime - lastSeen > delta = do
241 infos <- refresh nodeId 241 refresh nodeId
242 refTime <- getTime 242 insertBucket curTime info bucket
243 let newBucket = L.foldr (\ x -> PSQ.insertWith max x refTime) bucket infos
244 insertBucket refTime info newBucket
245 243
246 -- If there are any questionable nodes in the bucket have not been 244 -- If there are any questionable nodes in the bucket have not been
247 -- seen in the last 15 minutes, the least recently seen node is 245 -- seen in the last 15 minutes, the least recently seen node is
@@ -260,7 +258,7 @@ insertBucket curTime info bucket
260 return $ PSQ.insert info curTime bucket 258 return $ PSQ.insert info curTime bucket
261 259
262 -- When the bucket is full of good nodes, the new node is simply discarded. 260 -- When the bucket is full of good nodes, the new node is simply discarded.
263 | otherwise = A.empty 261 | otherwise = Full
264 262
265insertNode :: Eq ip => NodeInfo ip -> Bucket ip -> ip `Routing` Bucket ip 263insertNode :: Eq ip => NodeInfo ip -> Bucket ip -> ip `Routing` Bucket ip
266insertNode info bucket = do 264insertNode info bucket = do
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index ffce47de..2bb3ce85 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -30,6 +30,7 @@ module Network.BitTorrent.DHT.Session
30 , Node 30 , Node
31 , options 31 , options
32 , thisNodeId 32 , thisNodeId
33 , routingTable
33 34
34 -- ** Initialization 35 -- ** Initialization
35 , LogFun 36 , LogFun
@@ -50,7 +51,6 @@ module Network.BitTorrent.DHT.Session
50 -- ** Routing table 51 -- ** Routing table
51 , getTable 52 , getTable
52 , getClosest 53 , getClosest
53 , insertNode
54 54
55 -- ** Peer storage 55 -- ** Peer storage
56 , insertPeer 56 , insertPeer
@@ -59,9 +59,7 @@ module Network.BitTorrent.DHT.Session
59 , deleteTopic 59 , deleteTopic
60 60
61 -- ** Messaging 61 -- ** Messaging
62 , queryNode
63 , queryParallel 62 , queryParallel
64 , (<@>)
65 ) where 63 ) where
66 64
67import Prelude hiding (ioError) 65import Prelude hiding (ioError)
@@ -347,35 +345,9 @@ runDHT node action = runReaderT (unDHT action) node
347-- Routing 345-- Routing
348-----------------------------------------------------------------------} 346-----------------------------------------------------------------------}
349 347
350routing :: Address ip => Routing ip a -> DHT ip (Maybe a)
351routing = runRouting probeNode refreshNodes getTimestamp
352
353probeNode :: Address ip => NodeAddr ip -> DHT ip Bool
354probeNode addr = do
355 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
356 result <- try $ Ping <@> addr
357 let _ = result :: Either SomeException Ping
358 return $ either (const False) (const True) result
359
360-- /pick a random ID/ in the range of the bucket and perform a 348-- /pick a random ID/ in the range of the bucket and perform a
361-- find_nodes search on it. 349-- find_nodes search on it.
362 350
363-- FIXME do not use getClosest sinse we should /refresh/ them
364refreshNodes :: Address ip => NodeId -> DHT ip [NodeInfo ip]
365refreshNodes nid = do
366 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
367 nodes <- getClosest nid
368 nss <- forM (nodeAddr <$> nodes) $ \ addr -> do
369 NodeFound ns <- FindNode nid <@> addr
370 return ns
371 return $ L.concat nss
372
373getTimestamp :: DHT ip Timestamp
374getTimestamp = do
375 utcTime <- liftIO $ getCurrentTime
376 $(logDebugS) "routing.make_timestamp" (T.pack (render (pPrint utcTime)))
377 return $ utcTimeToPOSIXSeconds utcTime
378
379{----------------------------------------------------------------------- 351{-----------------------------------------------------------------------
380-- Tokens 352-- Tokens
381-----------------------------------------------------------------------} 353-----------------------------------------------------------------------}
@@ -421,24 +393,6 @@ getClosest node = do
421 k <- asks (optK . options) 393 k <- asks (optK . options)
422 kclosest k node <$> getTable 394 kclosest k node <$> getTable
423 395
424-- | This operation do not block but acquire exclusive access to
425-- routing table.
426insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId
427insertNode info = fork $ do
428 var <- asks routingTable
429 modifyMVar_ var $ \ t -> do
430 result <- routing (R.insert info t)
431 case result of
432 Nothing -> do
433 $(logDebugS) "insertNode" $ "Routing table is full: "
434 <> T.pack (show (pPrint t))
435 return t
436 Just t' -> do
437 let logMsg = "Routing table updated: "
438 <> pPrint t <> " -> " <> pPrint t'
439 $(logDebugS) "insertNode" (T.pack (render logMsg))
440 return t'
441
442{----------------------------------------------------------------------- 396{-----------------------------------------------------------------------
443-- Peer storage 397-- Peer storage
444-----------------------------------------------------------------------} 398-----------------------------------------------------------------------}
@@ -487,22 +441,6 @@ deleteTopic ih p = do
487-- Messaging 441-- Messaging
488-----------------------------------------------------------------------} 442-----------------------------------------------------------------------}
489 443
490-- | Throws exception if node is not responding.
491queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b)
492 => NodeAddr ip -> a -> DHT ip (NodeId, b)
493queryNode addr q = do
494 nid <- asks thisNodeId
495 let read_only = False -- TODO: check for NAT issues. (BEP 43)
496 Response remoteId r <- query (toSockAddr addr) (Query nid read_only q)
497 insertNode (NodeInfo remoteId addr)
498 return (remoteId, r)
499
500-- | Infix version of 'queryNode' function.
501(<@>) :: Address ip => KRPC (Query a) (Response b)
502 => a -> NodeAddr ip -> DHT ip b
503q <@> addr = snd <$> queryNode addr q
504{-# INLINE (<@>) #-}
505
506-- TODO: use alpha 444-- TODO: use alpha
507-- | Failed queries are ignored. 445-- | Failed queries are ignored.
508queryParallel :: [DHT ip a] -> DHT ip [a] 446queryParallel :: [DHT ip a] -> DHT ip [a]