From d2be835c8e210e086458f69520be680f8fef9599 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Mon, 13 Jan 2014 12:07:39 +0400 Subject: Add announce set to DHT session --- src/Network/BitTorrent/DHT/Session.hs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index e26cbad1..3adbb840 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -84,6 +84,7 @@ import Data.Hashable import Data.List as L import Data.Maybe import Data.Monoid +import Data.Set as S import Data.Text as T import Data.Time import Data.Time.Clock.POSIX @@ -227,14 +228,19 @@ invalidateTokens curTime ts @ SessionTokens {..} -- Session -----------------------------------------------------------------------} +type AnnounceSet = Set (InfoHash, PortNumber) + +type Logger = Loc -> LogSource -> LogLevel -> LogStr -> IO () + data Node ip = Node { options :: !Options , thisNodeId :: !NodeId , manager :: !(Manager (DHT ip)) , routingTable :: !(MVar (Table ip)) , contactInfo :: !(TVar (PeerStore ip)) + , announceInfo :: !(TVar AnnounceSet ) , sessionTokens :: !(TVar SessionTokens) - , loggerFun :: !(Loc -> LogSource -> LogLevel -> LogStr -> IO ()) + , loggerFun :: !Logger } newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } @@ -277,6 +283,7 @@ runDHT handlers opts naddr action = runResourceT $ do node <- liftIO $ Node opts myId m <$> newMVar (nullTable myId (optBucketCount opts)) <*> newTVarIO def + <*> newTVarIO S.empty <*> (newTVarIO =<< nullSessionTokens) <*> pure logger runReaderT (unDHT (listen >> action)) node @@ -336,7 +343,7 @@ checkToken :: Hashable a => NodeAddr a -> Token -> DHT ip () checkToken addr questionableToken = do tryUpdateSecret toks <- asks sessionTokens >>= liftIO . readTVarIO - unless (member addr questionableToken (tokenMap toks)) $ + unless (T.member addr questionableToken (tokenMap toks)) $ throwIO $ InvalidParameter "token" {----------------------------------------------------------------------- @@ -412,10 +419,19 @@ getPeerList ih = do else return (Right ps) insertTopic :: InfoHash -> PortNumber -> DHT ip () -insertTopic = undefined +insertTopic ih p = do + var <- asks announceInfo + liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) deleteTopic :: InfoHash -> PortNumber -> DHT ip () -deleteTopic = undefined +deleteTopic ih p = do + var <- asks announceInfo + liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) + +republish :: DHT ip ThreadId +republish = fork $ do + i <- asks (optReannounce . options) + error "DHT.republish: not implemented" {----------------------------------------------------------------------- -- Messaging @@ -512,9 +528,10 @@ search k action = do mapM_ yield results publish :: Address ip => InfoHash -> PortNumber -> DHT ip () -publish ih port = do +publish ih p = do nodes <- getClosest ih - _ <- sourceList [nodes] $= search ih (announceQ ih port) $$ C.take 20 + r <- asks (optReplication . options) + _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r return () {----------------------------------------------------------------------- -- cgit v1.2.3