From 9737a06bff6c6539a6afd67f7970a6923b401d86 Mon Sep 17 00:00:00 2001 From: Sam T Date: Wed, 28 Aug 2013 07:29:31 +0400 Subject: ~ Refactor tracker. --- src/Network/BitTorrent/Sessions.hs | 13 ++-- src/Network/BitTorrent/Tracker.hs | 132 ++++++++++++++++--------------------- 2 files changed, 64 insertions(+), 81 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs index 9713f438..0f0d7ecd 100644 --- a/src/Network/BitTorrent/Sessions.hs +++ b/src/Network/BitTorrent/Sessions.hs @@ -58,6 +58,7 @@ import Prelude hiding (mapM_, elem) import Control.Applicative import Control.Concurrent import Control.Concurrent.STM +import Control.Concurrent.BoundedChan as BC import Control.Concurrent.MSem as MSem import Control.Monad (forever, (>=>)) import Control.Exception @@ -202,12 +203,12 @@ discover swarm @ SwarmSession {..} = {-# SCC discover #-} do , tconnPort = port } - progress <- getCurrentProgress clientSession - - withTracker progress conn $ \tses -> do - forever $ do - addr <- getPeerAddr tses - forkThrottle swarm $ do + let progress = currentProgress clientSession + ch <- newBoundedChan 100 -- TODO + tid <- forkIO $ tracker ch progress conn + forever $ do + addr <- BC.readChan ch + forkThrottle swarm $ do initiatePeerSession swarm addr $ \pconn -> do print addr runP2P pconn p2p diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index 74b0b593..147d1ea5 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs @@ -11,15 +11,13 @@ -- {-# LANGUAGE TemplateHaskell #-} module Network.BitTorrent.Tracker - ( withTracker, completedReq - - -- * Connection - , TConnection(..), tconnection + ( -- * Connection + TConnection(..) + , tconnection -- * Session , TSession - , getPeerAddr - , getProgress, waitInterval + , tracker -- * Re-export , defaultPorts @@ -32,10 +30,9 @@ import Control.Concurrent.BoundedChan as BC import Control.Concurrent.STM import Control.Exception import Control.Monad - -import Data.List as L -import Data.IORef - +import Data.List as L +import Data.IORef +import Data.Text as T import Network import Network.URI @@ -77,10 +74,14 @@ instance Tracker BitTracker where -- This data is considered as static within one session. -- data TConnection = TConnection { - tconnAnnounce :: URI -- ^ Announce URL. - , tconnInfoHash :: InfoHash -- ^ Hash of info part of current .torrent file. - , tconnPeerId :: PeerId -- ^ Client peer ID. - , tconnPort :: PortNumber -- ^ The port number the client is listenning on. + tconnAnnounce :: URI + -- ^ Announce URL. + , tconnInfoHash :: InfoHash + -- ^ Hash of info part of current .torrent file. + , tconnPeerId :: PeerId + -- ^ Client peer ID. + , tconnPort :: PortNumber + -- ^ The port number the client is listenning on. } deriving Show -- TODO tconnection :: SwarmSession -> TConnection @@ -166,45 +167,6 @@ network a long time than a new. type TimeInterval = Int -data TSession = TSession { - -- TODO synchonize progress with client session - seProgress :: TVar Progress - , seInterval :: IORef TimeInterval - , sePeers :: BoundedChan PeerAddr - , seTracker :: BitTracker - } - -type PeerCount = Int - -defaultChanSize :: PeerCount -defaultChanSize = defaultNumWant * 2 - -getPeerAddr :: TSession -> IO PeerAddr -getPeerAddr = BC.readChan . sePeers - -getProgress :: TSession -> IO Progress -getProgress = readTVarIO . seProgress - -newSession :: PeerCount -> Progress -> TimeInterval -> [PeerAddr] - -> BitTracker - -> IO TSession -newSession chanSize pr i ps tr - | chanSize < 1 - = throwIO $ userError "size of chan should be more that 1" - - | otherwise = do - chan <- newBoundedChan chanSize - - -- if length of the "ps" is more than the "chanSize" we will block - -- forever; to avoid this we remove excessive peers - let ps' = take chanSize ps - BC.writeList2Chan chan ps' - - TSession <$> newTVarIO pr - <*> newIORef i - <*> pure chan - <*> pure tr - waitInterval :: TSession -> IO () waitInterval TSession {..} = do delay <- readIORef seInterval @@ -212,30 +174,53 @@ waitInterval TSession {..} = do where sec = 1000 * 1000 :: Int -announceLoop :: IO (BoundedChan PeerAddr) -announceLoop = undefined - -openSession :: Progress -> TConnection -> IO TSession -openSession initProgress conn = do - t <- Tracker.connect (tconnAnnounce conn) - resp <- Tracker.announce t (startedReq conn initProgress) - newSession defaultChanSize initProgress - (respInterval resp) (respPeers resp) t +data TSession = TSession + { seConnection :: !TConnection + , seTracker :: !BitTracker + , seProgress :: !(TVar Progress) + , sePeers :: !(BoundedChan PeerAddr) + , seInterval :: {-# UNPACK #-} !(IORef TimeInterval) + } -closeSession :: TConnection -> TSession -> IO () -closeSession conn se @ TSession {..} = do - pr <- getProgress se - Tracker.announce seTracker (stoppedReq conn pr) +openSession :: BoundedChan PeerAddr + -> TVar Progress + -> TConnection -> IO TSession +openSession chan progress conn @ TConnection {..} = do + trac <- Tracker.connect tconnAnnounce + pr <- readTVarIO progress + resp <- Tracker.announce trac $ startedReq conn pr + print resp + case resp of + Failure e -> throwIO $ userError $ T.unpack e + AnnounceInfo {..} -> do + -- TODO make use of rest AnnounceInfo fields + BC.writeList2Chan chan respPeers + TSession conn trac progress chan + <$> newIORef respInterval + +closeSession :: TSession -> IO () +closeSession TSession {..} = do + pr <- readTVarIO seProgress + _ <- Tracker.announce seTracker (stoppedReq seConnection pr) return () -syncSession :: TConnection -> TSession -> IO () -syncSession conn se @ TSession {..} = forever $ do +withSession :: BoundedChan PeerAddr + -> TVar Progress + -> TConnection -> (TSession -> IO a) -> IO a +withSession chan prog conn + = bracket (openSession chan prog conn) closeSession + +askPeers :: TSession -> IO () +askPeers se @ TSession {..} = forever $ do waitInterval se - pr <- getProgress se + pr <- readTVarIO seProgress resp <- tryJust isIOException $ do - Tracker.announce seTracker (regularReq defaultNumWant conn pr) + let req = regularReq defaultNumWant seConnection pr + Tracker.announce seTracker req + print resp case resp of Left _ -> return () + Right (Failure e) -> throwIO $ userError $ T.unpack e Right (AnnounceInfo {..}) -> do writeIORef seInterval respInterval @@ -249,8 +234,5 @@ syncSession conn se @ TSession {..} = forever $ do isIOException :: IOException -> Maybe IOException isIOException = return -withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a -withTracker initProgress conn - = bracket - (openSession initProgress conn) - (closeSession conn) +tracker :: BoundedChan PeerAddr -> TVar Progress -> TConnection -> IO () +tracker chan prog conn = withSession chan prog conn askPeers \ No newline at end of file -- cgit v1.2.3