From f556bf196bf07308f024cc43c1a51dfd4c21188c Mon Sep 17 00:00:00 2001 From: Sam T Date: Sat, 29 Jun 2013 23:22:25 +0400 Subject: + Scetch basic broadcasting. --- src/Network/BitTorrent.hs | 12 +++- src/Network/BitTorrent/Exchange.hs | 18 +++--- src/Network/BitTorrent/Internal.hs | 124 ++++++++++++++++++++++++++++++++----- src/Network/BitTorrent/Tracker.hs | 6 +- 4 files changed, 129 insertions(+), 31 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index b97db4b0..86c7802b 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs @@ -33,8 +33,15 @@ module Network.BitTorrent , SessionCount , getSessionCount + -- * Storage + , Storage + , bindTo + , unbind + -- * Discovery , discover + , exchange + -- * Peer to Peer , P2P @@ -122,12 +129,13 @@ discover swarm action = do -- \---------------------------/ -- + -- | Default P2P action. exchange :: Storage -> P2P () -exchange storage = handleEvent handler +exchange storage = handleEvent (\msg -> liftIO (print msg) >> handler msg) where handler (Available bf) - | Just m <- findMin bf = return (Want (BlockIx m 0 10)) + | Just m <- findMin bf = return (Want (BlockIx m 0 262144)) | otherwise = error "impossible" -- TODO findMin :: Bitfield -> PieceIx diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 3d05f7fc..505360a4 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs @@ -109,15 +109,18 @@ runPeerWire sock p2p = sinkSocket sock awaitMessage :: P2P Message -awaitMessage = P2P (ReaderT (const go)) +awaitMessage = P2P $ ReaderT $ const go where go = await >>= maybe (monadThrow PeerDisconnected) return {-# INLINE awaitMessage #-} yieldMessage :: Message -> P2P () -yieldMessage msg = P2P $ ReaderT $ \se -> C.yield msg +yieldMessage msg = P2P $ ReaderT $ const (C.yield msg) {-# INLINE yieldMessage #-} +flushPending :: P2P () +flushPending = ask >>= liftIO . getPending >>= mapM_ yieldMessage + {----------------------------------------------------------------------- P2P monad -----------------------------------------------------------------------} @@ -313,7 +316,9 @@ data Event -- forall (Fragment block). isPiece block == True -- awaitEvent :: P2P Event -awaitEvent = awaitMessage >>= go +awaitEvent = do + + awaitMessage >>= go where go KeepAlive = awaitEvent go Choke = do @@ -432,7 +437,7 @@ awaitEvent = awaitMessage >>= go -- most likely will be ignored without any network IO. -- yieldEvent :: Event -> P2P () -yieldEvent (Available _ ) = undefined +yieldEvent (Available ixs) = asks swarmSession >>= liftIO . available ixs yieldEvent (Want bix) = do offer <- peerOffer if ixPiece bix `BF.member` offer @@ -449,10 +454,5 @@ yieldEvent (Fragment blk) = do handleEvent :: (Event -> P2P Event) -> P2P () handleEvent action = awaitEvent >>= action >>= yieldEvent ---flushBroadcast :: P2P () ---flushBroadcast = nextBroadcast >>= maybe (return ()) go --- where --- go = undefined - checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool checkPiece = undefined diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index f163cadb..38388b9a 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs @@ -15,6 +15,7 @@ -- {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ViewPatterns #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE DeriveDataTypeable #-} module Network.BitTorrent.Internal @@ -47,6 +48,8 @@ module Network.BitTorrent.Internal , leaveSwarm , waitVacancy + , available + -- * Peer , PeerSession( PeerSession, connectedPeerAddr , swarmSession, enabledExtensions @@ -54,6 +57,7 @@ module Network.BitTorrent.Internal ) , SessionState , withPeerSession + , getPending -- ** Exceptions , SessionException(..) @@ -68,6 +72,8 @@ module Network.BitTorrent.Internal , updateIncoming, updateOutcoming ) where +import Prelude hiding (mapM_) + import Control.Applicative import Control.Concurrent import Control.Concurrent.STM @@ -79,6 +85,7 @@ import Control.Monad.Trans import Data.IORef import Data.Default import Data.Function +import Data.Foldable (mapM_) import Data.Ord import Data.Set as S import Data.Typeable @@ -104,7 +111,7 @@ import Network.BitTorrent.Tracker.Protocol as BT -----------------------------------------------------------------------} -- | 'Progress' contains upload/download/left stats about --- current client state and used to notify the tracker +-- current client state and used to notify the tracker. -- -- This data is considered as dynamic within one client -- session. This data also should be shared across client @@ -112,12 +119,14 @@ import Network.BitTorrent.Tracker.Protocol as BT -- to get initial 'Progress'. -- data Progress = Progress { - prUploaded :: !Integer -- ^ Total amount of bytes uploaded. - , prDownloaded :: !Integer -- ^ Total amount of bytes downloaded. - , prLeft :: !Integer -- ^ Total amount of bytes left. + _uploaded :: !Integer -- ^ Total amount of bytes uploaded. + , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. + , _left :: !Integer -- ^ Total amount of bytes left. } deriving (Show, Read, Eq) --- TODO make lenses +-- TODO use atomic bits and Word64 + +$(makeLenses ''Progress) -- | Initial progress is used when there are no session before. -- @@ -128,6 +137,29 @@ data Progress = Progress { startProgress :: Integer -> Progress startProgress = Progress 0 0 +-- | Used when the client download some data from /any/ peer. +downloadedProgress :: Int -> Progress -> Progress +downloadedProgress (fromIntegral -> amount) + = (left -~ amount) + . (downloaded +~ amount) +{-# INLINE downloadedProgress #-} + +-- | Used when the client upload some data to /any/ peer. +uploadedProgress :: Int -> Progress -> Progress +uploadedProgress (fromIntegral -> amount) = uploaded +~ amount +{-# INLINE uploadedProgress #-} + +-- | Used when leecher join client session. +enqueuedProgress :: Int -> Progress -> Progress +enqueuedProgress (fromIntegral -> amount) = left +~ amount +{-# INLINE enqueuedProgress #-} + +-- | Used when leecher leave client session. +-- (e.g. user deletes not completed torrent) +dequeuedProgress :: Int -> Progress -> Progress +dequeuedProgress (fromIntegral -> amount) = left -~ amount +{-# INLINE dequeuedProgress #-} + {----------------------------------------------------------------------- Client session -----------------------------------------------------------------------} @@ -193,6 +225,9 @@ data ClientSession = ClientSession { , currentProgress :: !(TVar Progress) } +-- currentProgress field is reduntant: progress depends on the all swarm bitfields +-- maybe we can remove the 'currentProgress' and compute it on demand? + instance Eq ClientSession where (==) = (==) `on` clientPeerID @@ -274,7 +309,15 @@ data SwarmSession = SwarmSession { -- | Modify this carefully updating global progress. , clientBitfield :: !(TVar Bitfield) + , connectedPeers :: !(TVar (Set PeerSession)) + + -- TODO use bounded broadcast chan with priority queue and drop old entries + -- | Channel used for replicate messages across all peers in + -- swarm. For exsample if we get some piece we should sent to all + -- connected (and interested in) peers HAVE message. + -- + , broadcastMessages :: !(TChan Message) } -- INVARIANT: @@ -294,6 +337,7 @@ newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} <*> MSem.new n <*> newTVarIO bf <*> newTVarIO S.empty + <*> newBroadcastTChanIO -- | New swarm session in which the client allowed to upload only. newSeeder :: ClientSession -> Torrent -> IO SwarmSession @@ -342,6 +386,9 @@ waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const +pieceLength :: SwarmSession -> Int +pieceLength = ciPieceLength . tInfo . torrentMeta + {----------------------------------------------------------------------- Peer session -----------------------------------------------------------------------} @@ -380,17 +427,15 @@ data PeerSession = PeerSession { , outcomingTimeout :: !TimeoutKey -- TODO use dupChan for broadcasting - - -- | Channel used for replicate messages across all peers in - -- swarm. For exsample if we get some piece we should sent to all - -- connected (and interested in) peers HAVE message. - -- - , broadcastMessages :: !(Chan [Message]) + -- | Broadcast messages waiting to be sent to peer. + , pendingMessages :: !(TChan Message) -- | Dymanic P2P data. - , sessionState :: !(IORef SessionState) + , sessionState :: !(IORef SessionState) } +-- TODO unpack some fields + data SessionState = SessionState { _bitfield :: !Bitfield -- ^ Other peer Have bitfield. , _status :: !SessionStatus -- ^ Status of both peers. @@ -451,7 +496,7 @@ withPeerSession ss @ SwarmSession {..} addr maxIncomingTime (return ()) <*> registerTimeout (eventManager clientSession) maxOutcomingTime (sendKA sock) - <*> newChan + <*> atomically (dupTChan broadcastMessages) <*> do { ; tc <- totalCount <$> readTVarIO clientBitfield ; newIORef (SessionState (haveNone tc) def) @@ -468,10 +513,55 @@ withPeerSession ss @ SwarmSession {..} addr findPieceCount :: PeerSession -> PieceCount findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession --- TODO use this type for broadcast messages instead of 'Message' ---data Signal = ---nextBroadcast :: P2P (Maybe Signal) ---nextBroadcast = +{----------------------------------------------------------------------- + Broadcasting: Have, Cancel, Bitfield, SuggestPiece +-----------------------------------------------------------------------} + +-- here we should enqueue broadcast messages and keep in mind that: +-- +-- * We should enqueue broadcast events as they are appear. +-- * We should yield broadcast messages as fast as we get them. +-- +-- these 2 phases might differ in time significantly + +-- TODO do this; but only when it'll be clean which other broadcast +-- messages & events we should send + +-- 1. Update client have bitfield --\____ in one transaction; +-- 2. Update downloaded stats --/ +-- 3. Signal to the all other peer about this. + +available :: Bitfield -> SwarmSession -> IO () +available bf se @ SwarmSession {..} = mark >> atomically broadcast + where + mark = do + let bytes = pieceLength se * BF.haveCount bf + atomically $ do + modifyTVar' clientBitfield (BF.union bf) + modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) + + broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) + + +-- TODO compute size of messages: if it's faster to send Bitfield +-- instead many Have do that +-- +-- also if there is single Have message in queue then the +-- corresponding piece is likely still in memory or disc cache, +-- when we can send SuggestPiece + +-- | Get pending messages queue appeared in result of asynchronously +-- changed client state. Resulting queue should be sent to a peer +-- immediately. +getPending :: PeerSession -> IO [Message] +getPending PeerSession {..} = atomically (readAvail pendingMessages) + +readAvail :: TChan a -> STM [a] +readAvail chan = do + m <- tryReadTChan chan + case m of + Just a -> (:) <$> pure a <*> readAvail chan + Nothing -> return [] {----------------------------------------------------------------------- Timeouts diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index ea45b75d..bfe4182d 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs @@ -90,9 +90,9 @@ genericReq ses pr = TRequest { , reqPeerID = tconnPeerID ses , reqPort = tconnPort ses - , reqUploaded = prUploaded pr - , reqDownloaded = prDownloaded pr - , reqLeft = prLeft pr + , reqUploaded = _uploaded pr + , reqDownloaded = _downloaded pr + , reqLeft = _left pr , reqIP = Nothing , reqNumWant = Nothing -- cgit v1.2.3