From f0df039183e7027a49eafe51de53340fc43723e3 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sat, 28 Dec 2013 08:47:02 +0400 Subject: Add node sessions --- bittorrent.cabal | 6 +- src/Network/BitTorrent/Core.hs | 50 +++++ src/Network/BitTorrent/DHT.hs | 86 ++++++++- src/Network/BitTorrent/DHT/Protocol.hs | 329 --------------------------------- src/Network/BitTorrent/DHT/Routing.hs | 22 ++- src/Network/BitTorrent/DHT/Session.hs | 251 +++++++++++++++++++++++++ 6 files changed, 400 insertions(+), 344 deletions(-) delete mode 100644 src/Network/BitTorrent/DHT/Protocol.hs create mode 100644 src/Network/BitTorrent/DHT/Session.hs diff --git a/bittorrent.cabal b/bittorrent.cabal index 48fe51dc..9bc91647 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal @@ -57,10 +57,10 @@ library Network.BitTorrent.Core.Node Network.BitTorrent.Core.PeerId Network.BitTorrent.Core.PeerAddr --- Network.BitTorrent.DHT + Network.BitTorrent.DHT Network.BitTorrent.DHT.Message --- Network.BitTorrent.DHT.Protocol Network.BitTorrent.DHT.Routing + Network.BitTorrent.DHT.Session Network.BitTorrent.DHT.Token -- Network.BitTorrent.Exchange Network.BitTorrent.Exchange.Assembler @@ -95,6 +95,8 @@ library , lens >= 3.0 , resourcet >= 0.4 , mtl + , monad-control + , transformers-base -- Concurrency -- , SafeSemaphore diff --git a/src/Network/BitTorrent/Core.hs b/src/Network/BitTorrent/Core.hs index 9cfb3dd7..6024f5a5 100644 --- a/src/Network/BitTorrent/Core.hs +++ b/src/Network/BitTorrent/Core.hs @@ -9,6 +9,7 @@ -- module Network.BitTorrent.Core ( module Core + , Address (..) -- * Re-exports from Data.IP , IPv4 @@ -16,9 +17,58 @@ module Network.BitTorrent.Core , IP (..) ) where +import Control.Applicative import Data.IP +import Data.Serialize +import Data.Typeable +import Network.Socket (SockAddr (..), PortNumber) import Network.BitTorrent.Core.Fingerprint as Core import Network.BitTorrent.Core.Node as Core import Network.BitTorrent.Core.PeerId as Core import Network.BitTorrent.Core.PeerAddr as Core + + +class (Eq a, Serialize a, Typeable a) => Address a where + toSockAddr :: a -> SockAddr + fromSockAddr :: SockAddr -> Maybe a + +-- | Note that port is zeroed. +instance Address IPv4 where + toSockAddr = SockAddrInet 0 . toHostAddress + fromSockAddr (SockAddrInet _ h) = Just (fromHostAddress h) + fromSockAddr _ = Nothing + +-- | Note that port is zeroed. +instance Address IPv6 where + toSockAddr h = SockAddrInet6 0 0 (toHostAddress6 h) 0 + fromSockAddr (SockAddrInet6 _ _ h _) = Just (fromHostAddress6 h) + fromSockAddr _ = Nothing + +-- | Note that port is zeroed. +instance Address IP where + toSockAddr (IPv4 h) = toSockAddr h + toSockAddr (IPv6 h) = toSockAddr h + fromSockAddr sa = + IPv4 <$> fromSockAddr sa + <|> IPv6 <$> fromSockAddr sa + +setPort :: PortNumber -> SockAddr -> SockAddr +setPort port (SockAddrInet _ h ) = SockAddrInet port h +setPort port (SockAddrInet6 _ f h s) = SockAddrInet6 port f h s +setPort _ (SockAddrUnix s ) = SockAddrUnix s +{-# INLINE setPort #-} + +getPort :: SockAddr -> Maybe PortNumber +getPort (SockAddrInet p _ ) = Just p +getPort (SockAddrInet6 p _ _ _) = Just p +getPort (SockAddrUnix _ ) = Nothing +{-# INLINE getPort #-} + +instance Address a => Address (NodeAddr a) where + toSockAddr NodeAddr {..} = setPort nodePort $ toSockAddr nodeHost + fromSockAddr sa = NodeAddr <$> fromSockAddr sa <*> getPort sa + +instance Address a => Address (PeerAddr a) where + toSockAddr PeerAddr {..} = setPort peerPort $ toSockAddr peerHost + fromSockAddr sa = PeerAddr Nothing <$> fromSockAddr sa <*> getPort sa diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index b0aac002..bdb76c76 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -1,6 +1,86 @@ module Network.BitTorrent.DHT - ( newNodeSession - , dhtServer + ( dht + , ping + , Network.BitTorrent.DHT.bootstrap + , Network.BitTorrent.DHT.lookup + , Network.BitTorrent.DHT.insert ) where -import Network.BitTorrent.DHT.Protocol \ No newline at end of file +import Control.Applicative +import Control.Monad +import Control.Monad.Reader +import Data.List as L +import Network.Socket (PortNumber) + +import Data.Torrent.InfoHash +import Network.BitTorrent.Core +import Network.BitTorrent.DHT.Message +import Network.BitTorrent.DHT.Session + + +{----------------------------------------------------------------------- +-- Handlers +-----------------------------------------------------------------------} + +pingH :: Address ip => NodeHandler ip +pingH = nodeHandler $ \ _ Ping -> return Ping + +{- +findNodeH :: (Eq ip, Serialize ip, Typeable ip) => Handler (DHT ip) +findNodeH = dhtHandler $ \ _ (FindNode nid) -> + NodeFound <$> getClosest nid + +getPeersH :: (Eq ip, Serialize ip, Typeable ip) => Handler (DHT ip) +getPeersH = dhtHandler $ \ addr (GetPeers ih) -> + GotPeers <$> getPeerList ih <*> grantToken addr + +announceH :: Handler (DHT ip) +announceH = dhtHandler $ \ addr (Announce {..}) -> do + checkToken addr sessionToken + insertPeer topic undefined -- PeerAddr (add, port) + return Announced +-} + +handlers :: Address ip => [NodeHandler ip] +handlers = [pingH] + +{----------------------------------------------------------------------- +-- Query +-----------------------------------------------------------------------} + +-- | Run DHT on specified port. +dht :: Address ip => NodeAddr ip -> DHT ip a -> IO a +dht addr = runDHT addr handlers + +ping :: Address ip => NodeAddr ip -> DHT ip () +ping addr = do + Ping <- Ping <@> addr + return () + +-- | One good node may be sufficient. +bootstrap :: Address ip => [NodeAddr ip] -> DHT ip () +bootstrap = mapM_ insertClosest + where + insertClosest addr = do + nid <- getNodeId + NodeFound closest <- FindNode nid <@> addr + forM_ closest insertNode + +-- | Get list of peers which downloading +lookup :: Address ip => InfoHash -> DHT ip [PeerAddr ip] +lookup ih = getClosestHash ih >>= collect + where + collect nodes = L.concat <$> forM (nodeAddr <$> nodes) retrieve + retrieve addr = do + GotPeers {..} <- GetPeers ih <@> addr + either collect pure peers + +-- | Announce that /this/ peer may have some pieces of the specified +-- torrent. +insert :: Address ip => InfoHash -> PortNumber -> DHT ip () +insert ih port = do + nodes <- getClosestHash ih + forM_ (nodeAddr <$> nodes) $ \ addr -> do +-- GotPeers {..} <- GetPeers ih <@> addr +-- Announced <- Announce False ih undefined grantedToken <@> addr + return () diff --git a/src/Network/BitTorrent/DHT/Protocol.hs b/src/Network/BitTorrent/DHT/Protocol.hs deleted file mode 100644 index 8528f0e0..00000000 --- a/src/Network/BitTorrent/DHT/Protocol.hs +++ /dev/null @@ -1,329 +0,0 @@ -module Network.BitTorrent.DHT.Protocol - ( - newNodeSession - - -- * Tracker - , ping - , findNode - , getPeers - , announcePeer - - -- * Server - , dhtServer - ) where - -import Control.Applicative -import Control.Concurrent -import Control.Concurrent.STM -import Control.Monad -import Control.Exception -import Data.ByteString -import Data.Serialize as S -import Data.Function -import Data.Ord -import Data.Maybe -import Data.List as L -import Data.Map as M -import Data.HashMap.Strict as HM -import Network -import Network.Socket -import System.Entropy - -import Data.BEncode -import Network.KRPC -import Network.KRPC.Protocol -import Network.BitTorrent.Peer -import Network.BitTorrent.Exchange.Protocol () - -{----------------------------------------------------------------------- - Node ------------------------------------------------------------------------} - -type NodeId = ByteString - --- TODO WARN is the 'system' random suitable for this? --- | Generate random NodeID used for the entire session. --- Distribution of ID's should be as uniform as possible. --- -genNodeId :: IO NodeId -genNodeId = getEntropy 20 - -data NodeAddr = NodeAddr { - nodeIP :: {-# UNPACK #-} !HostAddress - , nodePort :: {-# UNPACK #-} !PortNumber - } deriving (Show, Eq) - -instance Serialize NodeAddr where - get = NodeAddr <$> getWord32be <*> get - put NodeAddr {..} = putWord32be nodeIP >> put nodePort - -data NodeInfo = NodeInfo { - nodeID :: !NodeId - , nodeAddr :: !NodeAddr - } deriving (Show, Eq) - -instance Serialize NodeInfo where - get = NodeInfo <$> getByteString 20 <*> get - put NodeInfo {..} = put nodeID >> put nodeAddr - -type CompactInfo = ByteString - -decodeCompact :: CompactInfo -> [NodeInfo] -decodeCompact = either (const []) id . S.runGet (many get) - -encodeCompact :: [NodeId] -> CompactInfo -encodeCompact = S.runPut . mapM_ put - -decodePeerList :: [BEncode] -> [PeerAddr] -decodePeerList = undefined - -encodePeerList :: [PeerAddr] -> [BEncode] -encodePeerList = undefined - -type Distance = NodeId - -{----------------------------------------------------------------------- - Tokens ------------------------------------------------------------------------} - -type Secret = Int - -genSecret :: IO Secret -genSecret = error "secret" - --- | Instead of periodically loop over the all nodes in the routing --- table with some given interval (or some other tricky method --- e.g. using timeouts) we can just update tokens on demand - if no --- one asks for a token then the token _should_ not change at all. --- -type Token = ByteString - -defaultToken :: Token -defaultToken = "0xdeadbeef" - -genToken :: NodeAddr -> Secret -> Token -genToken _ _ = defaultToken - -{----------------------------------------------------------------------- - Routing table ------------------------------------------------------------------------} - -type ContactInfo = HashMap InfoHash [PeerAddr] - -insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo -insertPeer ih addr = HM.insertWith (++) ih [addr] - -lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr] -lookupPeers ih = fromMaybe [] . HM.lookup ih - --- TODO use more compact routing table -type RoutingTable = HashMap NodeId NodeAddr - -insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable -insertNode = HM.insert - -type Alpha = Int - -defaultAlpha :: Alpha -defaultAlpha = 8 - --- TODO -kclosest :: Int -> NodeId -> RoutingTable -> [NodeId] -kclosest = undefined - -{----------------------------------------------------------------------- - Node session ------------------------------------------------------------------------} - -data NodeSession = NodeSession { - nodeId :: !NodeId - , routingTable :: !(TVar RoutingTable) - , contactInfo :: !(TVar ContactInfo) --- , currentSecret :: !(TVar Secret) --- , secretTimestamp :: !(TVar Timestamp) - , alpha :: !Alpha - , listenerPort :: !PortNumber - } - -instance Eq NodeSession where - (==) = (==) `on` nodeId - -instance Ord NodeSession where - compare = comparing nodeId - -newNodeSession :: PortNumber -> IO NodeSession -newNodeSession lport - = NodeSession - <$> genNodeId - <*> newTVarIO HM.empty - <*> newTVarIO HM.empty - <*> pure defaultAlpha - <*> pure lport - -assignToken :: NodeSession -> NodeId -> IO Token -assignToken _ _ = return "" - --- TODO -checkToken :: NodeId -> Token -> NodeSession -> IO Bool -checkToken _ _ _ = return True - -updateTimestamp :: NodeSession -> NodeId -> IO () -updateTimestamp = error "updateTimestamp" - -updateToken :: NodeSession -> NodeId -> Token -> IO () -updateToken _ _ _ = error "updateToken" - -{----------------------------------------------------------------------- - DHT Queries ------------------------------------------------------------------------} - -pingM :: Method NodeId NodeId -pingM = method "ping" ["id"] ["id"] - -findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo) -findNodeM = method "find_node" ["id", "target"] ["id", "nodes"] - --- | Lookup peers by a torrent infohash. This method might return --- different kind of responses depending on the routing table of --- queried node: --- --- * If quieried node contains a peer list for the given infohash --- then the node should return the list in a "value" key. Note that --- list is encoded as compact peer address, not a compact node info. --- The result of 'get_peers' method have the following scheme: --- --- > { "id" : "dht_server_node_id" --- > , "token" : "assigned_token" --- > , "values" : ["_IP_PO", "_ip_po"] --- > } --- --- * If quieried node does not contain a list of peers associated --- with the given infohash, then node should return --- --- > { "id" : "dht_server_node_id" --- > , "token" : "assigned_token" --- > , "nodes" : "compact_nodes_info" --- > } --- --- The resulting dictionaries might differ only in a values\/nodes --- keys. --- -getPeersM :: Method (NodeId, InfoHash) BEncode -getPeersM = method "get_peers" ["id", "info_hash"] [] - --- | Used to announce that the peer, controlling the quering node is --- downloading a torrent on a port. -announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId -announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"] - -{----------------------------------------------------------------------- - DHT Tracker ------------------------------------------------------------------------} --- TODO: update node timestamp on each successful call - --- | Note that tracker side query functions could throw RPCException. -type DHT a b = NodeSession -> NodeAddr -> a -> IO b - -ping :: DHT () () -ping NodeSession {..} addr @ NodeAddr {..} () = do - nid <- call (nodeIP, nodePort) pingM nodeId - atomically $ modifyTVar' routingTable $ HM.insert nid addr - -findNode :: DHT NodeId [NodeInfo] -findNode ses @ NodeSession {..} NodeAddr {..} qnid = do - (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid) - updateTimestamp ses nid - return (decodeCompact info) - -getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr]) -getPeers ses @ NodeSession {..} NodeAddr {..} ih = do - resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih) - (nid, tok, res) <- extrResp resp - updateTimestamp ses nid - updateToken ses nid tok - return res - where - extrResp (BDict d) - | Just (BString nid ) <- M.lookup "id" d - , Just (BString tok ) <- M.lookup "token" d - , Just (BList values) <- M.lookup "values" d - = return $ (nid, tok, Right $ decodePeerList values) - - | Just (BString nid ) <- M.lookup "id" d - , Just (BString tok ) <- M.lookup "token" d - , Just (BString nodes) <- M.lookup "nodes" d - = return (nid, tok, Left $ decodeCompact nodes) - - extrResp _ = throw $ RPCException msg - where msg = ProtocolError "unable to extract getPeers resp" - --- remove token from signature, handle the all token stuff by NodeSession - --- | Note that before ever calling this method you should call the --- getPeerList. -announcePeer :: DHT (InfoHash, Token) NodeId -announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do - nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok) - updateTimestamp ses nid - return nid - -{----------------------------------------------------------------------- - DHT Server ------------------------------------------------------------------------} --- TODO: update node timestamp on each successful call --- NOTE: ensure all server operations run in O(1) - -type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b - -pingS :: ServerHandler NodeId NodeId -pingS NodeSession {..} addr nid = do - atomically $ modifyTVar' routingTable $ insertNode nid addr - return nodeId - -findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo) -findNodeS ses @ NodeSession {..} _ (nid, qnid) = do - updateTimestamp ses nid - rt <- atomically $ readTVar routingTable - return (nodeId, encodeCompact $ kclosest alpha qnid rt) - -getPeersS :: ServerHandler (NodeId, InfoHash) BEncode -getPeersS ses @ NodeSession {..} _ (nid, ih) = do - updateTimestamp ses nid - mkResp <$> assignToken ses nid <*> findPeers - where - findPeers = do - list <- lookupPeers ih <$> readTVarIO contactInfo - if not (L.null list) - then return $ Right list - else do - rt <- readTVarIO routingTable - let nodes = kclosest alpha (getInfoHash ih) rt - return $ Left nodes - - mkDict tok res = [("id",BString nodeId), ("token", BString tok), res] - mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes) - mkResult (Right values) = ("values", BList $ encodePeerList values) - mkResp tok = BDict . M.fromList . mkDict tok . mkResult - -announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId -announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do - updateTimestamp ses nid - registered <- checkToken nid token ses - when registered $ do - atomically $ do - let peerAddr = PeerAddr Nothing nodeIP port - modifyTVar contactInfo $ insertPeer ih peerAddr - return nodeId - -dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO () -dhtTracker = undefined - -dhtServer :: NodeSession -> PortNumber -> IO () -dhtServer s p = server p methods - where - methods = - [ pingM ==> pingS s undefined - , findNodeM ==> findNodeS s undefined - , getPeersM ==> getPeersS s undefined - , announcePeerM ==> announcePeerS s undefined - ] \ No newline at end of file diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 5f00a924..fd2197f0 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs @@ -8,12 +8,14 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE DeriveGeneric #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} module Network.BitTorrent.DHT.Routing ( -- * Routing table Table , BucketCount -- * Routing + , Timestamp , Routing , runRouting @@ -89,12 +91,11 @@ insert ping (k, v) = go 0 -----------------------------------------------------------------------} type Timestamp = POSIXTime -type PingInterval = POSIXTime data Routing ip result = Full result | Done (Timestamp -> result) - | Refresh (NodeAddr ip) (([NodeInfo ip], Timestamp) -> Routing ip result) + | Refresh NodeId (([NodeInfo ip], Timestamp) -> Routing ip result) | NeedPing (NodeAddr ip) (Maybe Timestamp -> Routing ip result) instance Functor (Routing ip) where @@ -107,23 +108,24 @@ runRouting :: (Monad m, Eq ip) => (NodeAddr ip -> m Bool) -- ^ ping_node -> (NodeId -> m [NodeInfo ip]) -- ^ find_nodes -> m Timestamp -- ^ timestamper - -> Routing ip f + -> Routing ip f -- ^ action -> m f -- ^ result -runRouting ping_node find_nodes timestamp = go +runRouting ping_node find_nodes timestamper = go where go (Full r) = return r - go (Done f) = liftM f timestamp + go (Done f) = liftM f timestamper go (NeedPing addr f) = do pong <- ping_node addr if pong then do - time <- timestamp + time <- timestamper go (f (Just time)) else go (f Nothing) - go (Refresh nodes f) = do - let nid = undefined - go (f undefined) + go (Refresh nid f) = do + infos <- find_nodes nid + time <- timestamper + go (f (infos, time)) {----------------------------------------------------------------------- Bucket @@ -186,7 +188,7 @@ insertNode info bucket -- update the all bucket if it is too outdated | Just (NodeInfo {..} :-> lastSeen) <- lastChanged bucket , lastSeen > delta - = Refresh nodeAddr $ \ (infos, t) -> + = Refresh nodeId $ \ (infos, t) -> insertNode info $ L.foldr (\ x -> PSQ.insertWith max x t) bucket infos diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs new file mode 100644 index 00000000..71400609 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -0,0 +1,251 @@ +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeFamilies #-} + +{-# LANGUAGE RankNTypes #-} -- TODO remove +module Network.BitTorrent.DHT.Session + ( -- * Session + DHT + , runDHT + + -- * Tokens + , grantToken + , checkToken + + -- * Routing table + , getNodeId + , getClosest + , getClosestHash + , insertNode + + -- * Peer storage + , insertPeer + , getPeerList + + -- * Messaging + , (<@>) + , NodeHandler + , nodeHandler + ) where + +import Control.Applicative +import Control.Concurrent.STM +import Control.Exception hiding (Handler) +import Control.Monad.Reader +import Control.Monad.Base +import Control.Monad.Trans.Control +import Control.Monad.Trans.Resource +import Data.Default +import Data.Hashable +import Data.List as L +import Data.Time +import Data.Time.Clock.POSIX +import System.Random (randomIO) + +import Data.Torrent.InfoHash +import Network.KRPC +import Network.BitTorrent.Core +import Network.BitTorrent.Core.PeerAddr as P +import Network.BitTorrent.DHT.Message +import Network.BitTorrent.DHT.Routing as R +import Network.BitTorrent.DHT.Token as T + + +{----------------------------------------------------------------------- +-- Tokens policy +-----------------------------------------------------------------------} + +data SessionTokens = SessionTokens + { tokenMap :: !TokenMap + , lastUpdate :: !UTCTime + , maxInterval :: !NominalDiffTime + } + +nullSessionTokens :: IO SessionTokens +nullSessionTokens = SessionTokens + <$> (tokens <$> liftIO randomIO) + <*> liftIO getCurrentTime + <*> pure defaultUpdateInterval + +invalidateTokens :: UTCTime -> SessionTokens -> SessionTokens +invalidateTokens curTime ts @ SessionTokens {..} + | curTime `diffUTCTime` lastUpdate > maxInterval = SessionTokens + { tokenMap = update tokenMap + , lastUpdate = curTime + , maxInterval = maxInterval + } + | otherwise = ts + +{----------------------------------------------------------------------- +-- Session +-----------------------------------------------------------------------} + +data Node ip = Node + { manager :: !(Manager (DHT ip)) + , routingTable :: !(TVar (Table ip)) + , contactInfo :: !(TVar (PeerStore ip)) + , sessionTokens :: !(TVar SessionTokens) + } + +newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } + deriving ( Functor, Applicative, Monad + , MonadIO, MonadBase IO + , MonadReader (Node ip) + ) +instance MonadBaseControl IO (DHT ip) where + newtype StM (DHT ip) a = StM { + unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a + } + liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> + cc $ \ (DHT m) -> StM <$> cc' m + {-# INLINE liftBaseWith #-} + + restoreM = DHT . restoreM . unSt + {-# INLINE restoreM #-} + +instance MonadKRPC (DHT ip) (DHT ip) where + getManager = asks manager + +runDHT :: forall ip a. Address ip + => NodeAddr ip -- ^ node address to bind; + -> [Handler (DHT ip)] -- ^ handlers to run on accepted queries; + -> DHT ip a -- ^ DHT action to run; + -> IO a -- ^ result. +runDHT naddr handlers action = runResourceT $ do + (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager + myId <- liftIO genNodeId + node <- liftIO $ Node m + <$> newTVarIO (nullTable myId) + <*> newTVarIO def + <*> (newTVarIO =<< nullSessionTokens) + runReaderT (unDHT (listen >> action)) node + +{----------------------------------------------------------------------- +-- Routing +-----------------------------------------------------------------------} + +-- TODO fork? +routing :: Address ip => Routing ip a -> DHT ip a +routing = runRouting ping refreshNodes getTimestamp + +-- TODO add timeout +ping :: Address ip => NodeAddr ip -> DHT ip Bool +ping addr = do + Ping <- Ping <@> addr + return True + +-- FIXME do not use getClosest sinse we should /refresh/ them +refreshNodes :: Address ip => NodeId -> DHT ip [NodeInfo ip] +refreshNodes nid = do + nodes <- getClosest nid + nss <- forM (nodeAddr <$> nodes) $ \ addr -> do + NodeFound ns <- FindNode nid <@> addr + return ns + return $ L.concat nss + +getTimestamp :: DHT ip Timestamp +getTimestamp = liftIO $ utcTimeToPOSIXSeconds <$> getCurrentTime + +{----------------------------------------------------------------------- +-- Tokens +-----------------------------------------------------------------------} + +tryUpdateSecret :: DHT ip () +tryUpdateSecret = do + curTime <- liftIO getCurrentTime + toks <- asks sessionTokens + liftIO $ atomically $ modifyTVar' toks (invalidateTokens curTime) + +grantToken :: Hashable a => NodeAddr a -> DHT ip Token +grantToken addr = do + tryUpdateSecret + toks <- asks sessionTokens >>= liftIO . readTVarIO + return $ T.lookup addr $ tokenMap toks + +-- | Throws 'ProtocolError' if token is invalid or already expired. +checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip () +checkToken addr questionableToken = do + tryUpdateSecret + toks <- asks sessionTokens >>= liftIO . readTVarIO + unless (member addr questionableToken (tokenMap toks)) $ + liftIO $ throwIO $ KError ProtocolError "bad token" "" + -- todo reset transaction id in krpc + +{----------------------------------------------------------------------- +-- Routing table +-----------------------------------------------------------------------} + +getTable :: DHT ip (Table ip) +getTable = do + var <- asks routingTable + liftIO (readTVarIO var) + +putTable :: Table ip -> DHT ip () +putTable table = do + var <- asks routingTable + liftIO (atomically (writeTVar var table)) + +getNodeId :: DHT ip NodeId +getNodeId = thisId <$> getTable + +getClosest :: Eq ip => NodeId -> DHT ip [NodeInfo ip] +getClosest nid = kclosest 8 nid <$> getTable + +getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip] +getClosestHash ih = kclosestHash 8 ih <$> getTable + +insertNode :: Address ip => NodeInfo ip -> DHT ip () +insertNode info = do + t <- getTable + t' <- routing (R.insert info t) + putTable t' + +{----------------------------------------------------------------------- +-- Peer storage +-----------------------------------------------------------------------} + +insertPeer :: Eq ip => InfoHash -> PeerAddr ip -> DHT ip () +insertPeer ih addr = do + var <- asks contactInfo + liftIO $ atomically $ modifyTVar' var (P.insert ih addr) + +lookupPeers :: InfoHash -> DHT ip [PeerAddr ip] +lookupPeers ih = do + var <- asks contactInfo + liftIO $ P.lookup ih <$> readTVarIO var + +type PeerList ip = Either [NodeInfo ip] [PeerAddr ip] + +getPeerList :: Eq ip => InfoHash -> DHT ip (PeerList ip) +getPeerList ih = do + ps <- lookupPeers ih + if L.null ps + then Left <$> getClosestHash ih + else return (Right ps) + +{----------------------------------------------------------------------- +-- Messaging +-----------------------------------------------------------------------} + +(<@>) :: Address ip => KRPC (Query a) (Response b) + => a -> NodeAddr ip -> DHT ip b +q <@> addr = do + nid <- getNodeId + Response remoteId r <- query (toSockAddr addr) (Query nid q) + insertNode (NodeInfo remoteId addr) + return r + +type NodeHandler ip = Handler (DHT ip) + +nodeHandler :: Address ip => KRPC (Query a) (Response b) + => (NodeAddr ip -> a -> DHT ip b) -> NodeHandler ip +nodeHandler action = handler $ \ sockAddr (Query remoteId q) -> do + case fromSockAddr sockAddr of + Nothing -> liftIO $ throwIO $ KError GenericError "bad address" "" + Just naddr -> do + insertNode (NodeInfo remoteId naddr) + Response <$> getNodeId <*> action naddr q -- cgit v1.2.3