From 012d138b1061d967ef3a05dfb7dc819d199b3902 Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 21 Jun 2017 22:34:40 -0400 Subject: Propogated the deletion of MonadKRPC to Network.BitTorrent.DHT.Query. --- src/Network/BitTorrent/DHT/Query.hs | 73 ++++++++++++------------ src/Network/BitTorrent/DHT/Session.hs | 102 +++++++++++++++++++++++----------- 2 files changed, 109 insertions(+), 66 deletions(-) (limited to 'src/Network/BitTorrent/DHT') diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 68c67900..254b347c 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -80,6 +80,7 @@ import Text.PrettyPrint as PP hiding ((<>), ($$)) import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Data.Time import Data.Time.Clock.POSIX +import Data.Hashable (Hashable) import Network.DatagramServer as KRPC hiding (Options, def) import Network.KRPC.Method as KRPC @@ -109,13 +110,9 @@ import Control.Monad.Trans.Control nodeHandler :: ( Address ip , KRPC (Query a) (Response b) -#ifdef VERSION_bencoding - , KRPC.Envelope (Query a) (Response b) ~ BValue ) -#else - , KPRC.Envelope (Query a) (Response b) ~ ByteString ) -#endif - => QueryMethod KMessageOf -> (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip -nodeHandler method action = handler method $ \ sockAddr qry -> do + ) + => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler +nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do #ifdef VERSION_bencoding let remoteId = queringNodeId qry read_only = queryIsReadOnly qry @@ -131,53 +128,55 @@ nodeHandler method action = handler method $ \ sockAddr qry -> do let ni = NodeInfo remoteId naddr () -- Do not route read-only nodes. (bep 43) if read_only - then $(logWarnS) "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) + then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) else insertNode ni Nothing >> return () -- TODO need to block. why? Response <$> myNodeIdAccordingTo naddr <*> action naddr q -- | Default 'Ping' handler. -pingH :: Address ip => NodeHandler ip -#ifdef VERSION_bencoding -pingH = nodeHandler "ping" $ \ _ Ping -> return Ping -#else -pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } -#endif +pingH :: NodeAddr ip -> Ping -> IO Ping +pingH _ Ping = return Ping +-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } -- | Default 'FindNode' handler. -findNodeH :: Address ip => NodeHandler ip -findNodeH = nodeHandler "find-nodes" $ \ _ (FindNode nid) -> do - NodeFound <$> getClosest nid +findNodeH :: (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> NodeAddr ip -> FindNode ip -> IO (NodeFound ip) +findNodeH getclosest _ (FindNode nid) = NodeFound <$> getclosest nid -#ifdef VERSION_bencoding -- | Default 'GetPeers' handler. -getPeersH :: Ord ip => Address ip => NodeHandler ip -getPeersH = nodeHandler "get_peers" $ \ naddr (GetPeers ih) -> do +getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) +getPeersH getPeerList toks naddr (GetPeers ih) = do ps <- getPeerList ih - tok <- grantToken naddr + tok <- grantToken toks naddr return $ GotPeers ps tok -- | Default 'Announce' handler. -announceH :: Ord ip => Address ip => NodeHandler ip -announceH = nodeHandler "announce_peer" $ \ naddr @ NodeAddr {..} (Announce {..}) -> do - valid <- checkToken naddr sessionToken +announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced +announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do + valid <- checkToken toks naddr sessionToken unless valid $ do throwIO $ InvalidParameter "token" let annPort = if impliedPort then nodePort else port peerAddr = PeerAddr Nothing nodeHost annPort - insertPeer topic announcedName peerAddr + insertPeer peers topic announcedName peerAddr return Announced -- | Includes all default query handlers. -defaultHandlers :: Ord ip => Address ip => [NodeHandler ip] -defaultHandlers = [pingH, findNodeH, getPeersH, announceH] -#else --- | Includes all default query handlers. -defaultHandlers :: Ord ip => Address ip => [NodeHandler ip] -defaultHandlers = [pingH, findNodeH] -#endif +defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT ip [NodeHandler] +defaultHandlers logger = do + groknode <- insertNode1 + toks <- asks sessionTokens + getclosest <- getClosest1 + mynid <- myNodeIdAccordingTo1 + peers <- asks contactInfo + getpeers <- getPeerList1 + let handler :: KRPC (Query a) (Response b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler + handler = nodeHandler groknode mynid (logt logger) + return [ handler "ping" $ pingH + , handler "find-nodes" $ findNodeH getclosest + , handler "get_peers" $ getPeersH getpeers toks + , handler "announce_peer" $ announceH peers toks ] {----------------------------------------------------------------------- -- Basic queries @@ -324,6 +323,11 @@ logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) -- routing table. insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () insertNode info witnessed_ip0 = do + f <- insertNode1 + liftIO $ f info witnessed_ip0 + +insertNode1 :: forall ip. Address ip => DHT ip (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) +insertNode1 = do bc <- optBucketCount <$> asks options nid <- asks tentativeNodeId logm0 <- embed_ (uncurry logc) @@ -349,7 +353,7 @@ insertNode info witnessed_ip0 = do , grokNode = DHT.insertNode params state , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () } - liftIO $ DHT.insertNode params state info witnessed_ip0 + return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 -- | Throws exception if node is not responding. queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) @@ -362,7 +366,8 @@ queryNode' addr q = do nid <- myNodeIdAccordingTo addr let read_only = False -- TODO: check for NAT issues. (BEP 43) let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b) - (Response remoteId r, witnessed_ip) <- query' name (toSockAddr addr) (Query nid read_only q) + mgr <- asks manager + (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr name (toSockAddr addr) (Query nid read_only q) -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) -- <> " by " <> T.pack (show (toSockAddr addr)) _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 7e87df6c..d8665773 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -31,12 +31,19 @@ module Network.BitTorrent.DHT.Session , options , tentativeNodeId , myNodeIdAccordingTo + , myNodeIdAccordingTo1 , routingInfo , routableAddress , getTimestamp + , SessionTokens + , sessionTokens + , contactInfo + , PeerStore + , manager -- ** Initialization , LogFun + , logt , NodeHandler , newNode , closeNode @@ -54,11 +61,13 @@ module Network.BitTorrent.DHT.Session -- ** Routing table , getTable , getClosest + , getClosest1 #ifdef VERSION_bencoding -- ** Peer storage , insertPeer , getPeerList + , getPeerList1 , insertTopic , deleteTopic , getSwarms @@ -333,7 +342,7 @@ instance MonadLogger (DHT ip) where liftIO $ logger loc src lvl (toLogStr msg) #ifdef VERSION_bencoding -type NodeHandler ip = Handler IO KMessageOf BValue +type NodeHandler = Handler IO KMessageOf BValue #else type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString #endif @@ -368,10 +377,10 @@ locFromCS cs = case getCallStack cs of -- 'closeNode' function, otherwise socket or other scarce resources may -- leak. newNode :: Address ip - => [NodeHandler ip] -- ^ handlers to run on accepted queries; - -> Options -- ^ various dht options; - -> NodeAddr ip -- ^ node address to bind; - -> LogFun -- ^ + => [NodeHandler] -- ^ handlers to run on accepted queries; + -> Options -- ^ various dht options; + -> NodeAddr ip -- ^ node address to bind; + -> LogFun -- ^ invoked on log messages; #ifdef VERSION_bencoding -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. #else @@ -420,24 +429,23 @@ runDHT node action = runReaderT (unDHT action) node -- Tokens -----------------------------------------------------------------------} -tryUpdateSecret :: DHT ip () -tryUpdateSecret = do +tryUpdateSecret :: TVar SessionTokens -> IO () +tryUpdateSecret toks = do curTime <- liftIO getCurrentTime - toks <- asks sessionTokens liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime) -grantToken :: Hashable a => NodeAddr a -> DHT ip Token -grantToken addr = do - tryUpdateSecret - toks <- asks sessionTokens >>= liftIO . readTVarIO +grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token +grantToken sessionTokens addr = do + tryUpdateSecret sessionTokens + toks <- readTVarIO sessionTokens return $ T.lookup addr $ tokenMap toks -- | Throws 'HandlerError' if the token is invalid or already -- expired. See 'TokenMap' for details. -checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip Bool -checkToken addr questionableToken = do - tryUpdateSecret - toks <- asks sessionTokens >>= liftIO . readTVarIO +checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool +checkToken sessionTokens addr questionableToken = do + tryUpdateSecret sessionTokens + toks <- readTVarIO sessionTokens return $ T.member addr questionableToken (tokenMap toks) @@ -463,6 +471,14 @@ myNodeIdAccordingTo _ = do (return . myNodeId) info +myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) +myNodeIdAccordingTo1 = do + var <- asks routingInfo + tid <- asks tentativeNodeId + return $ \ _ -> do + info <- atomically $ readTVar var + return $ maybe tid myNodeId info + -- | Get current routing table. Normally you don't need to use this -- function, but it can be usefull for debugging and profiling purposes. #ifdef VERSION_bencoding @@ -515,38 +531,48 @@ getClosest node = do k <- asks (optK . options) kclosest k node <$> getTable +getClosest1 :: ( Eq ip + , TableKey KMessageOf k + ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) +getClosest1 = do + k <- asks (optK . options) + nobkts <- asks (optBucketCount . options) + myid <- asks tentativeNodeId + var <- asks routingInfo + return $ \node -> do nfo <- atomically $ readTVar var + let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo + return $ kclosest k node tbl + {----------------------------------------------------------------------- -- Peer storage -----------------------------------------------------------------------} -refreshContacts :: DHT ip () -refreshContacts = +refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO () +refreshContacts var = -- TODO limit dht peer store in size (probably by removing oldest peers) return () -- | Insert peer to peer store. Used to handle announce requests. -insertPeer :: Ord ip => InfoHash -> Maybe ByteString -> PeerAddr ip -> DHT ip () -insertPeer ih name addr = do - refreshContacts - var <- asks contactInfo - liftIO $ atomically $ modifyTVar' var (P.insertPeer ih name addr) +insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO () +insertPeer var ih name addr = do + refreshContacts var + atomically $ modifyTVar' var (P.insertPeer ih name addr) -- | Get peer set for specific swarm. -lookupPeers :: Ord ip => InfoHash -> DHT ip [PeerAddr ip] -lookupPeers ih = do - refreshContacts - var <- asks contactInfo +lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip] +lookupPeers var ih = do + refreshContacts var tm <- getTimestamp - liftIO $ atomically $ do + atomically $ do (ps,store') <- P.freshPeers ih tm <$> readTVar var writeTVar var store' return ps -getTimestamp :: DHT ip Timestamp +getTimestamp :: IO Timestamp getTimestamp = do - utcTime <- liftIO $ getCurrentTime - $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) + utcTime <- getCurrentTime + -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) return $ utcTimeToPOSIXSeconds utcTime @@ -557,11 +583,23 @@ getTimestamp = do -- getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) getPeerList ih = do - ps <- lookupPeers ih + var <- asks contactInfo + ps <- liftIO $ lookupPeers var ih if L.null ps then Left <$> getClosest ih else return (Right ps) +getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) +getPeerList1 = do + var <- asks contactInfo + getclosest <- getClosest1 + return $ \ih -> do + ps <- lookupPeers var ih + if L.null ps + then Left <$> getclosest ih + else return (Right ps) + + insertTopic :: InfoHash -> PortNumber -> DHT ip () insertTopic ih p = do var <- asks announceInfo -- cgit v1.2.3