From a18fe8a84025b3f0beb357eba73f37d77244a44a Mon Sep 17 00:00:00 2001 From: joe Date: Sat, 7 Jan 2017 20:50:33 -0500 Subject: Use BEP 42 compatible node ids. --- src/Network/BitTorrent/Address.hs | 20 ++++++-- src/Network/BitTorrent/DHT.hs | 61 ++++++++++++---------- src/Network/BitTorrent/DHT/Query.hs | 97 ++++++++++++++++++++++------------- src/Network/BitTorrent/DHT/Routing.hs | 18 +++++-- src/Network/BitTorrent/DHT/Session.hs | 43 +++++++++++----- 5 files changed, 154 insertions(+), 85 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/Address.hs b/src/Network/BitTorrent/Address.hs index 7ef837db..fea7139d 100644 --- a/src/Network/BitTorrent/Address.hs +++ b/src/Network/BitTorrent/Address.hs @@ -65,6 +65,7 @@ module Network.BitTorrent.Address , bucketRange , genBucketSample , bep42 + , bep42s -- ** Info , NodeAddr (..) @@ -102,7 +103,7 @@ import Data.Foldable import Data.IP import Data.List as L import Data.List.Split as L -import Data.Maybe (fromMaybe, catMaybes) +import Data.Maybe (fromMaybe, catMaybes, mapMaybe) import Data.Monoid import Data.Hashable import Data.Ord @@ -813,8 +814,8 @@ instance Pretty ip => Pretty [NodeInfo ip] where pPrint = PP.vcat . PP.punctuate "," . L.map pPrint -- | Order by closeness: nearest nodes first. -rank :: Eq ip => NodeId -> [NodeInfo ip] -> [NodeInfo ip] -rank nid = L.sortBy (comparing (distance nid . nodeId)) +rank :: (x -> NodeId) -> NodeId -> [x] -> [x] +rank f nid = L.sortBy (comparing (distance nid . f)) {----------------------------------------------------------------------- -- Fingerprint @@ -1224,6 +1225,15 @@ fingerprint pid = either (const def) id $ runGet getCI (getPeerId pid) return $ Version (catMaybes $ L.map decodeShadowVerNr str) [] +-- | Yields all 8 DHT neighborhoods available to you given a particular ip +-- address. +bep42s :: Address a => a -> NodeId -> [NodeId] +bep42s addr (NodeId r) = mapMaybe (bep42 addr) rs + where + rs = L.map (NodeId . change3bits r) [0..7] + +change3bits bs n = BS.snoc (BS.init bs) (BS.last bs .&. 0xF8 .|. n) + -- | Modifies a purely random 'NodeId' to one that is related to a given -- routable address in accordance with BEP 42. bep42 :: Address a => a -> NodeId -> Maybe NodeId @@ -1236,11 +1246,11 @@ bep42 addr (NodeId r) where ip4mask = "\x03\x0f\x3f\xff" :: ByteString ip6mask = "\x01\x03\x07\x0f\x1f\x3f\x7f\xff" :: ByteString - rbyte = BS.last r + nbhood_select = BS.last r .&. 7 retr n = pure $ BS.drop (BS.length r - n) r crc = S.encode . crc32c . BS.pack masked ip = case BS.zipWith (.&.) msk ip of - (b:bs) -> (b .|. shiftL (rbyte .&. 7) 5) : bs + (b:bs) -> (b .|. shiftL nbhood_select 5) : bs bs -> bs where msk | BS.length ip == 4 = ip4mask | otherwise = ip6mask diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 1c62a0e0..3867d182 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -33,7 +33,6 @@ module Network.BitTorrent.DHT -- * Initialization , snapshot - , restore -- * Operations , Network.BitTorrent.DHT.lookup @@ -162,24 +161,39 @@ resolveHostName NodeAddr {..} = do -- -- This operation do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () -bootstrap startNodes = do +bootstrap :: Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () +bootstrap mbs startNodes = do + restored <- + case decode <$> mbs of + Just (Right tbl) -> return (T.toList tbl) + Just (Left e) -> do $(logWarnS) "restore" (Text.pack e) + return [] + Nothing -> return [] + $(logInfoS) "bootstrap" "Start node bootstrapping" - nid <- asks thisNodeId - let searchAll aliveNodes - = C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume + let searchAll aliveNodes = do + nid <- myNodeIdAccordingTo (error "FIXME") + C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume + input_nodes <- (restored ++) . T.toList <$> getTable -- Step 1: Use iterative searches to flesh out the table.. - do knowns <- map (map $ nodeAddr . fst) . T.toList <$> getTable - alive_knowns <- queryParallel (pingQ <$> concat knowns) - nss <- searchAll alive_knowns - -- We only use the supplied bootstrap nodes when we don't know of any - -- others to try. - when (null nss) $ do - -- TODO filter duplicated in startNodes list - -- TODO retransmissions for startNodes - aliveNodes <- queryParallel (pingQ <$> startNodes) - _ <- searchAll aliveNodes - return () + do let knowns = map (map $ nodeAddr . fst) input_nodes + -- Below, we reverse the nodes since the table serialization puts the + -- nearest nodes last and we want to choose a similar node id to bootstrap + -- faster. + (alive_knowns,_) <- unzip <$> queryParallel (pingQ <$> reverse (concat knowns)) + b <- isBootstrapped + -- If our cached nodes are alive and our IP address did not change, it's possible + -- we are already bootsrapped, so no need to do any searches. + when (not b) $ do + nss <- searchAll $ take 2 alive_knowns + -- We only use the supplied bootstrap nodes when we don't know of any + -- others to try. + when (null nss) $ do + -- TODO filter duplicated in startNodes list + -- TODO retransmissions for startNodes + (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes) + _ <- searchAll $ take 2 aliveNodes + return () -- Step 2: Repeatedly refresh incomplete buckets until the table is full. maxbuckets <- asks $ optBucketCount . options flip fix 0 $ \loop icnt -> do @@ -195,6 +209,7 @@ bootstrap startNodes = do p:ps -> p:unfull ps [] -> [] forM_ us $ \(is_last,(index,_)) -> do + nid <- myNodeIdAccordingTo (error "FIXME") sample <- liftIO $ genBucketSample nid (bucketRange index is_last) $(logDebugS) "bootstrapping" $ "BOOTSTRAP sample" @@ -213,23 +228,13 @@ bootstrap startNodes = do -- -- This operation do not block. -- -isBootstrapped :: DHT ip Bool +isBootstrapped :: Eq ip => DHT ip Bool isBootstrapped = T.full <$> getTable {----------------------------------------------------------------------- -- Initialization -----------------------------------------------------------------------} --- | Load previous session. (corrupted - exception/ignore ?) --- --- This is blocking operation, use --- 'Control.Concurrent.Async.Lifted.async' if needed. -restore :: Address ip => BS.ByteString -> DHT ip () -restore bs = do - case decode bs of - Right tbl -> restoreTable tbl - Left e -> $(logWarnS) "restore" (Text.pack e) - -- | Serialize current DHT session to byte string. -- -- This is blocking operation, use diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 99d8cdaf..2ddd51ca 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -41,7 +41,6 @@ module Network.BitTorrent.DHT.Query , publish -- ** Routing table - , restoreTable , insertNode , refreshNodes @@ -55,6 +54,7 @@ import Control.Concurrent.Lifted hiding (yield) import Control.Exception.Lifted hiding (Handler) import Control.Monad.Reader import Control.Monad.Logger +import Data.Maybe import Data.Conduit import Data.Conduit.List as C hiding (mapMaybe, mapM_) import Data.Either @@ -68,6 +68,7 @@ import Data.Time import Data.Time.Clock.POSIX import Network.KRPC hiding (Options, def) +import Network.KRPC.Message (ReflectedIP(..)) import Data.Torrent import Network.BitTorrent.Address import Network.BitTorrent.DHT.Message @@ -89,8 +90,10 @@ nodeHandler action = handler $ \ sockAddr (Query remoteId read_only q) -> do -- Do not route read-only nodes. (bep 43) if read_only then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) - else insertNode ni >> return () -- TODO need to block. why? - Response <$> asks thisNodeId <*> action naddr q + else insertNode ni Nothing >> return () -- TODO need to block. why? + Response + <$> myNodeIdAccordingTo naddr + <*> action naddr q -- | Default 'Ping' handler. pingH :: Address ip => NodeHandler ip @@ -134,10 +137,10 @@ type Iteration ip o = NodeInfo ip -> DHT ip (Either [NodeInfo ip] [o ip]) -- | The most basic query. May be used to check if the given node is -- alive or get its 'NodeId'. -pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip) +pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo ip, Maybe ReflectedIP) pingQ addr = do - (nid, Ping) <- queryNode addr Ping - return (NodeInfo nid addr) + (nid, Ping, mip) <- queryNode' addr Ping + return (NodeInfo nid addr, mip) -- TODO [robustness] match range of returned node ids with the -- expected range and either filter bad nodes or discard response at @@ -197,9 +200,6 @@ republish = fork $ do i <- asks (optReannounce . options) error "DHT.republish: not implemented" -routing :: Address ip => Routing ip a -> DHT ip (Maybe a) -routing = runRouting probeNode refreshNodes getTimestamp - getTimestamp :: DHT ip Timestamp getTimestamp = do utcTime <- liftIO $ getCurrentTime @@ -207,12 +207,12 @@ getTimestamp = do return $ utcTimeToPOSIXSeconds utcTime -probeNode :: Address ip => NodeAddr ip -> DHT ip Bool +probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) probeNode addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) - result <- try $ Ping <@> addr - let _ = result :: Either SomeException Ping - return $ either (const False) (const True) result + result <- try $ pingQ addr + let _ = fmap (const ()) result :: Either SomeException () + return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result -- FIXME do not use getClosest sinse we should /refresh/ them @@ -231,58 +231,81 @@ refreshNodes nid = do queryParallel $ flip L.map (L.concat nss) $ \n -> do $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) pingQ (nodeAddr n) - insertNode n + -- pingQ takes care of inserting the node. return () return () -- $ L.concat nss -- | This operation do not block but acquire exclusive access to -- routing table. -insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId -insertNode info = fork $ do - var <- asks routingTable +insertNode :: Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId +insertNode info witnessed_ip = fork $ do + var <- asks routingInfo tm <- getTimestamp let showTable = do - t <- liftIO $ atomically $ readTVar var + t <- getTable let logMsg = "Routing table: " <> pPrint t $(logDebugS) "insertNode" (T.pack (render logMsg)) t <- liftIO $ atomically $ readTVar var let arrival = TryInsert info arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) $(logDebugS) "insertNode" $ T.pack (show arrival4) - ps <- liftIO $ atomically $ do - t <- readTVar var - (ps,t') <- R.insert tm arrival t - writeTVar var t' - return ps + maxbuckets <- asks (optBucketCount . options) + fallbackid <- asks tentativeNodeId + let atomicInsert arrival witnessed_ip = do + minfo <- readTVar var + let change ip = fromMaybe fallbackid + $ listToMaybe + $ rank id (nodeId $ foreignNode arrival) + $ bep42s ip fallbackid + case minfo of + Just info -> do + (ps,t') <- R.insert tm arrival $ myBuckets info + -- TODO: Check witnessed_ip against myAddress. + -- If 3 nodes witness a different address, change the table. + -- Require these witnesses satisfy bep-42 and that their + -- first 3 bits are unique. + writeTVar var $ Just $ info { myBuckets = t' } + return ps + -- Ignore non-witnessing nodes until somebody tells + -- us our ip address. + Nothing -> fromMaybe (return []) $ do + ReflectedIP ip0 <- witnessed_ip + ip <- fromSockAddr ip0 + let nil = nullTable (change ip) maxbuckets + return $ do + (ps,t') <- R.insert tm arrival nil + writeTVar var $ Just $ R.Info t' (change ip) ip + return ps + ps <- liftIO $ atomically $ atomicInsert arrival witnessed_ip showTable fork $ forM_ ps $ \(CheckPing ns)-> do forM_ ns $ \n -> do - alive <- PingResult n <$> probeNode (nodeAddr n) - let PingResult _ b = alive + (b,mip) <- probeNode (nodeAddr n) + let alive = PingResult n b $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) tm <- getTimestamp - liftIO $ atomically $ do - t <- readTVar var - (_,t') <- R.insert tm alive t - writeTVar var t' + liftIO $ atomically $ atomicInsert alive mip showTable return () -- | Throws exception if node is not responding. queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) => NodeAddr ip -> a -> DHT ip (NodeId, b) -queryNode addr q = do - nid <- asks thisNodeId +queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q + +queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) + => NodeAddr ip -> a -> DHT ip (NodeId, b, Maybe ReflectedIP) +queryNode' addr q = do + nid <- myNodeIdAccordingTo addr let read_only = False -- TODO: check for NAT issues. (BEP 43) - Response remoteId r <- query (toSockAddr addr) (Query nid read_only q) - insertNode (NodeInfo remoteId addr) - return (remoteId, r) + (Response remoteId r, witnessed_ip) <- query' (toSockAddr addr) (Query nid read_only q) + $(logInfoS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) + <> " by " <> T.pack (show (toSockAddr addr)) + insertNode (NodeInfo remoteId addr) witnessed_ip + return (remoteId, r, witnessed_ip) -- | Infix version of 'queryNode' function. (<@>) :: Address ip => KRPC (Query a) (Response b) => a -> NodeAddr ip -> DHT ip b q <@> addr = snd <$> queryNode addr q {-# INLINE (<@>) #-} - -restoreTable :: Address ip => Table ip -> DHT ip () -restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index a4da8445..84e4d4ce 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs @@ -22,6 +22,7 @@ module Network.BitTorrent.DHT.Routing ( -- * Table Table + , Info(..) -- * Attributes , BucketCount @@ -343,6 +344,15 @@ type BucketCount = Int defaultBucketCount :: BucketCount defaultBucketCount = 20 +data Info ip = Info + { myBuckets :: Table ip + , myNodeId :: NodeId + , myAddress :: ip + } + deriving (Eq, Show, Generic) + +instance (Eq ip, Serialize ip) => Serialize (Info ip) + -- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ -- 160. The routing table is subdivided into 'Bucket's that each cover -- a portion of the space. An empty table has one bucket with an ID @@ -497,7 +507,7 @@ instance TableKey InfoHash where -- 'find_node' and 'get_peers' queries. kclosest :: Eq ip => TableKey a => K -> a -> Table ip -> [NodeInfo ip] kclosest k (toNodeId -> nid) - = L.take k . rank nid + = L.take k . rank nodeId nid . L.map PSQ.key . PSQ.toList . fromMaybe PSQ.empty . fmap bktNodes . lookupBucket nid @@ -536,8 +546,10 @@ modifyBucket nodeId f = go (0 :: BitIx) <|> go i (splitTip nid n i bucket) -- | Triggering event for atomic table update -data Event ip = TryInsert (NodeInfo ip) - | PingResult (NodeInfo ip) Bool +data Event ip = TryInsert { foreignNode :: NodeInfo ip } + | PingResult { foreignNode :: NodeInfo ip + , ponged :: Bool + } deriving (Eq,Ord,Show) eventId (TryInsert NodeInfo{..}) = nodeId diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 5a8d64ef..44a5f0e9 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -29,8 +29,10 @@ module Network.BitTorrent.DHT.Session -- * Session , Node , options - , thisNodeId - , routingTable + , tentativeNodeId + , myNodeIdAccordingTo + , routingInfo + , routableAddress -- ** Initialization , LogFun @@ -239,14 +241,14 @@ data Node ip = Node -- | Pseudo-unique self-assigned session identifier. This value is -- constant during DHT session and (optionally) between sessions. - , thisNodeId :: !NodeId + , tentativeNodeId :: !NodeId , resources :: !InternalState - , manager :: !(Manager (DHT ip)) -- ^ RPC manager; - , routingTable :: !(TVar (Table ip)) -- ^ search table; - , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; - , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; - , sessionTokens :: !(TVar SessionTokens) -- ^ query session IDs. + , manager :: !(Manager (DHT ip )) -- ^ RPC manager; + , routingInfo :: !(TVar (Maybe (R.Info ip))) -- ^ search table; + , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; + , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; + , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. , loggerFun :: !LogFun } @@ -323,7 +325,7 @@ newNode hs opts naddr logger mbid = do liftIO $ do myId <- maybe genNodeId return mbid node <- Node opts myId s m - <$> atomically (newTVar (nullTable myId (optBucketCount opts))) + <$> atomically (newTVar Nothing) <*> newTVarIO def <*> newTVarIO S.empty <*> (newTVarIO =<< nullSessionTokens) @@ -372,16 +374,33 @@ checkToken addr questionableToken = do toks <- asks sessionTokens >>= liftIO . readTVarIO return $ T.member addr questionableToken (tokenMap toks) + {----------------------------------------------------------------------- -- Routing table -----------------------------------------------------------------------} +-- | This nodes externally routable address reported by remote peers. +routableAddress :: DHT ip (Maybe ip) +routableAddress = do + info <- asks routingInfo >>= liftIO . atomically . readTVar + return $ myAddress <$> info + +-- | The current NodeId that the given remote node should know us by. +myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId +myNodeIdAccordingTo _ = do + info <- asks routingInfo >>= liftIO . atomically . readTVar + fallback <- asks tentativeNodeId + return $ maybe fallback myNodeId info + -- | Get current routing table. Normally you don't need to use this -- function, but it can be usefull for debugging and profiling purposes. -getTable :: DHT ip (Table ip) +getTable :: Eq ip => DHT ip (Table ip) getTable = do - var <- asks routingTable - liftIO (atomically $ readTVar var) + Node { tentativeNodeId = myId + , routingInfo = var + , options = opts } <- ask + let nil = nullTable myId (optBucketCount opts) + liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) -- | Find a set of closest nodes from routing table of this node. (in -- no particular order) -- cgit v1.2.3