From e642358ffe4673ab0a03c5aafa628ffc86b17abd Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 22 Jan 2014 00:30:01 +0400 Subject: Cleanup old sessions --- src/Network/BitTorrent/Exchange.hs | 428 +--------------------------- src/Network/BitTorrent/Sessions.hs | 446 ------------------------------ src/Network/BitTorrent/Sessions/Types.lhs | 316 --------------------- 3 files changed, 1 insertion(+), 1189 deletions(-) delete mode 100644 src/Network/BitTorrent/Sessions.hs delete mode 100644 src/Network/BitTorrent/Sessions/Types.lhs (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index c1377449..934c646d 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs @@ -1,16 +1,3 @@ -{- TODO turn awaitEvent and yieldEvent to sourcePeer and sinkPeer - - sourceSocket sock $= - conduitGet S.get $= - sourcePeer $= - p2p $= - sinkPeer $= - conduitPut S.put $$ - sinkSocket sock - - measure performance - -} - -- | -- Copyright : (c) Sam Truzjan 2013 -- License : BSD3 @@ -18,419 +5,6 @@ -- Stability : experimental -- Portability : portable -- --- This module provides P2P communication and aims to hide the --- following stuff under the hood: --- --- * TODO; --- --- * /keep alives/ -- ; --- --- * /choking mechanism/ -- is used ; --- --- * /message broadcasting/ -- ; --- --- * /message filtering/ -- due to network latency and concurrency --- some arriving messages might not make sense in the current --- session context; --- --- * /scatter\/gather pieces/ -- ; --- --- * /various P2P protocol extensions/ -- . --- --- Finally we get a simple event-based communication model. --- -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE BangPatterns #-} module Network.BitTorrent.Exchange - ( P2P - , runP2P - - -- * Query - , getHaveCount - , getWantCount - , getPieceCount - , peerOffer - - -- * Events - , Event(..) - , awaitEvent - , yieldEvent - , handleEvent - , exchange - , p2p - - -- * Exceptions - , disconnect - , protocolError - - -- * Block - , Block(..), BlockIx(..) - - -- * Status - , PeerStatus(..), SessionStatus(..) - , inverseStatus - , canDownload, canUpload + ( ) where - -import Control.Applicative -import Control.Concurrent.STM -import Control.Exception -import Control.Lens -import Control.Monad.Reader -import Control.Monad.State -import Control.Monad.Trans.Resource - -import Data.IORef -import Data.Conduit as C -import Data.Conduit.Cereal as S -import Data.Conduit.Network -import Data.Serialize as S -import Text.PrettyPrint as PP hiding (($$)) - -import Network - -import Data.Torrent.Block -import Data.Torrent.Bitfield as BF -import Network.BitTorrent.Extension -import Network.BitTorrent.Exchange.Protocol -import Network.BitTorrent.Sessions.Types -import System.Torrent.Storage - - -{----------------------------------------------------------------------- - Exceptions ------------------------------------------------------------------------} - --- | Terminate the current 'P2P' session. -disconnect :: P2P a -disconnect = monadThrow PeerDisconnected - --- TODO handle all protocol details here so we can hide this from --- public interface | -protocolError :: Doc -> P2P a -protocolError = monadThrow . ProtocolError - -{----------------------------------------------------------------------- - Helpers ------------------------------------------------------------------------} - -getClientBF :: P2P Bitfield -getClientBF = asks swarmSession >>= liftIO . getClientBitfield -{-# INLINE getClientBF #-} - --- | Count of client /have/ pieces. -getHaveCount :: P2P PieceCount -getHaveCount = haveCount <$> getClientBF -{-# INLINE getHaveCount #-} - --- | Count of client do not /have/ pieces. -getWantCount :: P2P PieceCount -getWantCount = totalCount <$> getClientBF -{-# INLINE getWantCount #-} - --- | Count of both /have/ and /want/ pieces. -getPieceCount :: P2P PieceCount -getPieceCount = asks findPieceCount -{-# INLINE getPieceCount #-} - --- for internal use only -emptyBF :: P2P Bitfield -emptyBF = liftM haveNone getPieceCount - -fullBF :: P2P Bitfield -fullBF = liftM haveAll getPieceCount - -singletonBF :: PieceIx -> P2P Bitfield -singletonBF i = liftM (BF.singleton i) getPieceCount - -adjustBF :: Bitfield -> P2P Bitfield -adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount - -peerWant :: P2P Bitfield -peerWant = BF.difference <$> getClientBF <*> use bitfield - -clientWant :: P2P Bitfield -clientWant = BF.difference <$> use bitfield <*> getClientBF - -peerOffer :: P2P Bitfield -peerOffer = do - sessionStatus <- use status - if canDownload sessionStatus then clientWant else emptyBF - -clientOffer :: P2P Bitfield -clientOffer = do - sessionStatus <- use status - if canUpload sessionStatus then peerWant else emptyBF - - - -revise :: P2P Bitfield -revise = do - want <- clientWant - let peerInteresting = not (BF.null want) - clientInterested <- use (status.clientStatus.interested) - - when (clientInterested /= peerInteresting) $ do - yieldMessage $ if peerInteresting then Interested else NotInterested - status.clientStatus.interested .= peerInteresting - - return want - -requireExtension :: Extension -> P2P () -requireExtension required = do - enabled <- asks enabledExtensions - unless (required `elem` enabled) $ - protocolError $ ppExtension required <+> "not enabled" - --- haveMessage bf = do --- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession --- if undefined -- ix `member` bf --- then nextEvent se --- else undefined -- return $ Available diff - -{----------------------------------------------------------------------- - Exchange ------------------------------------------------------------------------} - - --- | The 'Event' occur when either client or a peer change their --- state. 'Event' are similar to 'Message' but differ in. We could --- both wait for an event or raise an event using the 'awaitEvent' and --- 'yieldEvent' functions respectively. --- --- --- 'awaitEvent'\/'yieldEvent' properties: --- --- * between any await or yield state of the (another)peer could not change. --- -data Event - -- | Generalize 'Bitfield', 'Have', 'HaveAll', 'HaveNone', - -- 'SuggestPiece', 'AllowedFast' messages. - = Available Bitfield - - -- | Generalize 'Request' and 'Interested' messages. - | Want BlockIx - - -- | Generalize 'Piece' and 'Unchoke' messages. - | Fragment Block - deriving Show - --- INVARIANT: --- --- * Available Bitfield is never empty --- - --- | You could think of 'awaitEvent' as wait until something interesting occur. --- --- The following table shows which events may occur: --- --- > +----------+---------+ --- > | Leacher | Seeder | --- > |----------+---------+ --- > | Available| | --- > | Want | Want | --- > | Fragment | | --- > +----------+---------+ --- --- The reason is that seeder is not interested in any piece, and --- both available or fragment events doesn't make sense in this context. --- --- Some properties: --- --- forall (Fragment block). isPiece block == True --- -awaitEvent :: P2P Event -awaitEvent = {-# SCC awaitEvent #-} do - flushPending - msg <- awaitMessage - go msg - where - go KeepAlive = awaitEvent - go Choke = do - status.peerStatus.choking .= True - awaitEvent - - go Unchoke = do - status.peerStatus.choking .= False - offer <- peerOffer - if BF.null offer - then awaitEvent - else return (Available offer) - - go Interested = do - status.peerStatus.interested .= True - awaitEvent - - go NotInterested = do - status.peerStatus.interested .= False - awaitEvent - - go (Have idx) = do - bitfield %= have idx - _ <- revise - - offer <- peerOffer - if not (BF.null offer) - then return (Available offer) - else awaitEvent - - go (Bitfield bf) = do - new <- adjustBF bf - bitfield .= new - _ <- revise - - offer <- peerOffer - if not (BF.null offer) - then return (Available offer) - else awaitEvent - - go (Request bix) = do - bf <- clientOffer - if ixPiece bix `BF.member` bf - then return (Want bix) - else do --- check if extension is enabled --- yieldMessage (RejectRequest bix) - awaitEvent - - go (Piece blk) = do - -- this protect us from malicious peers and duplication - wanted <- clientWant - if blkPiece blk `BF.member` wanted - then return (Fragment blk) - else awaitEvent - - go (Cancel _) = do - error "cancel message not implemented" - - go (Port _) = do - requireExtension ExtDHT - error "port message not implemented" - - go HaveAll = do - requireExtension ExtFast - bitfield <~ fullBF - _ <- revise - awaitEvent - - go HaveNone = do - requireExtension ExtFast - bitfield <~ emptyBF - _ <- revise - awaitEvent - - go (SuggestPiece idx) = do - requireExtension ExtFast - bf <- use bitfield - if idx `BF.notMember` bf - then Available <$> singletonBF idx - else awaitEvent - - go (RejectRequest _) = do - requireExtension ExtFast - awaitEvent - - go (AllowedFast _) = do - requireExtension ExtFast - awaitEvent - --- TODO minimize number of peerOffer calls - --- | Raise an events which may occur --- --- This table shows when a some specific events /makes sense/ to yield: --- --- @ --- +----------+---------+ --- | Leacher | Seeder | --- |----------+---------+ --- | Available| | --- | Want |Fragment | --- | Fragment | | --- +----------+---------+ --- @ --- --- Seeder should not yield: --- --- * Available -- seeder could not store anything new. --- --- * Want -- seeder alread have everything, no reason to want. --- --- Hovewer, it's okay to not obey the rules -- if we are yield some --- event which doesn't /makes sense/ in the current context then it --- most likely will be ignored without any network IO. --- -yieldEvent :: Event -> P2P () -yieldEvent e = {-# SCC yieldEvent #-} do - go e - flushPending - where - go (Available ixs) = do - ses <- asks swarmSession - liftIO $ atomically $ available ixs ses - - go (Want bix) = do - offer <- peerOffer - if ixPiece bix `BF.member` offer - then yieldMessage (Request bix) - else return () - - go (Fragment blk) = do - offer <- clientOffer - if blkPiece blk `BF.member` offer - then yieldMessage (Piece blk) - else return () - - -handleEvent :: (Event -> P2P Event) -> P2P () -handleEvent action = awaitEvent >>= action >>= yieldEvent - --- Event translation table looks like: --- --- Available -> Want --- Want -> Fragment --- Fragment -> Available --- --- If we join the chain we get the event loop: --- --- Available -> Want -> Fragment --\ --- /|\ | --- \---------------------------/ --- - - --- | Default P2P action. -exchange :: Storage -> P2P () -exchange storage = {-# SCC exchange #-} awaitEvent >>= handler - where - handler (Available bf) = do - ixs <- selBlk (findMin bf) storage - mapM_ (yieldEvent . Want) ixs -- TODO yield vectored - - handler (Want bix) = do - liftIO $ print bix - blk <- liftIO $ getBlk bix storage - yieldEvent (Fragment blk) - - handler (Fragment blk @ Block {..}) = do - done <- liftIO $ putBlk blk storage - when done $ do - yieldEvent $ Available $ singleton blkPiece (succ blkPiece) - - -- WARN this is not reliable: if peer do not return all piece - -- block we could slow don't until some other event occured - offer <- peerOffer - if BF.null offer - then return () - else handler (Available offer) - -yieldInit :: P2P () -yieldInit = yieldMessage . Bitfield =<< getClientBF - -p2p :: P2P () -p2p = do - yieldInit - storage <- asks (storage . swarmSession) - forever $ do - exchange storage \ No newline at end of file diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs deleted file mode 100644 index d558438f..00000000 --- a/src/Network/BitTorrent/Sessions.hs +++ /dev/null @@ -1,446 +0,0 @@ --- | --- Copyright : (c) Sam Truzjan 2013 --- License : BSD3 --- Maintainer : pxqr.sta@gmail.com --- Stability : experimental --- Portability : portable --- -module Network.BitTorrent.Sessions - ( -- * Progress - Progress(..), startProgress - , ClientService(..) - , startService - , withRunning - - -- * Client - , ClientSession ( ClientSession - , clientPeerId, allowedExtensions - ) - , withClientSession - - , ThreadCount - , defaultThreadCount - - , TorrentLoc(..) - , registerTorrent - , unregisterTorrent - , getRegistered - - , getCurrentProgress - , getSwarmCount - , getPeerCount - , getActiveSwarms - , getSwarm - , getStorage - , getTorrentInfo - , openSwarmSession - - -- * Swarm - , SwarmSession( SwarmSession, torrentMeta - , clientSession, storage - ) - - , SessionCount - , getSessionCount - , getClientBitfield - , getActivePeers - - , discover - - , PeerSession ( connectedPeerAddr, enabledExtensions ) - , getSessionState - - , SessionState (..) - ) where - -import Prelude hiding (mapM_, elem) - -import Control.Applicative -import Control.Concurrent -import Control.Concurrent.STM -import Control.Concurrent.BoundedChan as BC -import Control.Concurrent.MSem as MSem -import Control.Monad (forever, (>=>)) -import Control.Exception -import Control.Monad.Trans - -import Data.IORef -import Data.Map as M -import Data.HashMap.Strict as HM -import Data.Foldable as F -import Data.Set as S - -import Network hiding (accept) -import Network.BSD -import Network.Socket - -import Data.Torrent.Bitfield as BF -import Data.Torrent -import Network.BitTorrent.Extension -import Network.BitTorrent.Peer -import Network.BitTorrent.Sessions.Types -import Network.BitTorrent.Exchange.Protocol as BT -import Network.BitTorrent.Tracker.Protocol as BT -import Network.BitTorrent.Tracker as BT -import Network.BitTorrent.Exchange as BT -import System.Torrent.Storage - -{----------------------------------------------------------------------- - Client Services ------------------------------------------------------------------------} - -startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO () -startService s port m = do - stopService s - putMVar s =<< spawn - where - spawn = ClientService port <$> forkIO (m port) - -stopService :: MVar ClientService -> IO () -stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread) - --- | Service A might depend on service B. -withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO () -withRunning dep failure action = tryTakeMVar dep >>= maybe failure action - -{----------------------------------------------------------------------- - Torrent presence ------------------------------------------------------------------------} - -data TorrentPresence = Active SwarmSession - | Registered TorrentLoc - | Unknown - -torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence -torrentPresence ClientSession {..} ih = do - sws <- readTVarIO swarmSessions - case M.lookup ih sws of - Just ss -> return $ Active ss - Nothing -> do - tm <- readTVarIO torrentMap - return $ maybe Unknown Registered $ HM.lookup ih tm - -{----------------------------------------------------------------------- - Client sessions ------------------------------------------------------------------------} - -startListener :: ClientSession -> PortNumber -> IO () -startListener cs @ ClientSession {..} port = - startService peerListener port $ listener cs $ \conn @ (_, PeerSession{..}) -> do - runP2P conn p2p - --- | Create a new client session. The data passed to this function are --- usually loaded from configuration file. -openClientSession :: SessionCount -> [Extension] -> PortNumber -> PortNumber -> IO ClientSession -openClientSession n exts listenerPort _ = do - cs <- ClientSession - <$> genPeerId - <*> pure exts - <*> newEmptyMVar - <*> MSem.new n - <*> pure n - <*> newTVarIO M.empty - <*> newTVarIO (startProgress 0) - <*> newTVarIO HM.empty - - startListener cs listenerPort - return cs - -closeClientSession :: ClientSession -> IO () -closeClientSession ClientSession {..} = do - stopService peerListener - - sws <- readTVarIO swarmSessions - forM_ sws closeSwarmSession - -withClientSession :: SessionCount -> [Extension] - -> PortNumber -> PortNumber - -> (ClientSession -> IO ()) -> IO () -withClientSession c es l d = bracket (openClientSession c es l d) closeClientSession - --- | Get current global progress of the client. This value is usually --- shown to a user. -getCurrentProgress :: MonadIO m => ClientSession -> m Progress -getCurrentProgress = liftIO . readTVarIO . currentProgress - --- | Get number of swarms client aware of. -getSwarmCount :: MonadIO m => ClientSession -> m SessionCount -getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions - --- | Get number of peers the client currently connected to. -getPeerCount :: MonadIO m => ClientSession -> m ThreadCount -getPeerCount ClientSession {..} = liftIO $ do - unused <- peekAvail activeThreads - return (maxActive - unused) - -getActiveSwarms :: ClientSession -> IO [SwarmSession] -getActiveSwarms ClientSession {..} = M.elems <$> readTVarIO swarmSessions - -getListenerPort :: ClientSession -> IO PortNumber -getListenerPort ClientSession {..} = servPort <$> readMVar peerListener - -{----------------------------------------------------------------------- - Swarm session ------------------------------------------------------------------------} - -defSeederConns :: SessionCount -defSeederConns = defaultUnchokeSlots - -defLeecherConns :: SessionCount -defLeecherConns = defaultNumWant - --- discovery should hide tracker and DHT communication under the hood --- thus we can obtain an unified interface - -discover :: SwarmSession -> IO () -discover swarm @ SwarmSession {..} = {-# SCC discover #-} do - port <- getListenerPort clientSession - - let conn = TConnection { - tconnAnnounce = tAnnounce torrentMeta - , tconnInfoHash = tInfoHash torrentMeta - , tconnPeerId = clientPeerId clientSession - , tconnPort = port - } - - let progress = currentProgress clientSession - ch <- newBoundedChan 100 -- TODO - tid <- forkIO $ tracker ch progress conn - forever $ do - addr <- BC.readChan ch - forkThrottle swarm $ do - initiatePeerSession swarm addr $ \pconn -> do - print addr - runP2P pconn p2p - -registerSwarmSession :: SwarmSession -> STM () -registerSwarmSession ss @ SwarmSession {..} = - modifyTVar' (swarmSessions clientSession) $ - M.insert (tInfoHash torrentMeta) ss - -unregisterSwarmSession :: SwarmSession -> STM () -unregisterSwarmSession SwarmSession {..} = - modifyTVar' (swarmSessions clientSession) $ - M.delete $ tInfoHash torrentMeta - -openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession -openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do - t <- validateLocation loc - let bf = haveNone $ pieceCount $ tInfo t - - ss <- SwarmSession t cs - <$> MSem.new defLeecherConns - <*> openStorage t dataDirPath bf - <*> newTVarIO S.empty - <*> newBroadcastTChanIO - - atomically $ do - modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t - registerSwarmSession ss - - _ <- forkIO $ discover ss - - return ss - -closeSwarmSession :: SwarmSession -> IO () -closeSwarmSession se @ SwarmSession {..} = do - atomically $ unregisterSwarmSession se - -- TODO stop discovery - -- TODO killall peer sessions - -- TODO the order is important! - closeStorage storage - -getSwarm :: ClientSession -> InfoHash -> IO SwarmSession -getSwarm cs @ ClientSession {..} ih = do - tstatus <- torrentPresence cs ih - case tstatus of - Unknown -> throw $ UnknownTorrent ih - Active sw -> return sw - Registered loc -> openSwarmSession cs loc - --- TODO do not spawn session! -getStorage :: ClientSession -> InfoHash -> IO Storage -getStorage cs ih = storage <$> getSwarm cs ih - --- TODO keep sorted? -getActivePeers :: SwarmSession -> IO [PeerSession] -getActivePeers SwarmSession {..} = S.toList <$> readTVarIO connectedPeers - -getTorrentInfo :: ClientSession -> InfoHash -> IO (Maybe Torrent) -getTorrentInfo cs ih = do - tstatus <- torrentPresence cs ih - case tstatus of - Unknown -> return Nothing - Active (SwarmSession {..}) -> return $ Just torrentMeta - Registered (TorrentLoc {..}) -> Just <$> fromFile metafilePath - --- | Get the number of connected peers in the given swarm. -getSessionCount :: SwarmSession -> IO SessionCount -getSessionCount SwarmSession {..} = do - S.size <$> readTVarIO connectedPeers - -swarmHandshake :: SwarmSession -> Handshake -swarmHandshake SwarmSession {..} = Handshake { - hsProtocol = defaultBTProtocol - , hsReserved = encodeExts $ allowedExtensions $ clientSession - , hsInfoHash = tInfoHash torrentMeta - , hsPeerId = clientPeerId $ clientSession - } - -{----------------------------------------------------------------------- - Peer sessions throttling ------------------------------------------------------------------------} - --- | The number of threads suitable for a typical BT client. -defaultThreadCount :: ThreadCount -defaultThreadCount = 1000 - -enterSwarm :: SwarmSession -> IO () -enterSwarm SwarmSession {..} = do - MSem.wait (activeThreads clientSession) - MSem.wait vacantPeers `onException` - MSem.signal (activeThreads clientSession) - -leaveSwarm :: SwarmSession -> IO () -leaveSwarm SwarmSession {..} = mask_ $ do - MSem.signal vacantPeers - MSem.signal (activeThreads clientSession) - -forkThrottle :: SwarmSession -> IO () -> IO ThreadId -forkThrottle se action = do - enterSwarm se - (forkIO $ do - action `finally` leaveSwarm se) - `onException` leaveSwarm se - --- TODO: check content files location; -validateLocation :: TorrentLoc -> IO Torrent -validateLocation = fromFile . metafilePath - -registerTorrent :: ClientSession -> TorrentLoc -> IO () -registerTorrent ClientSession {..} loc @ TorrentLoc {..} = do - torrent <- fromFile metafilePath - atomically $ modifyTVar' torrentMap $ HM.insert (tInfoHash torrent) loc - --- TODO kill sessions -unregisterTorrent :: ClientSession -> InfoHash -> IO () -unregisterTorrent ClientSession {..} ih = do - atomically $ modifyTVar' torrentMap $ HM.delete ih - -getRegistered :: ClientSession -> IO TorrentMap -getRegistered ClientSession {..} = readTVarIO torrentMap - -{----------------------------------------------------------------------- - Peer session creation ------------------------------------------------------------------------- -The peer session cycle looks like: - - * acquire vacant session and vacant thread slot; - * (fork could be here, but not necessary) - * establish peer connection; - * register peer session; - * ... exchange process ... - * unregister peer session; - * close peer connection; - * release acquired session and thread slot. - -TODO: explain why this order -TODO: thread throttling -TODO: check if it connected yet peer -TODO: utilize peer Id. -TODO: use STM semaphore ------------------------------------------------------------------------} - -registerPeerSession :: PeerSession -> IO () -registerPeerSession ps @ PeerSession {..} = - atomically $ modifyTVar' (connectedPeers swarmSession) (S.insert ps) - -unregisterPeerSession :: PeerSession -> IO () -unregisterPeerSession ps @ PeerSession {..} = - atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps) - -openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession -openSession ss @ SwarmSession {..} addr Handshake {..} = do - let clientCaps = encodeExts $ allowedExtensions $ clientSession - let enabled = decodeExts (enabledCaps clientCaps hsReserved) - - bf <- getClientBitfield ss - ps <- PeerSession addr ss enabled - <$> atomically (dupTChan broadcastMessages) - <*> newIORef (initialSessionState (totalCount bf)) - -- TODO we could implement more interesting throtling scheme - -- using connected peer information - registerPeerSession ps - return ps - --- TODO kill thread -closeSession :: PeerSession -> IO () -closeSession = unregisterPeerSession - -type PeerConn = (Socket, PeerSession) -type Exchange = PeerConn -> IO () - --- | Exchange action depends on session and socket, whereas session depends --- on socket: --- --- socket------>-----exchange --- | | --- \-->--session-->--/ --- --- To handle exceptions properly we double bracket socket and session --- then joining the resources and also ignoring session local exceptions. --- -runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO () -runSession connector opener action = - handle isSessionException $ - bracket connector close $ \sock -> - bracket (opener sock) closeSession $ \ses -> - action (sock, ses) - --- | Used then the client want to connect to a peer. -initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () -initiatePeerSession ss @ SwarmSession {..} addr - = runSession (connectToPeer addr) initiated - where - initiated sock = do - phs <- handshake sock (swarmHandshake ss) - ps <- openSession ss addr phs - return ps - --- | Used the a peer want to connect to the client. -acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO () -acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted - where - accepted sock = do - phs <- recvHandshake sock - swarm <- getSwarm cs $ hsInfoHash phs - ps <- openSession swarm addr phs - sendHandshake sock $ Handshake { - hsProtocol = defaultBTProtocol - , hsReserved = encodeExts $ enabledExtensions ps - , hsInfoHash = hsInfoHash phs - , hsPeerId = clientPeerId - } - return ps - -listener :: ClientSession -> Exchange -> PortNumber -> IO () -listener cs action serverPort = bracket openListener close loop - where - loop sock = forever $ handle isIOError $ do - (conn, addr) <- accept sock - putStrLn "accepted" - case addr of - SockAddrInet port host -> do - _ <- forkIO $ do - acceptPeerSession cs (PeerAddr Nothing host port) conn action - return () - _ -> return () - - isIOError :: IOError -> IO () - isIOError _ = return () - - openListener = do - sock <- socket AF_INET Stream =<< getProtocolNumber "tcp" - bindSocket sock (SockAddrInet serverPort iNADDR_ANY) - listen sock maxListenQueue - return sock diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs deleted file mode 100644 index dea47405..00000000 --- a/src/Network/BitTorrent/Sessions/Types.lhs +++ /dev/null @@ -1,316 +0,0 @@ -> -- | -> -- Copyright : (c) Sam Truzjan 2013 -> -- License : BSD3 -> -- Maintainer : pxqr.sta@gmail.com -> -- Stability : experimental -> -- Portability : portable -> -- -> -> {-# LANGUAGE RecordWildCards #-} -> {-# LANGUAGE ViewPatterns #-} -> {-# LANGUAGE TemplateHaskell #-} -> {-# LANGUAGE DeriveDataTypeable #-} -> -> module Network.BitTorrent.Sessions.Types -> ( ClientService(..) -> , ThreadCount -> , TorrentLoc (..) -> , TorrentMap -> -> , ClientSession (..) -> -> , SwarmSession (..) -> , getClientBitfield -> , getPending, available -> -> , PeerSession (..) -> , SessionCount -> , findPieceCount -> -> , SessionState (..) -> , status, bitfield -> , initialSessionState -> , getSessionState -> -> , SessionException (..) -> , isSessionException, putSessionException -> ) where - -> import Control.Applicative -> import Control.Concurrent -> import Control.Concurrent.STM -> import Control.Concurrent.MSem as MSem -> import Control.Lens -> import Control.Exception - -> import Data.IORef -> import Data.Default -> import Data.Function -> import Data.Map as M -> import Data.HashMap.Strict as HM -> import Data.Ord -> import Data.Set as S -> import Data.Typeable -> import Text.PrettyPrint - -> import Network - -> import Data.Torrent.Bitfield as BF -> import Network.BitTorrent.Extension -> import Network.BitTorrent.Peer -> import Network.BitTorrent.Exchange.Protocol as BT -> import System.Torrent.Storage - -Thread layout ------------------------------------------------------------------------- - -When client session created 2 new threads appear: - - * Peer listener - accept new P2P connection initiated by other -peers; - - * Tracker announcer - announce that the peer have this torrent. - - * OPTIONAL: DHT listener - replies to DHT requests; - -When swarn session created 3 new threads appear: - - * DHT request loop asks for new peers; - - * Tracker request loop asks for new peers; - - * controller which fork new avaand manage running P2P sessions. - -Peer session is one always forked thread. - -When client\/swarm\/peer session gets closed kill the corresponding -threads, but flush data to disc. (for e.g. storage block map) - -So for e.g., in order to obtain our first block we need to spawn at -least 7 threads: main thread, 2 client session threads, 3 swarm session -threads and PeerSession thread. - -Thread throttling ------------------------------------------------------------------------- - -If we will not restrict number of threads we could end up -with thousands of connected swarm and make no particular progress. - -Note also we do not bound number of swarms! This is not optimal -strategy because each swarm might have say 1 thread and we could end -up bounded by the meaningless limit. Bounding global number of p2p -sessions should work better, and simpler. - -**TODO:** priority based throttling: leecher thread have more priority -than seeder threads. - -> -- | Each client might have a limited number of threads. -> type ThreadCount = Int - -Client Services ------------------------------------------------------------------------- - -There are two servers started as client start: - - * DHT node listener - needed by other peers to discover - * Peer listener - need by other peers to join this client. - -Thus any client (assuming DHT is enabled) provides at least 2 services -so we can abstract out into ClientService: - -> data ClientService = ClientService { -> servPort :: !PortNumber -> , servThread :: !ThreadId -> } deriving Show - -Torrent Map ------------------------------------------------------------------------- - -TODO: keep track global peer have piece set. - -Keeping all seeding torrent metafiles in memory is a _bad_ idea: for -1TB of data we need at least 100MB of metadata. (using 256KB piece -size). This solution do not scale further. - -To avoid this we keep just *metainfo* about *metainfo*: - -> -- | Local info about torrent location. -> data TorrentLoc = TorrentLoc { -> -- | Full path to .torrent metafile. -> metafilePath :: FilePath -> -- | Full path to directory contating content files associated -> -- with the metafile. -> , dataDirPath :: FilePath -> } deriving Show - -TorrentMap is used to keep track all known torrents for the -client. When some peer trying to connect to us it's necessary to -dispatch appropriate 'SwarmSession' (or start new one if there are -none) in the listener loop: we only know 'InfoHash' from 'Handshake' -but nothing more. So to accept new 'PeerSession' we need to lookup -torrent metainfo and content files (if there are some) by the -'InfoHash' and only after that enter exchange loop. - -Solution with TorrentLoc is much better and takes much more less -space, moreover it depends on count of torrents but not on count of -data itself. To scale further, in future we might add something like -database (for e.g. sqlite) for this kind of things. - -> -- | Used to find torrent info and data in order to accept connection. -> type TorrentMap = HashMap InfoHash TorrentLoc - -While *registering* torrent we need to check if torrent metafile is -correct, all the files are present in the filesystem and so -forth. However content validation using hashes will take a long time, -so we need to do this on demand: if a peer asks for a block, we -validate corresponding piece and only after read and send the block -back. - -Client Sessions ------------------------------------------------------------------------- - -Basically, client session should contain options which user -application store in configuration files and related to the -protocol. Moreover it should contain the all client identification -info, for e.g. DHT. - -Client session is the basic unit of bittorrent network, it has: - - * The /peer ID/ used as unique identifier of the client in -network. Obviously, this value is not changed during client session. - - * The number of /protocol extensions/ it might use. This value is -static as well, but if you want to dynamically reconfigure the client -you might kill the end the current session and create a new with the -fresh required extensions. - - * The number of /swarms/ to join, each swarm described by the -'SwarmSession'. - -Normally, you would have one client session, however, if we needed, in -one application we could have many clients with different peer ID's -and different enabled extensions at the same time. - -> -- | -> data ClientSession = ClientSession { -> -- | Used in handshakes and discovery mechanism. -> clientPeerId :: !PeerId - -> -- | Extensions we should try to use. Hovewer some particular peer -> -- might not support some extension, so we keep enabledExtension in -> -- 'PeerSession'. -> , allowedExtensions :: [Extension] - -> , peerListener :: !(MVar ClientService) - -> -- | Semaphor used to bound number of active P2P sessions. -> , activeThreads :: !(MSem ThreadCount) - -> -- | Max number of active connections. -> , maxActive :: !ThreadCount - -> -- | Used to traverse the swarm session. -> , swarmSessions :: !(TVar (Map InfoHash SwarmSession)) - -> -- | Used to keep track global client progress. -> , currentProgress :: !(TVar Progress) - -> -- | Used to keep track available torrents. -> , torrentMap :: !(TVar TorrentMap) -> } - -NOTE: currentProgress field is reduntant: progress depends on the all swarm -bitfields maybe we can remove the 'currentProgress' and compute it on -demand? - -> instance Eq ClientSession where -> (==) = (==) `on` clientPeerId - -> instance Ord ClientSession where -> compare = comparing clientPeerId - -Swarm sessions ------------------------------------------------------------------------- - -NOTE: If client is a leecher then there is NO particular reason to -set max sessions count more than the_number_of_unchoke_slots * k: - - * thread slot(activeThread semaphore) - * will take but no - -So if client is a leecher then max sessions count depends on the -number of unchoke slots. - -> -- | Used to bound the number of simultaneous connections and, which -> -- is the same, P2P sessions within the swarm session. -> type SessionCount = Int - -However if client is a seeder then the value depends on . - -> -- | Swarm session is -> data SwarmSession = SwarmSession { -> torrentMeta :: !Torrent - -> , clientSession :: !ClientSession - -TODO: lower "vacantPeers" when client becomes seeder according to -throttling policy. - -Represent count of peers we _currently_ can connect to in the -swarm. Used to bound number of concurrent threads. See also *Thread -Throttling* section. - -> , vacantPeers :: !(MSem SessionCount) - -Client bitfield is used to keep track "the client have" piece set. -Modify this carefully always updating global progress. - -> , storage :: !Storage - -We keep set of the all connected peers for the each particular torrent -to prevent duplicated and therefore reduntant TCP connections. For -example consider the following very simle and realistic scenario: - - * Peer A lookup tracker for peers. - - * Peer B lookup tracker for peers. - - * Finally, Peer A connect to B and Peer B connect to peer A -simultaneously. - -There some other situation the problem may occur: duplicates in -successive tracker responses, tracker and DHT returns. So without any -protection we end up with two session between the same peers. That's -bad because this could lead: - - * Reduced throughput - multiple sessions between the same peers will -mutiply control overhead (control messages, session state). - - * Thread occupation - duplicated sessions will eat thread slots and -discourage other, possible more useful, peers to establish connection. - -To avoid this we could check, into the one transaction, if a peer is -already connected and add a connection only if it is not. - -> , connectedPeers :: !(TVar (Set PeerSession)) - -TODO: use bounded broadcast chan with priority queue and drop old entries. - -Channel used for replicate messages across all peers in swarm. For -exsample if we get some piece we should sent to all connected (and -interested in) peers HAVE message. - -> , broadcastMessages :: !(TChan Message) -> } - -INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers - -> instance Eq SwarmSession where -> (==) = (==) `on` (tInfoHash . torrentMeta) - -> instance Ord SwarmSession where -> compare = comparing (tInfoHash . torrentMeta) - -> getClientBitfield :: SwarmSession -> IO Bitfield -> getClientBitfield SwarmSession {..} = atomically $ getCompleteBitfield storage - -- cgit v1.2.3