From e1b2fc9c7a5efd828a8c66f3e3a1d0a547397080 Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 30 Jun 2017 13:21:29 -0400 Subject: It builds! --- src/Network/BitTorrent/DHT.hs | 65 +++++++++++++++++++++++-- src/Network/BitTorrent/DHT/Query.hs | 68 +++++++++++++++++++++++--- src/Network/BitTorrent/DHT/Session.hs | 91 +++++++++-------------------------- src/Network/DHT/Mainline.hs | 76 ++++++++++++++++++++++++++++- src/Network/DHT/Types.hs | 8 ++- src/Network/KRPC/Method.hs | 10 ++-- 6 files changed, 228 insertions(+), 90 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 8bc423a3..6d31eab2 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -76,8 +76,13 @@ import Data.Typeable import Data.Monoid import Network.DatagramServer.Mainline (KMessageOf) import qualified Network.DatagramServer as KRPC (listen, Protocol(..)) - - +import Network.DatagramServer.Types +import Network.DHT.Types +import Data.Bits +import Data.Default +import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) +import Network.KRPC.Method +import Network.BitTorrent.DHT.Query (DataHandlers) {----------------------------------------------------------------------- -- DHT types @@ -96,7 +101,31 @@ fullLogging :: LogSource -> LogLevel -> Bool fullLogging _ _ = True -- | Run DHT on specified port. -dht :: (Ord ip, Address ip) +dht :: + ( Ord ip + , Address ip + , Functor dht + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Serialize (NodeId dht) + , Show (NodeId dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , Eq (QueryMethod dht) + , Show (QueryMethod dht) + , Pretty (NodeInfo dht ip u) + , Kademlia dht + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , DataHandlers raw dht + , WireFormat raw dht + , Show u + , Default u + ) => Options -- ^ normally you need to use 'Data.Default.def'; -> NodeAddr ip -- ^ address to bind this node; -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default @@ -179,7 +208,33 @@ resolveHostName NodeAddr {..} = do -- -- This operation do block, use -- 'Control.Concurrent.Async.Lifted.async' if needed. -bootstrap :: forall raw dht u ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip () +bootstrap :: forall raw dht u ip. + ( Ord ip + , Address ip + , Functor dht + , Ord (NodeId dht) + , FiniteBits (NodeId dht) + , Serialize (NodeId dht) + , Show (NodeId dht) + , Pretty (NodeId dht) + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + , Ord (TransactionID dht) + , Serialize (TransactionID dht) + , Eq (QueryMethod dht) + , Show (QueryMethod dht) + , Pretty (NodeInfo dht ip u) + , Kademlia dht + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , DataHandlers raw dht + , WireFormat raw dht + , Show u + , Default u + , Serialize u + ) => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip () bootstrap mbs startNodes = do restored <- case decode <$> mbs of @@ -192,7 +247,7 @@ bootstrap mbs startNodes = do let searchAll aliveNodes = do nid <- myNodeIdAccordingTo (error "FIXME") ns <- bgsearch ioFindNodes nid - return ( ns :: [NodeInfo KMessageOf ip ()] ) + return ( ns :: [NodeInfo dht ip u] ) input_nodes <- (restored ++) . T.toList <$> getTable -- Step 1: Use iterative searches to flesh out the table.. do let knowns = map (map $ nodeAddr . fst) input_nodes diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index e5d9bd5f..67dc4541 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -16,6 +16,8 @@ {-# LANGUAGE TupleSections #-} {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE GADTs #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE MultiParamTypeClasses #-} module Network.BitTorrent.DHT.Query ( -- * Handler -- | To bind specific set of handlers you need to pass @@ -25,6 +27,7 @@ module Network.BitTorrent.DHT.Query , getPeersH , announceH , defaultHandlers + , DataHandlers -- * Query -- ** Basic @@ -113,6 +116,7 @@ import Data.Serialize import System.IO.Unsafe (unsafeInterleaveIO) import Data.String + {----------------------------------------------------------------------- -- Handlers -----------------------------------------------------------------------} @@ -215,20 +219,68 @@ kademliaHandlers logger = do , handler (nameFindNodes dht) $ findNodeH getclosest ] +class DataHandlers raw dht where + dataHandlers :: + ( Ord ip , Hashable ip, Typeable ip, Serialize ip) => + (NodeId dht -> IO [NodeInfo dht ip ()]) + -> DHTData dht ip + -> [MethodHandler raw dht ip] + +instance DataHandlers BValue KMessageOf where + dataHandlers = bthandlers + +bthandlers :: + ( Ord ip , Hashable ip, Typeable ip, Serialize ip) => + (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) + -> DHTData KMessageOf ip + -> [MethodHandler BValue KMessageOf ip] +bthandlers getclosest dta = + [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta) + , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta) + ] + where + getpeers dta ih = do + ps <- lookupPeers (contactInfo dta) ih + if L.null ps + then Left <$> getclosest (toNodeId ih) + else return (Right ps) + +data MethodHandler raw dht ip = + forall a b. ( SerializableTo raw (Response dht b) + , SerializableTo raw (Query dht a) + ) => MethodHandler (QueryMethod dht) (NodeAddr ip -> a -> IO b) -- | Includes all default query handlers. -defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] +defaultHandlers :: forall raw dht u ip. + ( Ord (TransactionID dht) + , Ord (NodeId dht) + , Show u + , SerializableTo raw (Response dht (Ping dht)) + , SerializableTo raw (Query dht (Ping dht)) + , Show (QueryMethod dht) + , Show (NodeId dht) + , FiniteBits (NodeId dht) + , Default u + , Serialize (TransactionID dht) + , WireFormat raw dht + , Kademlia dht + , KRPC (Query dht (Ping dht)) (Response dht (Ping dht)) + , Functor dht + , Pretty (NodeInfo dht ip u) + , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) + , SerializableTo raw (Response dht (NodeFound dht ip)) + , SerializableTo raw (Query dht (FindNode dht ip)) + , Eq ip, Ord ip, Address ip, DataHandlers raw dht + ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] defaultHandlers logger = do groknode <- insertNode1 mynid <- myNodeIdAccordingTo1 - let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler - handler = nodeHandler groknode mynid (logt logger) - toks <- asks sessionTokens - peers <- asks contactInfo - getpeers <- getPeerList1 + let handler :: MethodHandler raw dht ip -> Handler IO dht raw + handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) name action + dta <- asks dhtData + getclosest <- getClosest1 hs <- kademliaHandlers logger - return $ hs ++ [ handler "get_peers" $ getPeersH getpeers toks - , handler "announce_peer" $ announceH peers toks ] + return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta) {----------------------------------------------------------------------- -- Basic queries diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index f96ba707..d94f028f 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -35,9 +35,10 @@ module Network.BitTorrent.DHT.Session , routingInfo , routableAddress , getTimestamp - , SessionTokens - , sessionTokens - , contactInfo + -- , SessionTokens + -- , sessionTokens + -- , contactInfo + , dhtData , PeerStore , manager @@ -55,8 +56,8 @@ module Network.BitTorrent.DHT.Session , runDHT -- ** Tokens - , grantToken - , checkToken + -- , grantToken + -- , checkToken -- ** Routing table , getTable @@ -68,6 +69,7 @@ module Network.BitTorrent.DHT.Session , insertPeer , getPeerList , getPeerList1 + , lookupPeers , insertTopic , deleteTopic , getSwarms @@ -113,6 +115,7 @@ import Data.Time.Clock.POSIX import Data.Text as Text import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Data.Serialize as S +import Network.DHT.Types import Data.Torrent as Torrent @@ -228,33 +231,6 @@ instance Default Options where seconds :: NominalDiffTime -> Int seconds dt = fromEnum (realToFrac dt :: Uni) - -{----------------------------------------------------------------------- --- Tokens policy ------------------------------------------------------------------------} - -data SessionTokens = SessionTokens - { tokenMap :: !TokenMap - , lastUpdate :: !UTCTime - , maxInterval :: !NominalDiffTime - } - -nullSessionTokens :: IO SessionTokens -nullSessionTokens = SessionTokens - <$> (tokens <$> liftIO randomIO) - <*> liftIO getCurrentTime - <*> pure defaultUpdateInterval - --- TODO invalidate *twice* if needed -invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens -invalidateTokens curTime ts @ SessionTokens {..} - | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens - { tokenMap = update tokenMap - , lastUpdate = curTime - , maxInterval = maxInterval - } - | otherwise = ts - {----------------------------------------------------------------------- -- Session -----------------------------------------------------------------------} @@ -277,9 +253,8 @@ data Node raw dht u ip = Node , resources :: !InternalState , manager :: !(Manager raw dht) -- ^ RPC manager; , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; - , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; - , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. + , dhtData :: DHTData dht ip , loggerFun :: !LogFun } @@ -371,6 +346,7 @@ locFromCS cs = case getCallStack cs of newNode :: ( Address ip , FiniteBits (NodeId dht) , Serialize (NodeId dht) + , Kademlia dht ) => -- [NodeHandler] -- ^ handlers to run on accepted queries; Options -- ^ various dht options; @@ -389,12 +365,12 @@ newNode opts naddr logger mbid = do s <- getInternalState (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr []) closeManager liftIO $ do + dta <- initializeDHTData myId <- maybe genNodeId return mbid node <- Node opts myId s m <$> atomically (newTVar Nothing) - <*> newTVarIO def <*> newTVarIO S.empty - <*> (newTVarIO =<< nullSessionTokens) + <*> pure dta <*> pure logger return node @@ -415,29 +391,6 @@ runDHT node action = runReaderT (unDHT action) node -- /pick a random ID/ in the range of the bucket and perform a -- find_nodes search on it. -{----------------------------------------------------------------------- --- Tokens ------------------------------------------------------------------------} - -tryUpdateSecret :: TVar SessionTokens -> IO () -tryUpdateSecret toks = do - curTime <- liftIO getCurrentTime - liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime) - -grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token -grantToken sessionTokens addr = do - tryUpdateSecret sessionTokens - toks <- readTVarIO sessionTokens - return $ T.lookup addr $ tokenMap toks - --- | Throws 'HandlerError' if the token is invalid or already --- expired. See 'TokenMap' for details. -checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool -checkToken sessionTokens addr questionableToken = do - tryUpdateSecret sessionTokens - toks <- readTVarIO sessionTokens - return $ T.member addr questionableToken (tokenMap toks) - {----------------------------------------------------------------------- -- Routing table @@ -475,28 +428,28 @@ getTable = do let nil = nullTable myId (optBucketCount opts) liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) -getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] +getSwarms :: Ord ip => DHT raw KMessageOf u ip [ (InfoHash, Int, Maybe ByteString) ] getSwarms = do - store <- asks contactInfo >>= liftIO . atomically . readTVar + store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar return $ P.knownSwarms store -savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString +savePeerStore :: (Ord ip, Address ip) => DHT raw KMessageOf u ip ByteString savePeerStore = do - var <- asks contactInfo + var <- asks (contactInfo . dhtData) peers <- liftIO $ atomically $ readTVar var return $ S.encode peers -mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () +mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw KMessageOf u ip () mergeSavedPeers bs = do - var <- asks contactInfo + var <- asks (contactInfo . dhtData) case S.decode bs of Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies) Left _ -> return () -allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] +allPeers :: Ord ip => InfoHash -> DHT raw KMessageOf u ip [ PeerAddr ip ] allPeers ih = do - store <- asks contactInfo >>= liftIO . atomically . readTVar + store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar return $ P.lookup ih store -- | Find a set of closest nodes from routing table of this node. (in @@ -566,7 +519,7 @@ getTimestamp = do -- getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) getPeerList ih = do - var <- asks contactInfo + var <- asks (contactInfo . dhtData) ps <- liftIO $ lookupPeers var ih if L.null ps then Left <$> getClosest ih @@ -574,7 +527,7 @@ getPeerList ih = do getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) getPeerList1 = do - var <- asks contactInfo + var <- asks (contactInfo . dhtData) getclosest <- getClosest1 return $ \ih -> do ps <- lookupPeers var ih diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs index 29d4231d..aefd7742 100644 --- a/src/Network/DHT/Mainline.hs +++ b/src/Network/DHT/Mainline.hs @@ -88,6 +88,10 @@ module Network.DHT.Mainline , Announce (..) , Announced (..) #endif + , DHTData(..) + , SessionTokens(..) + , grantToken + , checkToken ) where import Control.Applicative @@ -118,13 +122,19 @@ import Network.DatagramServer.Mainline import Data.Maybe import Data.Torrent (InfoHash) -import Network.BitTorrent.DHT.Token +import Network.BitTorrent.DHT.Token as T +import Network.BitTorrent.DHT.ContactInfo #ifdef VERSION_bencoding import Network.DatagramServer () #endif import Network.DatagramServer.Types hiding (Query,Response) import Network.DHT.Types import Network.DHT.Routing +import Data.Time +import Control.Concurrent.STM +import System.Random +import Data.Hashable + {----------------------------------------------------------------------- -- envelopes @@ -472,6 +482,59 @@ bep42 addr (NodeId r) where msk | BS.length ip == 4 = ip4mask | otherwise = ip6mask +{----------------------------------------------------------------------- +-- Tokens policy +-----------------------------------------------------------------------} + +data SessionTokens = SessionTokens + { tokenMap :: !TokenMap + , lastUpdate :: !UTCTime + , maxInterval :: !NominalDiffTime + } + +nullSessionTokens :: IO SessionTokens +nullSessionTokens = SessionTokens + <$> (tokens <$> randomIO) + <*> getCurrentTime + <*> pure defaultUpdateInterval + +-- TODO invalidate *twice* if needed +invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens +invalidateTokens curTime ts @ SessionTokens {..} + | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens + { tokenMap = update tokenMap + , lastUpdate = curTime + , maxInterval = maxInterval + } + | otherwise = ts + +{----------------------------------------------------------------------- +-- Tokens +-----------------------------------------------------------------------} + +tryUpdateSecret :: TVar SessionTokens -> IO () +tryUpdateSecret toks = do + curTime <- getCurrentTime + atomically $ modifyTVar' toks (invalidateTokens curTime) + +grantToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> IO Token +grantToken sessionTokens addr = do + tryUpdateSecret sessionTokens + toks <- readTVarIO sessionTokens + return $ T.lookup addr $ tokenMap toks + +-- | Throws 'HandlerError' if the token is invalid or already +-- expired. See 'TokenMap' for details. +checkToken :: Hashable a => TVar SessionTokens -> NodeAddr a -> Token -> IO Bool +checkToken sessionTokens addr questionableToken = do + tryUpdateSecret sessionTokens + toks <- readTVarIO sessionTokens + return $ T.member addr questionableToken (tokenMap toks) + + +-------------------------- + + instance Kademlia KMessageOf where data Ping KMessageOf = Ping deriving (Show, Eq, Typeable) @@ -479,10 +542,17 @@ instance Kademlia KMessageOf where deriving (Show, Eq, Typeable) newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable) + data DHTData KMessageOf ip = TorrentData + { contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; + , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. + } + pingMessage _ = Ping pongMessage _ = Ping findNodeMessage _ k = FindNode (toNodeId k) + findWho (FindNode nid) = nid foundNodes (NodeFound ns) = ns + foundNodesMessage ns = NodeFound ns dhtAdjustID _ fallback ip0 arrival = fromMaybe fallback $ do @@ -494,3 +564,7 @@ instance Kademlia KMessageOf where namePing _ = "ping" nameFindNodes _ = "find-nodes" + + initializeDHTData = TorrentData + <$> newTVarIO def + <*> (newTVarIO =<< nullSessionTokens) diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs index 79f9e1d3..31ae5948 100644 --- a/src/Network/DHT/Types.hs +++ b/src/Network/DHT/Types.hs @@ -2,6 +2,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE DeriveGeneric #-} module Network.DHT.Types ( module Network.DHT.Types , TableKey @@ -12,6 +13,7 @@ import Network.Socket (SockAddr) import Network.DatagramServer.Types import Network.DHT.Routing import Data.Typeable +import GHC.Generics data TableParameters msg ip u = TableParameters { maxBuckets :: Int @@ -27,7 +29,7 @@ data Query dht a = Query { queringNodeId :: NodeId dht -- ^ node id of /quering/ node; , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43 , queryParams :: a -- ^ query parameters. - } deriving (Typeable) + } deriving (Typeable,Generic) deriving instance (Eq (NodeId dht), Eq a ) => Eq (Query dht a) deriving instance (Show (NodeId dht), Show a ) => Show (Query dht a) @@ -37,7 +39,7 @@ deriving instance (Show (NodeId dht), Show a ) => Show (Query dht a) data Response dht a = Response { queredNodeId :: NodeId dht -- ^ node id of /quered/ node; , responseVals :: a -- ^ query result. - } deriving (Typeable) + } deriving (Typeable,Generic) deriving instance (Eq (NodeId dht), Eq a ) => Eq (Response dht a) deriving instance (Show (NodeId dht), Show a ) => Show (Response dht a) @@ -51,6 +53,7 @@ class Kademlia dht where -- given its ID. data FindNode dht ip data NodeFound dht ip + data DHTData dht ip pingMessage :: Proxy dht -> Ping dht pongMessage :: Proxy dht -> Ping dht findNodeMessage :: TableKey dht k => Proxy dht -> k -> FindNode dht ip @@ -60,3 +63,4 @@ class Kademlia dht where dhtAdjustID :: Address ip => Proxy dht -> NodeId dht -> SockAddr -> Event dht ip u -> NodeId dht namePing :: Proxy dht -> QueryMethod dht nameFindNodes :: Proxy dht -> QueryMethod dht + initializeDHTData :: IO (DHTData dht ip) diff --git a/src/Network/KRPC/Method.hs b/src/Network/KRPC/Method.hs index 3a2bd020..d0eb136a 100644 --- a/src/Network/KRPC/Method.hs +++ b/src/Network/KRPC/Method.hs @@ -89,11 +89,11 @@ showsMethod (Method name) = -- @ -- class ( Typeable req, Typeable resp -#ifdef VERSION_bencoding - , BEncode req, BEncode resp -#else - , Serialize req, Serialize resp -#endif +-- #ifdef VERSION_bencoding + -- , BEncode req, BEncode resp +-- #else + -- , Serialize req, Serialize resp +-- #endif ) => KRPC req resp | req -> resp, resp -> req where -- cgit v1.2.3