From a18fe8a84025b3f0beb357eba73f37d77244a44a Mon Sep 17 00:00:00 2001 From: joe Date: Sat, 7 Jan 2017 20:50:33 -0500 Subject: Use BEP 42 compatible node ids. --- src/Network/BitTorrent/DHT/Query.hs | 97 +++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 37 deletions(-) (limited to 'src/Network/BitTorrent/DHT/Query.hs') diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 99d8cdaf..2ddd51ca 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -41,7 +41,6 @@ module Network.BitTorrent.DHT.Query , publish -- ** Routing table - , restoreTable , insertNode , refreshNodes @@ -55,6 +54,7 @@ import Control.Concurrent.Lifted hiding (yield) import Control.Exception.Lifted hiding (Handler) import Control.Monad.Reader import Control.Monad.Logger +import Data.Maybe import Data.Conduit import Data.Conduit.List as C hiding (mapMaybe, mapM_) import Data.Either @@ -68,6 +68,7 @@ import Data.Time import Data.Time.Clock.POSIX import Network.KRPC hiding (Options, def) +import Network.KRPC.Message (ReflectedIP(..)) import Data.Torrent import Network.BitTorrent.Address import Network.BitTorrent.DHT.Message @@ -89,8 +90,10 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId read_only q) -> do -- Do not route read-only nodes. (bep 43) if read_only then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) - else insertNode ni >> return () -- TODO need to block. why? - Response <$> asks thisNodeId <*> action naddr q + else insertNode ni Nothing >> return () -- TODO need to block. why? + Response + <$> myNodeIdAccordingTo naddr + <*> action naddr q -- | Default 'Ping' handler. pingH :: Address ip => NodeHandler ip @@ -134,10 +137,10 @@ type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) -- | The most basic query. May be used to check if the given node is -- alive or get its 'NodeId'. -pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) +pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP) pingQ addr = do - (nid, Ping) <- queryNode addr Ping - return (NodeInfo nid addr) + (nid, Ping, mip) <- queryNode' addr Ping + return (NodeInfo nid addr, mip) -- TODO [robustness] match range of returned node ids with the -- expected range and either filter bad nodes or discard response at @@ -197,9 +200,6 @@ republish = fork $ do i <- asks (optReannounce . options) error "DHT.republish: not implemented" -routing :: Address ip => Routing ip a -> DHT ip (Maybe a) -routing = runRouting probeNode refreshNodes getTimestamp - getTimestamp :: DHT ip Timestamp getTimestamp = do utcTime <- liftIO $ getCurrentTime @@ -207,12 +207,12 @@ getTimestamp = do return $ utcTimeToPOSIXSeconds utcTime -probeNode :: Address ip => NodeAddr ip -> DHT ip Bool +probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) probeNode addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) - result <- try $ Ping <@> addr - let _ = result :: Either SomeException Ping - return $ either (const False) (const True) result + result <- try $ pingQ addr + let _ = fmap (const ()) result :: Either SomeException () + return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result -- FIXME do not use getClosest sinse we should /refresh/ them @@ -231,58 +231,81 @@ refreshNodes nid = do queryParallel $ flip L.map (L.concat nss) $ \n -> do $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) pingQ (nodeAddr n) - insertNode n + -- pingQ takes care of inserting the node. return () return () -- $ L.concat nss -- | This operation do not block but acquire exclusive access to -- routing table. -insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId -insertNode info = fork $ do - var <- asks routingTable +insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId +insertNode info witnessed_ip = fork $ do + var <- asks routingInfo tm <- getTimestamp let showTable = do - t <- liftIO $ atomically $ readTVar var + t <- getTable let logMsg = "Routing table: " <> pPrint t $(logDebugS) "insertNode" (T.pack (render logMsg)) t <- liftIO $ atomically $ readTVar var let arrival = TryInsert info arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) $(logDebugS) "insertNode" $ T.pack (show arrival4) - ps <- liftIO $ atomically $ do - t <- readTVar var - (ps,t') <- R.insert tm arrival t - writeTVar var t' - return ps + maxbuckets <- asks (optBucketCount . options) + fallbackid <- asks tentativeNodeId + let atomicInsert arrival witnessed_ip = do + minfo <- readTVar var + let change ip = fromMaybe fallbackid + $ listToMaybe + $ rank id (nodeId $ foreignNode arrival) + $ bep42s ip fallbackid + case minfo of + Just info -> do + (ps,t') <- R.insert tm arrival $ myBuckets info + -- TODO: Check witnessed_ip against myAddress. + -- If 3 nodes witness a different address, change the table. + -- Require these witnesses satisfy bep-42 and that their + -- first 3 bits are unique. + writeTVar var $ Just $ info { myBuckets = t' } + return ps + -- Ignore non-witnessing nodes until somebody tells + -- us our ip address. + Nothing -> fromMaybe (return []) $ do + ReflectedIP ip0 <- witnessed_ip + ip <- fromSockAddr ip0 + let nil = nullTable (change ip) maxbuckets + return $ do + (ps,t') <- R.insert tm arrival nil + writeTVar var $ Just $ R.Info t' (change ip) ip + return ps + ps <- liftIO $ atomically $ atomicInsert arrival witnessed_ip showTable fork $ forM_ ps $ \(CheckPing ns)-> do forM_ ns $ \n -> do - alive <- PingResult n <$> probeNode (nodeAddr n) - let PingResult _ b = alive + (b,mip) <- probeNode (nodeAddr n) + let alive = PingResult n b $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) tm <- getTimestamp - liftIO $ atomically $ do - t <- readTVar var - (_,t') <- R.insert tm alive t - writeTVar var t' + liftIO $ atomically $ atomicInsert alive mip showTable return () -- | Throws exception if node is not responding. queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) => NodeAddr ip -> a -> DHT ip (NodeId, b) -queryNode addr q = do - nid <- asks thisNodeId +queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q + +queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) + => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP) +queryNode' addr q = do + nid <- myNodeIdAccordingTo addr let read_only = False -- TODO: check for NAT issues. (BEP 43) - Response remoteId r <- query (toSockAddr addr) (Query nid read_only q) - insertNode (NodeInfo remoteId addr) - return (remoteId, r) + (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) + $(logInfoS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) + <> " by " <> T.pack (show (toSockAddr addr)) + insertNode (NodeInfo remoteId addr) witnessed_ip + return (remoteId, r, witnessed_ip) -- | Infix version of 'queryNode' function. (<@>) :: Address ip => KRPC (Query a) (Response b) => a -> NodeAddr ip -> DHT ip b q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-} - -restoreTable :: Address ip => Table ip -> DHT ip () -restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl -- cgit v1.2.3