summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Internal.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Internal.hs')
-rw-r--r--src/Network/BitTorrent/Internal.hs124
1 files changed, 107 insertions, 17 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
index f163cadb..38388b9a 100644
--- a/src/Network/BitTorrent/Internal.hs
+++ b/src/Network/BitTorrent/Internal.hs
@@ -15,6 +15,7 @@
15-- 15--
16{-# LANGUAGE OverloadedStrings #-} 16{-# LANGUAGE OverloadedStrings #-}
17{-# LANGUAGE RecordWildCards #-} 17{-# LANGUAGE RecordWildCards #-}
18{-# LANGUAGE ViewPatterns #-}
18{-# LANGUAGE TemplateHaskell #-} 19{-# LANGUAGE TemplateHaskell #-}
19{-# LANGUAGE DeriveDataTypeable #-} 20{-# LANGUAGE DeriveDataTypeable #-}
20module Network.BitTorrent.Internal 21module Network.BitTorrent.Internal
@@ -47,6 +48,8 @@ module Network.BitTorrent.Internal
47 , leaveSwarm 48 , leaveSwarm
48 , waitVacancy 49 , waitVacancy
49 50
51 , available
52
50 -- * Peer 53 -- * Peer
51 , PeerSession( PeerSession, connectedPeerAddr 54 , PeerSession( PeerSession, connectedPeerAddr
52 , swarmSession, enabledExtensions 55 , swarmSession, enabledExtensions
@@ -54,6 +57,7 @@ module Network.BitTorrent.Internal
54 ) 57 )
55 , SessionState 58 , SessionState
56 , withPeerSession 59 , withPeerSession
60 , getPending
57 61
58 -- ** Exceptions 62 -- ** Exceptions
59 , SessionException(..) 63 , SessionException(..)
@@ -68,6 +72,8 @@ module Network.BitTorrent.Internal
68 , updateIncoming, updateOutcoming 72 , updateIncoming, updateOutcoming
69 ) where 73 ) where
70 74
75import Prelude hiding (mapM_)
76
71import Control.Applicative 77import Control.Applicative
72import Control.Concurrent 78import Control.Concurrent
73import Control.Concurrent.STM 79import Control.Concurrent.STM
@@ -79,6 +85,7 @@ import Control.Monad.Trans
79import Data.IORef 85import Data.IORef
80import Data.Default 86import Data.Default
81import Data.Function 87import Data.Function
88import Data.Foldable (mapM_)
82import Data.Ord 89import Data.Ord
83import Data.Set as S 90import Data.Set as S
84import Data.Typeable 91import Data.Typeable
@@ -104,7 +111,7 @@ import Network.BitTorrent.Tracker.Protocol as BT
104-----------------------------------------------------------------------} 111-----------------------------------------------------------------------}
105 112
106-- | 'Progress' contains upload/download/left stats about 113-- | 'Progress' contains upload/download/left stats about
107-- current client state and used to notify the tracker 114-- current client state and used to notify the tracker.
108-- 115--
109-- This data is considered as dynamic within one client 116-- This data is considered as dynamic within one client
110-- session. This data also should be shared across client 117-- session. This data also should be shared across client
@@ -112,12 +119,14 @@ import Network.BitTorrent.Tracker.Protocol as BT
112-- to get initial 'Progress'. 119-- to get initial 'Progress'.
113-- 120--
114data Progress = Progress { 121data Progress = Progress {
115 prUploaded :: !Integer -- ^ Total amount of bytes uploaded. 122 _uploaded :: !Integer -- ^ Total amount of bytes uploaded.
116 , prDownloaded :: !Integer -- ^ Total amount of bytes downloaded. 123 , _downloaded :: !Integer -- ^ Total amount of bytes downloaded.
117 , prLeft :: !Integer -- ^ Total amount of bytes left. 124 , _left :: !Integer -- ^ Total amount of bytes left.
118 } deriving (Show, Read, Eq) 125 } deriving (Show, Read, Eq)
119 126
120-- TODO make lenses 127-- TODO use atomic bits and Word64
128
129$(makeLenses ''Progress)
121 130
122-- | Initial progress is used when there are no session before. 131-- | Initial progress is used when there are no session before.
123-- 132--
@@ -128,6 +137,29 @@ data Progress = Progress {
128startProgress :: Integer -> Progress 137startProgress :: Integer -> Progress
129startProgress = Progress 0 0 138startProgress = Progress 0 0
130 139
140-- | Used when the client download some data from /any/ peer.
141downloadedProgress :: Int -> Progress -> Progress
142downloadedProgress (fromIntegral -> amount)
143 = (left -~ amount)
144 . (downloaded +~ amount)
145{-# INLINE downloadedProgress #-}
146
147-- | Used when the client upload some data to /any/ peer.
148uploadedProgress :: Int -> Progress -> Progress
149uploadedProgress (fromIntegral -> amount) = uploaded +~ amount
150{-# INLINE uploadedProgress #-}
151
152-- | Used when leecher join client session.
153enqueuedProgress :: Int -> Progress -> Progress
154enqueuedProgress (fromIntegral -> amount) = left +~ amount
155{-# INLINE enqueuedProgress #-}
156
157-- | Used when leecher leave client session.
158-- (e.g. user deletes not completed torrent)
159dequeuedProgress :: Int -> Progress -> Progress
160dequeuedProgress (fromIntegral -> amount) = left -~ amount
161{-# INLINE dequeuedProgress #-}
162
131{----------------------------------------------------------------------- 163{-----------------------------------------------------------------------
132 Client session 164 Client session
133-----------------------------------------------------------------------} 165-----------------------------------------------------------------------}
@@ -193,6 +225,9 @@ data ClientSession = ClientSession {
193 , currentProgress :: !(TVar Progress) 225 , currentProgress :: !(TVar Progress)
194 } 226 }
195 227
228-- currentProgress field is reduntant: progress depends on the all swarm bitfields
229-- maybe we can remove the 'currentProgress' and compute it on demand?
230
196instance Eq ClientSession where 231instance Eq ClientSession where
197 (==) = (==) `on` clientPeerID 232 (==) = (==) `on` clientPeerID
198 233
@@ -274,7 +309,15 @@ data SwarmSession = SwarmSession {
274 309
275 -- | Modify this carefully updating global progress. 310 -- | Modify this carefully updating global progress.
276 , clientBitfield :: !(TVar Bitfield) 311 , clientBitfield :: !(TVar Bitfield)
312
277 , connectedPeers :: !(TVar (Set PeerSession)) 313 , connectedPeers :: !(TVar (Set PeerSession))
314
315 -- TODO use bounded broadcast chan with priority queue and drop old entries
316 -- | Channel used for replicate messages across all peers in
317 -- swarm. For exsample if we get some piece we should sent to all
318 -- connected (and interested in) peers HAVE message.
319 --
320 , broadcastMessages :: !(TChan Message)
278 } 321 }
279 322
280-- INVARIANT: 323-- INVARIANT:
@@ -294,6 +337,7 @@ newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
294 <*> MSem.new n 337 <*> MSem.new n
295 <*> newTVarIO bf 338 <*> newTVarIO bf
296 <*> newTVarIO S.empty 339 <*> newTVarIO S.empty
340 <*> newBroadcastTChanIO
297 341
298-- | New swarm session in which the client allowed to upload only. 342-- | New swarm session in which the client allowed to upload only.
299newSeeder :: ClientSession -> Torrent -> IO SwarmSession 343newSeeder :: ClientSession -> Torrent -> IO SwarmSession
@@ -342,6 +386,9 @@ waitVacancy se =
342 bracket (enterSwarm se) (const (leaveSwarm se)) 386 bracket (enterSwarm se) (const (leaveSwarm se))
343 . const 387 . const
344 388
389pieceLength :: SwarmSession -> Int
390pieceLength = ciPieceLength . tInfo . torrentMeta
391
345{----------------------------------------------------------------------- 392{-----------------------------------------------------------------------
346 Peer session 393 Peer session
347-----------------------------------------------------------------------} 394-----------------------------------------------------------------------}
@@ -380,17 +427,15 @@ data PeerSession = PeerSession {
380 , outcomingTimeout :: !TimeoutKey 427 , outcomingTimeout :: !TimeoutKey
381 428
382 -- TODO use dupChan for broadcasting 429 -- TODO use dupChan for broadcasting
383 430 -- | Broadcast messages waiting to be sent to peer.
384 -- | Channel used for replicate messages across all peers in 431 , pendingMessages :: !(TChan Message)
385 -- swarm. For exsample if we get some piece we should sent to all
386 -- connected (and interested in) peers HAVE message.
387 --
388 , broadcastMessages :: !(Chan [Message])
389 432
390 -- | Dymanic P2P data. 433 -- | Dymanic P2P data.
391 , sessionState :: !(IORef SessionState) 434 , sessionState :: !(IORef SessionState)
392 } 435 }
393 436
437-- TODO unpack some fields
438
394data SessionState = SessionState { 439data SessionState = SessionState {
395 _bitfield :: !Bitfield -- ^ Other peer Have bitfield. 440 _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
396 , _status :: !SessionStatus -- ^ Status of both peers. 441 , _status :: !SessionStatus -- ^ Status of both peers.
@@ -451,7 +496,7 @@ withPeerSession ss @ SwarmSession {..} addr
451 maxIncomingTime (return ()) 496 maxIncomingTime (return ())
452 <*> registerTimeout (eventManager clientSession) 497 <*> registerTimeout (eventManager clientSession)
453 maxOutcomingTime (sendKA sock) 498 maxOutcomingTime (sendKA sock)
454 <*> newChan 499 <*> atomically (dupTChan broadcastMessages)
455 <*> do { 500 <*> do {
456 ; tc <- totalCount <$> readTVarIO clientBitfield 501 ; tc <- totalCount <$> readTVarIO clientBitfield
457 ; newIORef (SessionState (haveNone tc) def) 502 ; newIORef (SessionState (haveNone tc) def)
@@ -468,10 +513,55 @@ withPeerSession ss @ SwarmSession {..} addr
468findPieceCount :: PeerSession -> PieceCount 513findPieceCount :: PeerSession -> PieceCount
469findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession 514findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
470 515
471-- TODO use this type for broadcast messages instead of 'Message' 516{-----------------------------------------------------------------------
472--data Signal = 517 Broadcasting: Have, Cancel, Bitfield, SuggestPiece
473--nextBroadcast :: P2P (Maybe Signal) 518-----------------------------------------------------------------------}
474--nextBroadcast = 519
520-- here we should enqueue broadcast messages and keep in mind that:
521--
522-- * We should enqueue broadcast events as they are appear.
523-- * We should yield broadcast messages as fast as we get them.
524--
525-- these 2 phases might differ in time significantly
526
527-- TODO do this; but only when it'll be clean which other broadcast
528-- messages & events we should send
529
530-- 1. Update client have bitfield --\____ in one transaction;
531-- 2. Update downloaded stats --/
532-- 3. Signal to the all other peer about this.
533
534available :: Bitfield -> SwarmSession -> IO ()
535available bf se @ SwarmSession {..} = mark >> atomically broadcast
536 where
537 mark = do
538 let bytes = pieceLength se * BF.haveCount bf
539 atomically $ do
540 modifyTVar' clientBitfield (BF.union bf)
541 modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
542
543 broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
544
545
546-- TODO compute size of messages: if it's faster to send Bitfield
547-- instead many Have do that
548--
549-- also if there is single Have message in queue then the
550-- corresponding piece is likely still in memory or disc cache,
551-- when we can send SuggestPiece
552
553-- | Get pending messages queue appeared in result of asynchronously
554-- changed client state. Resulting queue should be sent to a peer
555-- immediately.
556getPending :: PeerSession -> IO [Message]
557getPending PeerSession {..} = atomically (readAvail pendingMessages)
558
559readAvail :: TChan a -> STM [a]
560readAvail chan = do
561 m <- tryReadTChan chan
562 case m of
563 Just a -> (:) <$> pure a <*> readAvail chan
564 Nothing -> return []
475 565
476{----------------------------------------------------------------------- 566{-----------------------------------------------------------------------
477 Timeouts 567 Timeouts