diff options
Diffstat (limited to 'src/Network/BitTorrent/Internal.hs')
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 124 |
1 files changed, 107 insertions, 17 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index f163cadb..38388b9a 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -15,6 +15,7 @@ | |||
15 | -- | 15 | -- |
16 | {-# LANGUAGE OverloadedStrings #-} | 16 | {-# LANGUAGE OverloadedStrings #-} |
17 | {-# LANGUAGE RecordWildCards #-} | 17 | {-# LANGUAGE RecordWildCards #-} |
18 | {-# LANGUAGE ViewPatterns #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | 19 | {-# LANGUAGE TemplateHaskell #-} |
19 | {-# LANGUAGE DeriveDataTypeable #-} | 20 | {-# LANGUAGE DeriveDataTypeable #-} |
20 | module Network.BitTorrent.Internal | 21 | module Network.BitTorrent.Internal |
@@ -47,6 +48,8 @@ module Network.BitTorrent.Internal | |||
47 | , leaveSwarm | 48 | , leaveSwarm |
48 | , waitVacancy | 49 | , waitVacancy |
49 | 50 | ||
51 | , available | ||
52 | |||
50 | -- * Peer | 53 | -- * Peer |
51 | , PeerSession( PeerSession, connectedPeerAddr | 54 | , PeerSession( PeerSession, connectedPeerAddr |
52 | , swarmSession, enabledExtensions | 55 | , swarmSession, enabledExtensions |
@@ -54,6 +57,7 @@ module Network.BitTorrent.Internal | |||
54 | ) | 57 | ) |
55 | , SessionState | 58 | , SessionState |
56 | , withPeerSession | 59 | , withPeerSession |
60 | , getPending | ||
57 | 61 | ||
58 | -- ** Exceptions | 62 | -- ** Exceptions |
59 | , SessionException(..) | 63 | , SessionException(..) |
@@ -68,6 +72,8 @@ module Network.BitTorrent.Internal | |||
68 | , updateIncoming, updateOutcoming | 72 | , updateIncoming, updateOutcoming |
69 | ) where | 73 | ) where |
70 | 74 | ||
75 | import Prelude hiding (mapM_) | ||
76 | |||
71 | import Control.Applicative | 77 | import Control.Applicative |
72 | import Control.Concurrent | 78 | import Control.Concurrent |
73 | import Control.Concurrent.STM | 79 | import Control.Concurrent.STM |
@@ -79,6 +85,7 @@ import Control.Monad.Trans | |||
79 | import Data.IORef | 85 | import Data.IORef |
80 | import Data.Default | 86 | import Data.Default |
81 | import Data.Function | 87 | import Data.Function |
88 | import Data.Foldable (mapM_) | ||
82 | import Data.Ord | 89 | import Data.Ord |
83 | import Data.Set as S | 90 | import Data.Set as S |
84 | import Data.Typeable | 91 | import Data.Typeable |
@@ -104,7 +111,7 @@ import Network.BitTorrent.Tracker.Protocol as BT | |||
104 | -----------------------------------------------------------------------} | 111 | -----------------------------------------------------------------------} |
105 | 112 | ||
106 | -- | 'Progress' contains upload/download/left stats about | 113 | -- | 'Progress' contains upload/download/left stats about |
107 | -- current client state and used to notify the tracker | 114 | -- current client state and used to notify the tracker. |
108 | -- | 115 | -- |
109 | -- This data is considered as dynamic within one client | 116 | -- This data is considered as dynamic within one client |
110 | -- session. This data also should be shared across client | 117 | -- session. This data also should be shared across client |
@@ -112,12 +119,14 @@ import Network.BitTorrent.Tracker.Protocol as BT | |||
112 | -- to get initial 'Progress'. | 119 | -- to get initial 'Progress'. |
113 | -- | 120 | -- |
114 | data Progress = Progress { | 121 | data Progress = Progress { |
115 | prUploaded :: !Integer -- ^ Total amount of bytes uploaded. | 122 | _uploaded :: !Integer -- ^ Total amount of bytes uploaded. |
116 | , prDownloaded :: !Integer -- ^ Total amount of bytes downloaded. | 123 | , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. |
117 | , prLeft :: !Integer -- ^ Total amount of bytes left. | 124 | , _left :: !Integer -- ^ Total amount of bytes left. |
118 | } deriving (Show, Read, Eq) | 125 | } deriving (Show, Read, Eq) |
119 | 126 | ||
120 | -- TODO make lenses | 127 | -- TODO use atomic bits and Word64 |
128 | |||
129 | $(makeLenses ''Progress) | ||
121 | 130 | ||
122 | -- | Initial progress is used when there are no session before. | 131 | -- | Initial progress is used when there are no session before. |
123 | -- | 132 | -- |
@@ -128,6 +137,29 @@ data Progress = Progress { | |||
128 | startProgress :: Integer -> Progress | 137 | startProgress :: Integer -> Progress |
129 | startProgress = Progress 0 0 | 138 | startProgress = Progress 0 0 |
130 | 139 | ||
140 | -- | Used when the client download some data from /any/ peer. | ||
141 | downloadedProgress :: Int -> Progress -> Progress | ||
142 | downloadedProgress (fromIntegral -> amount) | ||
143 | = (left -~ amount) | ||
144 | . (downloaded +~ amount) | ||
145 | {-# INLINE downloadedProgress #-} | ||
146 | |||
147 | -- | Used when the client upload some data to /any/ peer. | ||
148 | uploadedProgress :: Int -> Progress -> Progress | ||
149 | uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
150 | {-# INLINE uploadedProgress #-} | ||
151 | |||
152 | -- | Used when leecher join client session. | ||
153 | enqueuedProgress :: Int -> Progress -> Progress | ||
154 | enqueuedProgress (fromIntegral -> amount) = left +~ amount | ||
155 | {-# INLINE enqueuedProgress #-} | ||
156 | |||
157 | -- | Used when leecher leave client session. | ||
158 | -- (e.g. user deletes not completed torrent) | ||
159 | dequeuedProgress :: Int -> Progress -> Progress | ||
160 | dequeuedProgress (fromIntegral -> amount) = left -~ amount | ||
161 | {-# INLINE dequeuedProgress #-} | ||
162 | |||
131 | {----------------------------------------------------------------------- | 163 | {----------------------------------------------------------------------- |
132 | Client session | 164 | Client session |
133 | -----------------------------------------------------------------------} | 165 | -----------------------------------------------------------------------} |
@@ -193,6 +225,9 @@ data ClientSession = ClientSession { | |||
193 | , currentProgress :: !(TVar Progress) | 225 | , currentProgress :: !(TVar Progress) |
194 | } | 226 | } |
195 | 227 | ||
228 | -- currentProgress field is reduntant: progress depends on the all swarm bitfields | ||
229 | -- maybe we can remove the 'currentProgress' and compute it on demand? | ||
230 | |||
196 | instance Eq ClientSession where | 231 | instance Eq ClientSession where |
197 | (==) = (==) `on` clientPeerID | 232 | (==) = (==) `on` clientPeerID |
198 | 233 | ||
@@ -274,7 +309,15 @@ data SwarmSession = SwarmSession { | |||
274 | 309 | ||
275 | -- | Modify this carefully updating global progress. | 310 | -- | Modify this carefully updating global progress. |
276 | , clientBitfield :: !(TVar Bitfield) | 311 | , clientBitfield :: !(TVar Bitfield) |
312 | |||
277 | , connectedPeers :: !(TVar (Set PeerSession)) | 313 | , connectedPeers :: !(TVar (Set PeerSession)) |
314 | |||
315 | -- TODO use bounded broadcast chan with priority queue and drop old entries | ||
316 | -- | Channel used for replicate messages across all peers in | ||
317 | -- swarm. For exsample if we get some piece we should sent to all | ||
318 | -- connected (and interested in) peers HAVE message. | ||
319 | -- | ||
320 | , broadcastMessages :: !(TChan Message) | ||
278 | } | 321 | } |
279 | 322 | ||
280 | -- INVARIANT: | 323 | -- INVARIANT: |
@@ -294,6 +337,7 @@ newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | |||
294 | <*> MSem.new n | 337 | <*> MSem.new n |
295 | <*> newTVarIO bf | 338 | <*> newTVarIO bf |
296 | <*> newTVarIO S.empty | 339 | <*> newTVarIO S.empty |
340 | <*> newBroadcastTChanIO | ||
297 | 341 | ||
298 | -- | New swarm session in which the client allowed to upload only. | 342 | -- | New swarm session in which the client allowed to upload only. |
299 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | 343 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession |
@@ -342,6 +386,9 @@ waitVacancy se = | |||
342 | bracket (enterSwarm se) (const (leaveSwarm se)) | 386 | bracket (enterSwarm se) (const (leaveSwarm se)) |
343 | . const | 387 | . const |
344 | 388 | ||
389 | pieceLength :: SwarmSession -> Int | ||
390 | pieceLength = ciPieceLength . tInfo . torrentMeta | ||
391 | |||
345 | {----------------------------------------------------------------------- | 392 | {----------------------------------------------------------------------- |
346 | Peer session | 393 | Peer session |
347 | -----------------------------------------------------------------------} | 394 | -----------------------------------------------------------------------} |
@@ -380,17 +427,15 @@ data PeerSession = PeerSession { | |||
380 | , outcomingTimeout :: !TimeoutKey | 427 | , outcomingTimeout :: !TimeoutKey |
381 | 428 | ||
382 | -- TODO use dupChan for broadcasting | 429 | -- TODO use dupChan for broadcasting |
383 | 430 | -- | Broadcast messages waiting to be sent to peer. | |
384 | -- | Channel used for replicate messages across all peers in | 431 | , pendingMessages :: !(TChan Message) |
385 | -- swarm. For exsample if we get some piece we should sent to all | ||
386 | -- connected (and interested in) peers HAVE message. | ||
387 | -- | ||
388 | , broadcastMessages :: !(Chan [Message]) | ||
389 | 432 | ||
390 | -- | Dymanic P2P data. | 433 | -- | Dymanic P2P data. |
391 | , sessionState :: !(IORef SessionState) | 434 | , sessionState :: !(IORef SessionState) |
392 | } | 435 | } |
393 | 436 | ||
437 | -- TODO unpack some fields | ||
438 | |||
394 | data SessionState = SessionState { | 439 | data SessionState = SessionState { |
395 | _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | 440 | _bitfield :: !Bitfield -- ^ Other peer Have bitfield. |
396 | , _status :: !SessionStatus -- ^ Status of both peers. | 441 | , _status :: !SessionStatus -- ^ Status of both peers. |
@@ -451,7 +496,7 @@ withPeerSession ss @ SwarmSession {..} addr | |||
451 | maxIncomingTime (return ()) | 496 | maxIncomingTime (return ()) |
452 | <*> registerTimeout (eventManager clientSession) | 497 | <*> registerTimeout (eventManager clientSession) |
453 | maxOutcomingTime (sendKA sock) | 498 | maxOutcomingTime (sendKA sock) |
454 | <*> newChan | 499 | <*> atomically (dupTChan broadcastMessages) |
455 | <*> do { | 500 | <*> do { |
456 | ; tc <- totalCount <$> readTVarIO clientBitfield | 501 | ; tc <- totalCount <$> readTVarIO clientBitfield |
457 | ; newIORef (SessionState (haveNone tc) def) | 502 | ; newIORef (SessionState (haveNone tc) def) |
@@ -468,10 +513,55 @@ withPeerSession ss @ SwarmSession {..} addr | |||
468 | findPieceCount :: PeerSession -> PieceCount | 513 | findPieceCount :: PeerSession -> PieceCount |
469 | findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | 514 | findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession |
470 | 515 | ||
471 | -- TODO use this type for broadcast messages instead of 'Message' | 516 | {----------------------------------------------------------------------- |
472 | --data Signal = | 517 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece |
473 | --nextBroadcast :: P2P (Maybe Signal) | 518 | -----------------------------------------------------------------------} |
474 | --nextBroadcast = | 519 | |
520 | -- here we should enqueue broadcast messages and keep in mind that: | ||
521 | -- | ||
522 | -- * We should enqueue broadcast events as they are appear. | ||
523 | -- * We should yield broadcast messages as fast as we get them. | ||
524 | -- | ||
525 | -- these 2 phases might differ in time significantly | ||
526 | |||
527 | -- TODO do this; but only when it'll be clean which other broadcast | ||
528 | -- messages & events we should send | ||
529 | |||
530 | -- 1. Update client have bitfield --\____ in one transaction; | ||
531 | -- 2. Update downloaded stats --/ | ||
532 | -- 3. Signal to the all other peer about this. | ||
533 | |||
534 | available :: Bitfield -> SwarmSession -> IO () | ||
535 | available bf se @ SwarmSession {..} = mark >> atomically broadcast | ||
536 | where | ||
537 | mark = do | ||
538 | let bytes = pieceLength se * BF.haveCount bf | ||
539 | atomically $ do | ||
540 | modifyTVar' clientBitfield (BF.union bf) | ||
541 | modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
542 | |||
543 | broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
544 | |||
545 | |||
546 | -- TODO compute size of messages: if it's faster to send Bitfield | ||
547 | -- instead many Have do that | ||
548 | -- | ||
549 | -- also if there is single Have message in queue then the | ||
550 | -- corresponding piece is likely still in memory or disc cache, | ||
551 | -- when we can send SuggestPiece | ||
552 | |||
553 | -- | Get pending messages queue appeared in result of asynchronously | ||
554 | -- changed client state. Resulting queue should be sent to a peer | ||
555 | -- immediately. | ||
556 | getPending :: PeerSession -> IO [Message] | ||
557 | getPending PeerSession {..} = atomically (readAvail pendingMessages) | ||
558 | |||
559 | readAvail :: TChan a -> STM [a] | ||
560 | readAvail chan = do | ||
561 | m <- tryReadTChan chan | ||
562 | case m of | ||
563 | Just a -> (:) <$> pure a <*> readAvail chan | ||
564 | Nothing -> return [] | ||
475 | 565 | ||
476 | {----------------------------------------------------------------------- | 566 | {----------------------------------------------------------------------- |
477 | Timeouts | 567 | Timeouts |