diff options
author | joe <joe@jerkface.net> | 2017-01-07 20:50:33 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-01-08 16:35:59 -0500 |
commit | a18fe8a84025b3f0beb357eba73f37d77244a44a (patch) | |
tree | 6cad0091df7d6aaceaa4f88be0a29fd320a8abba /src/Network/BitTorrent/DHT/Query.hs | |
parent | bcd860aa8816cf52a01c313aecfdcde21fcd2c16 (diff) |
Use BEP 42 compatible node ids.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 97 |
1 files changed, 60 insertions, 37 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 99d8cdaf..2ddd51ca 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -41,7 +41,6 @@ module Network.BitTorrent.DHT.Query | |||
41 | , publish | 41 | , publish |
42 | 42 | ||
43 | -- ** Routing table | 43 | -- ** Routing table |
44 | , restoreTable | ||
45 | , insertNode | 44 | , insertNode |
46 | , refreshNodes | 45 | , refreshNodes |
47 | 46 | ||
@@ -55,6 +54,7 @@ import Control.Concurrent.Lifted hiding (yield) | |||
55 | import Control.Exception.Lifted hiding (Handler) | 54 | import Control.Exception.Lifted hiding (Handler) |
56 | import Control.Monad.Reader | 55 | import Control.Monad.Reader |
57 | import Control.Monad.Logger | 56 | import Control.Monad.Logger |
57 | import Data.Maybe | ||
58 | import Data.Conduit | 58 | import Data.Conduit |
59 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | 59 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) |
60 | import Data.Either | 60 | import Data.Either |
@@ -68,6 +68,7 @@ import Data.Time | |||
68 | import Data.Time.Clock.POSIX | 68 | import Data.Time.Clock.POSIX |
69 | 69 | ||
70 | import Network.KRPC hiding (Options, def) | 70 | import Network.KRPC hiding (Options, def) |
71 | import Network.KRPC.Message (ReflectedIP(..)) | ||
71 | import Data.Torrent | 72 | import Data.Torrent |
72 | import Network.BitTorrent.Address | 73 | import Network.BitTorrent.Address |
73 | import Network.BitTorrent.DHT.Message | 74 | import Network.BitTorrent.DHT.Message |
@@ -89,8 +90,10 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId read_only q) -> do | |||
89 | -- Do not route read-only nodes. (bep 43) | 90 | -- Do not route read-only nodes. (bep 43) |
90 | if read_only | 91 | if read_only |
91 | then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) | 92 | then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) |
92 | else insertNode ni >> return () -- TODO need to block. why? | 93 | else insertNode ni Nothing >> return () -- TODO need to block. why? |
93 | Response <$> asks thisNodeId <*> action naddr q | 94 | Response |
95 | <$> myNodeIdAccordingTo naddr | ||
96 | <*> action naddr q | ||
94 | 97 | ||
95 | -- | Default 'Ping' handler. | 98 | -- | Default 'Ping' handler. |
96 | pingH :: Address ip => NodeHandler ip | 99 | pingH :: Address ip => NodeHandler ip |
@@ -134,10 +137,10 @@ type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) | |||
134 | 137 | ||
135 | -- | The most basic query. May be used to check if the given node is | 138 | -- | The most basic query. May be used to check if the given node is |
136 | -- alive or get its 'NodeId'. | 139 | -- alive or get its 'NodeId'. |
137 | pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) | 140 | pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP) |
138 | pingQ addr = do | 141 | pingQ addr = do |
139 | (nid, Ping) <- queryNode addr Ping | 142 | (nid, Ping, mip) <- queryNode' addr Ping |
140 | return (NodeInfo nid addr) | 143 | return (NodeInfo nid addr, mip) |
141 | 144 | ||
142 | -- TODO [robustness] match range of returned node ids with the | 145 | -- TODO [robustness] match range of returned node ids with the |
143 | -- expected range and either filter bad nodes or discard response at | 146 | -- expected range and either filter bad nodes or discard response at |
@@ -197,9 +200,6 @@ republish = fork $ do | |||
197 | i <- asks (optReannounce . options) | 200 | i <- asks (optReannounce . options) |
198 | error "DHT.republish: not implemented" | 201 | error "DHT.republish: not implemented" |
199 | 202 | ||
200 | routing :: Address ip => Routing ip a -> DHT ip (Maybe a) | ||
201 | routing = runRouting probeNode refreshNodes getTimestamp | ||
202 | |||
203 | getTimestamp :: DHT ip Timestamp | 203 | getTimestamp :: DHT ip Timestamp |
204 | getTimestamp = do | 204 | getTimestamp = do |
205 | utcTime <- liftIO $ getCurrentTime | 205 | utcTime <- liftIO $ getCurrentTime |
@@ -207,12 +207,12 @@ getTimestamp = do | |||
207 | return $ utcTimeToPOSIXSeconds utcTime | 207 | return $ utcTimeToPOSIXSeconds utcTime |
208 | 208 | ||
209 | 209 | ||
210 | probeNode :: Address ip => NodeAddr ip -> DHT ip Bool | 210 | probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) |
211 | probeNode addr = do | 211 | probeNode addr = do |
212 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) | 212 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) |
213 | result <- try $ Ping <@> addr | 213 | result <- try $ pingQ addr |
214 | let _ = result :: Either SomeException Ping | 214 | let _ = fmap (const ()) result :: Either SomeException () |
215 | return $ either (const False) (const True) result | 215 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result |
216 | 216 | ||
217 | 217 | ||
218 | -- FIXME do not use getClosest sinse we should /refresh/ them | 218 | -- FIXME do not use getClosest sinse we should /refresh/ them |
@@ -231,58 +231,81 @@ refreshNodes nid = do | |||
231 | queryParallel $ flip L.map (L.concat nss) $ \n -> do | 231 | queryParallel $ flip L.map (L.concat nss) $ \n -> do |
232 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) | 232 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) |
233 | pingQ (nodeAddr n) | 233 | pingQ (nodeAddr n) |
234 | insertNode n | 234 | -- pingQ takes care of inserting the node. |
235 | return () | 235 | return () |
236 | return () -- $ L.concat nss | 236 | return () -- $ L.concat nss |
237 | 237 | ||
238 | -- | This operation do not block but acquire exclusive access to | 238 | -- | This operation do not block but acquire exclusive access to |
239 | -- routing table. | 239 | -- routing table. |
240 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId | 240 | insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId |
241 | insertNode info = fork $ do | 241 | insertNode info witnessed_ip = fork $ do |
242 | var <- asks routingTable | 242 | var <- asks routingInfo |
243 | tm <- getTimestamp | 243 | tm <- getTimestamp |
244 | let showTable = do | 244 | let showTable = do |
245 | t <- liftIO $ atomically $ readTVar var | 245 | t <- getTable |
246 | let logMsg = "Routing table: " <> pPrint t | 246 | let logMsg = "Routing table: " <> pPrint t |
247 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | 247 | $(logDebugS) "insertNode" (T.pack (render logMsg)) |
248 | t <- liftIO $ atomically $ readTVar var | 248 | t <- liftIO $ atomically $ readTVar var |
249 | let arrival = TryInsert info | 249 | let arrival = TryInsert info |
250 | arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) | 250 | arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) |
251 | $(logDebugS) "insertNode" $ T.pack (show arrival4) | 251 | $(logDebugS) "insertNode" $ T.pack (show arrival4) |
252 | ps <- liftIO $ atomically $ do | 252 | maxbuckets <- asks (optBucketCount . options) |
253 | t <- readTVar var | 253 | fallbackid <- asks tentativeNodeId |
254 | (ps,t') <- R.insert tm arrival t | 254 | let atomicInsert arrival witnessed_ip = do |
255 | writeTVar var t' | 255 | minfo <- readTVar var |
256 | return ps | 256 | let change ip = fromMaybe fallbackid |
257 | $ listToMaybe | ||
258 | $ rank id (nodeId $ foreignNode arrival) | ||
259 | $ bep42s ip fallbackid | ||
260 | case minfo of | ||
261 | Just info -> do | ||
262 | (ps,t') <- R.insert tm arrival $ myBuckets info | ||
263 | -- TODO: Check witnessed_ip against myAddress. | ||
264 | -- If 3 nodes witness a different address, change the table. | ||
265 | -- Require these witnesses satisfy bep-42 and that their | ||
266 | -- first 3 bits are unique. | ||
267 | writeTVar var $ Just $ info { myBuckets = t' } | ||
268 | return ps | ||
269 | -- Ignore non-witnessing nodes until somebody tells | ||
270 | -- us our ip address. | ||
271 | Nothing -> fromMaybe (return []) $ do | ||
272 | ReflectedIP ip0 <- witnessed_ip | ||
273 | ip <- fromSockAddr ip0 | ||
274 | let nil = nullTable (change ip) maxbuckets | ||
275 | return $ do | ||
276 | (ps,t') <- R.insert tm arrival nil | ||
277 | writeTVar var $ Just $ R.Info t' (change ip) ip | ||
278 | return ps | ||
279 | ps <- liftIO $ atomically $ atomicInsert arrival witnessed_ip | ||
257 | showTable | 280 | showTable |
258 | fork $ forM_ ps $ \(CheckPing ns)-> do | 281 | fork $ forM_ ps $ \(CheckPing ns)-> do |
259 | forM_ ns $ \n -> do | 282 | forM_ ns $ \n -> do |
260 | alive <- PingResult n <$> probeNode (nodeAddr n) | 283 | (b,mip) <- probeNode (nodeAddr n) |
261 | let PingResult _ b = alive | 284 | let alive = PingResult n b |
262 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) | 285 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) |
263 | tm <- getTimestamp | 286 | tm <- getTimestamp |
264 | liftIO $ atomically $ do | 287 | liftIO $ atomically $ atomicInsert alive mip |
265 | t <- readTVar var | ||
266 | (_,t') <- R.insert tm alive t | ||
267 | writeTVar var t' | ||
268 | showTable | 288 | showTable |
269 | return () | 289 | return () |
270 | 290 | ||
271 | -- | Throws exception if node is not responding. | 291 | -- | Throws exception if node is not responding. |
272 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 292 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
273 | => NodeAddr ip -> a -> DHT ip (NodeId, b) | 293 | => NodeAddr ip -> a -> DHT ip (NodeId, b) |
274 | queryNode addr q = do | 294 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q |
275 | nid <- asks thisNodeId | 295 | |
296 | queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) | ||
297 | => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP) | ||
298 | queryNode' addr q = do | ||
299 | nid <- myNodeIdAccordingTo addr | ||
276 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | 300 | let read_only = False -- TODO: check for NAT issues. (BEP 43) |
277 | Response remoteId r <- query (toSockAddr addr) (Query nid read_only q) | 301 | (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) |
278 | insertNode (NodeInfo remoteId addr) | 302 | $(logInfoS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) |
279 | return (remoteId, r) | 303 | <> " by " <> T.pack (show (toSockAddr addr)) |
304 | insertNode (NodeInfo remoteId addr) witnessed_ip | ||
305 | return (remoteId, r, witnessed_ip) | ||
280 | 306 | ||
281 | -- | Infix version of 'queryNode' function. | 307 | -- | Infix version of 'queryNode' function. |
282 | (<@>) :: Address ip => KRPC (Query a) (Response b) | 308 | (<@>) :: Address ip => KRPC (Query a) (Response b) |
283 | => a -> NodeAddr ip -> DHT ip b | 309 | => a -> NodeAddr ip -> DHT ip b |
284 | q <@> addr = snd <$> queryNode addr q | 310 | q <@> addr = snd <$> queryNode addr q |
285 | {-# INLINE (<@>) #-} | 311 | {-# INLINE (<@>) #-} |
286 | |||
287 | restoreTable :: Address ip => Table ip -> DHT ip () | ||
288 | restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl | ||