diff options
author | Sam T <pxqr.sta@gmail.com> | 2013-06-29 23:22:25 +0400 |
---|---|---|
committer | Sam T <pxqr.sta@gmail.com> | 2013-06-29 23:22:25 +0400 |
commit | f556bf196bf07308f024cc43c1a51dfd4c21188c (patch) | |
tree | 228de5a632e8b758d507df7ddabf7fd85d113694 /src/Network/BitTorrent | |
parent | d4ada1b8a392d67f2835935084c5f0f3ecef2ab5 (diff) |
+ Scetch basic broadcasting.
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 18 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 124 | ||||
-rw-r--r-- | src/Network/BitTorrent/Tracker.hs | 6 |
3 files changed, 119 insertions, 29 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 3d05f7fc..505360a4 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -109,15 +109,18 @@ runPeerWire sock p2p = | |||
109 | sinkSocket sock | 109 | sinkSocket sock |
110 | 110 | ||
111 | awaitMessage :: P2P Message | 111 | awaitMessage :: P2P Message |
112 | awaitMessage = P2P (ReaderT (const go)) | 112 | awaitMessage = P2P $ ReaderT $ const go |
113 | where | 113 | where |
114 | go = await >>= maybe (monadThrow PeerDisconnected) return | 114 | go = await >>= maybe (monadThrow PeerDisconnected) return |
115 | {-# INLINE awaitMessage #-} | 115 | {-# INLINE awaitMessage #-} |
116 | 116 | ||
117 | yieldMessage :: Message -> P2P () | 117 | yieldMessage :: Message -> P2P () |
118 | yieldMessage msg = P2P $ ReaderT $ \se -> C.yield msg | 118 | yieldMessage msg = P2P $ ReaderT $ const (C.yield msg) |
119 | {-# INLINE yieldMessage #-} | 119 | {-# INLINE yieldMessage #-} |
120 | 120 | ||
121 | flushPending :: P2P () | ||
122 | flushPending = ask >>= liftIO . getPending >>= mapM_ yieldMessage | ||
123 | |||
121 | {----------------------------------------------------------------------- | 124 | {----------------------------------------------------------------------- |
122 | P2P monad | 125 | P2P monad |
123 | -----------------------------------------------------------------------} | 126 | -----------------------------------------------------------------------} |
@@ -313,7 +316,9 @@ data Event | |||
313 | -- forall (Fragment block). isPiece block == True | 316 | -- forall (Fragment block). isPiece block == True |
314 | -- | 317 | -- |
315 | awaitEvent :: P2P Event | 318 | awaitEvent :: P2P Event |
316 | awaitEvent = awaitMessage >>= go | 319 | awaitEvent = do |
320 | |||
321 | awaitMessage >>= go | ||
317 | where | 322 | where |
318 | go KeepAlive = awaitEvent | 323 | go KeepAlive = awaitEvent |
319 | go Choke = do | 324 | go Choke = do |
@@ -432,7 +437,7 @@ awaitEvent = awaitMessage >>= go | |||
432 | -- most likely will be ignored without any network IO. | 437 | -- most likely will be ignored without any network IO. |
433 | -- | 438 | -- |
434 | yieldEvent :: Event -> P2P () | 439 | yieldEvent :: Event -> P2P () |
435 | yieldEvent (Available _ ) = undefined | 440 | yieldEvent (Available ixs) = asks swarmSession >>= liftIO . available ixs |
436 | yieldEvent (Want bix) = do | 441 | yieldEvent (Want bix) = do |
437 | offer <- peerOffer | 442 | offer <- peerOffer |
438 | if ixPiece bix `BF.member` offer | 443 | if ixPiece bix `BF.member` offer |
@@ -449,10 +454,5 @@ yieldEvent (Fragment blk) = do | |||
449 | handleEvent :: (Event -> P2P Event) -> P2P () | 454 | handleEvent :: (Event -> P2P Event) -> P2P () |
450 | handleEvent action = awaitEvent >>= action >>= yieldEvent | 455 | handleEvent action = awaitEvent >>= action >>= yieldEvent |
451 | 456 | ||
452 | --flushBroadcast :: P2P () | ||
453 | --flushBroadcast = nextBroadcast >>= maybe (return ()) go | ||
454 | -- where | ||
455 | -- go = undefined | ||
456 | |||
457 | checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool | 457 | checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool |
458 | checkPiece = undefined | 458 | checkPiece = undefined |
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index f163cadb..38388b9a 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -15,6 +15,7 @@ | |||
15 | -- | 15 | -- |
16 | {-# LANGUAGE OverloadedStrings #-} | 16 | {-# LANGUAGE OverloadedStrings #-} |
17 | {-# LANGUAGE RecordWildCards #-} | 17 | {-# LANGUAGE RecordWildCards #-} |
18 | {-# LANGUAGE ViewPatterns #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | 19 | {-# LANGUAGE TemplateHaskell #-} |
19 | {-# LANGUAGE DeriveDataTypeable #-} | 20 | {-# LANGUAGE DeriveDataTypeable #-} |
20 | module Network.BitTorrent.Internal | 21 | module Network.BitTorrent.Internal |
@@ -47,6 +48,8 @@ module Network.BitTorrent.Internal | |||
47 | , leaveSwarm | 48 | , leaveSwarm |
48 | , waitVacancy | 49 | , waitVacancy |
49 | 50 | ||
51 | , available | ||
52 | |||
50 | -- * Peer | 53 | -- * Peer |
51 | , PeerSession( PeerSession, connectedPeerAddr | 54 | , PeerSession( PeerSession, connectedPeerAddr |
52 | , swarmSession, enabledExtensions | 55 | , swarmSession, enabledExtensions |
@@ -54,6 +57,7 @@ module Network.BitTorrent.Internal | |||
54 | ) | 57 | ) |
55 | , SessionState | 58 | , SessionState |
56 | , withPeerSession | 59 | , withPeerSession |
60 | , getPending | ||
57 | 61 | ||
58 | -- ** Exceptions | 62 | -- ** Exceptions |
59 | , SessionException(..) | 63 | , SessionException(..) |
@@ -68,6 +72,8 @@ module Network.BitTorrent.Internal | |||
68 | , updateIncoming, updateOutcoming | 72 | , updateIncoming, updateOutcoming |
69 | ) where | 73 | ) where |
70 | 74 | ||
75 | import Prelude hiding (mapM_) | ||
76 | |||
71 | import Control.Applicative | 77 | import Control.Applicative |
72 | import Control.Concurrent | 78 | import Control.Concurrent |
73 | import Control.Concurrent.STM | 79 | import Control.Concurrent.STM |
@@ -79,6 +85,7 @@ import Control.Monad.Trans | |||
79 | import Data.IORef | 85 | import Data.IORef |
80 | import Data.Default | 86 | import Data.Default |
81 | import Data.Function | 87 | import Data.Function |
88 | import Data.Foldable (mapM_) | ||
82 | import Data.Ord | 89 | import Data.Ord |
83 | import Data.Set as S | 90 | import Data.Set as S |
84 | import Data.Typeable | 91 | import Data.Typeable |
@@ -104,7 +111,7 @@ import Network.BitTorrent.Tracker.Protocol as BT | |||
104 | -----------------------------------------------------------------------} | 111 | -----------------------------------------------------------------------} |
105 | 112 | ||
106 | -- | 'Progress' contains upload/download/left stats about | 113 | -- | 'Progress' contains upload/download/left stats about |
107 | -- current client state and used to notify the tracker | 114 | -- current client state and used to notify the tracker. |
108 | -- | 115 | -- |
109 | -- This data is considered as dynamic within one client | 116 | -- This data is considered as dynamic within one client |
110 | -- session. This data also should be shared across client | 117 | -- session. This data also should be shared across client |
@@ -112,12 +119,14 @@ import Network.BitTorrent.Tracker.Protocol as BT | |||
112 | -- to get initial 'Progress'. | 119 | -- to get initial 'Progress'. |
113 | -- | 120 | -- |
114 | data Progress = Progress { | 121 | data Progress = Progress { |
115 | prUploaded :: !Integer -- ^ Total amount of bytes uploaded. | 122 | _uploaded :: !Integer -- ^ Total amount of bytes uploaded. |
116 | , prDownloaded :: !Integer -- ^ Total amount of bytes downloaded. | 123 | , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. |
117 | , prLeft :: !Integer -- ^ Total amount of bytes left. | 124 | , _left :: !Integer -- ^ Total amount of bytes left. |
118 | } deriving (Show, Read, Eq) | 125 | } deriving (Show, Read, Eq) |
119 | 126 | ||
120 | -- TODO make lenses | 127 | -- TODO use atomic bits and Word64 |
128 | |||
129 | $(makeLenses ''Progress) | ||
121 | 130 | ||
122 | -- | Initial progress is used when there are no session before. | 131 | -- | Initial progress is used when there are no session before. |
123 | -- | 132 | -- |
@@ -128,6 +137,29 @@ data Progress = Progress { | |||
128 | startProgress :: Integer -> Progress | 137 | startProgress :: Integer -> Progress |
129 | startProgress = Progress 0 0 | 138 | startProgress = Progress 0 0 |
130 | 139 | ||
140 | -- | Used when the client download some data from /any/ peer. | ||
141 | downloadedProgress :: Int -> Progress -> Progress | ||
142 | downloadedProgress (fromIntegral -> amount) | ||
143 | = (left -~ amount) | ||
144 | . (downloaded +~ amount) | ||
145 | {-# INLINE downloadedProgress #-} | ||
146 | |||
147 | -- | Used when the client upload some data to /any/ peer. | ||
148 | uploadedProgress :: Int -> Progress -> Progress | ||
149 | uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
150 | {-# INLINE uploadedProgress #-} | ||
151 | |||
152 | -- | Used when leecher join client session. | ||
153 | enqueuedProgress :: Int -> Progress -> Progress | ||
154 | enqueuedProgress (fromIntegral -> amount) = left +~ amount | ||
155 | {-# INLINE enqueuedProgress #-} | ||
156 | |||
157 | -- | Used when leecher leave client session. | ||
158 | -- (e.g. user deletes not completed torrent) | ||
159 | dequeuedProgress :: Int -> Progress -> Progress | ||
160 | dequeuedProgress (fromIntegral -> amount) = left -~ amount | ||
161 | {-# INLINE dequeuedProgress #-} | ||
162 | |||
131 | {----------------------------------------------------------------------- | 163 | {----------------------------------------------------------------------- |
132 | Client session | 164 | Client session |
133 | -----------------------------------------------------------------------} | 165 | -----------------------------------------------------------------------} |
@@ -193,6 +225,9 @@ data ClientSession = ClientSession { | |||
193 | , currentProgress :: !(TVar Progress) | 225 | , currentProgress :: !(TVar Progress) |
194 | } | 226 | } |
195 | 227 | ||
228 | -- currentProgress field is reduntant: progress depends on the all swarm bitfields | ||
229 | -- maybe we can remove the 'currentProgress' and compute it on demand? | ||
230 | |||
196 | instance Eq ClientSession where | 231 | instance Eq ClientSession where |
197 | (==) = (==) `on` clientPeerID | 232 | (==) = (==) `on` clientPeerID |
198 | 233 | ||
@@ -274,7 +309,15 @@ data SwarmSession = SwarmSession { | |||
274 | 309 | ||
275 | -- | Modify this carefully updating global progress. | 310 | -- | Modify this carefully updating global progress. |
276 | , clientBitfield :: !(TVar Bitfield) | 311 | , clientBitfield :: !(TVar Bitfield) |
312 | |||
277 | , connectedPeers :: !(TVar (Set PeerSession)) | 313 | , connectedPeers :: !(TVar (Set PeerSession)) |
314 | |||
315 | -- TODO use bounded broadcast chan with priority queue and drop old entries | ||
316 | -- | Channel used for replicate messages across all peers in | ||
317 | -- swarm. For exsample if we get some piece we should sent to all | ||
318 | -- connected (and interested in) peers HAVE message. | ||
319 | -- | ||
320 | , broadcastMessages :: !(TChan Message) | ||
278 | } | 321 | } |
279 | 322 | ||
280 | -- INVARIANT: | 323 | -- INVARIANT: |
@@ -294,6 +337,7 @@ newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | |||
294 | <*> MSem.new n | 337 | <*> MSem.new n |
295 | <*> newTVarIO bf | 338 | <*> newTVarIO bf |
296 | <*> newTVarIO S.empty | 339 | <*> newTVarIO S.empty |
340 | <*> newBroadcastTChanIO | ||
297 | 341 | ||
298 | -- | New swarm session in which the client allowed to upload only. | 342 | -- | New swarm session in which the client allowed to upload only. |
299 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | 343 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession |
@@ -342,6 +386,9 @@ waitVacancy se = | |||
342 | bracket (enterSwarm se) (const (leaveSwarm se)) | 386 | bracket (enterSwarm se) (const (leaveSwarm se)) |
343 | . const | 387 | . const |
344 | 388 | ||
389 | pieceLength :: SwarmSession -> Int | ||
390 | pieceLength = ciPieceLength . tInfo . torrentMeta | ||
391 | |||
345 | {----------------------------------------------------------------------- | 392 | {----------------------------------------------------------------------- |
346 | Peer session | 393 | Peer session |
347 | -----------------------------------------------------------------------} | 394 | -----------------------------------------------------------------------} |
@@ -380,17 +427,15 @@ data PeerSession = PeerSession { | |||
380 | , outcomingTimeout :: !TimeoutKey | 427 | , outcomingTimeout :: !TimeoutKey |
381 | 428 | ||
382 | -- TODO use dupChan for broadcasting | 429 | -- TODO use dupChan for broadcasting |
383 | 430 | -- | Broadcast messages waiting to be sent to peer. | |
384 | -- | Channel used for replicate messages across all peers in | 431 | , pendingMessages :: !(TChan Message) |
385 | -- swarm. For exsample if we get some piece we should sent to all | ||
386 | -- connected (and interested in) peers HAVE message. | ||
387 | -- | ||
388 | , broadcastMessages :: !(Chan [Message]) | ||
389 | 432 | ||
390 | -- | Dymanic P2P data. | 433 | -- | Dymanic P2P data. |
391 | , sessionState :: !(IORef SessionState) | 434 | , sessionState :: !(IORef SessionState) |
392 | } | 435 | } |
393 | 436 | ||
437 | -- TODO unpack some fields | ||
438 | |||
394 | data SessionState = SessionState { | 439 | data SessionState = SessionState { |
395 | _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | 440 | _bitfield :: !Bitfield -- ^ Other peer Have bitfield. |
396 | , _status :: !SessionStatus -- ^ Status of both peers. | 441 | , _status :: !SessionStatus -- ^ Status of both peers. |
@@ -451,7 +496,7 @@ withPeerSession ss @ SwarmSession {..} addr | |||
451 | maxIncomingTime (return ()) | 496 | maxIncomingTime (return ()) |
452 | <*> registerTimeout (eventManager clientSession) | 497 | <*> registerTimeout (eventManager clientSession) |
453 | maxOutcomingTime (sendKA sock) | 498 | maxOutcomingTime (sendKA sock) |
454 | <*> newChan | 499 | <*> atomically (dupTChan broadcastMessages) |
455 | <*> do { | 500 | <*> do { |
456 | ; tc <- totalCount <$> readTVarIO clientBitfield | 501 | ; tc <- totalCount <$> readTVarIO clientBitfield |
457 | ; newIORef (SessionState (haveNone tc) def) | 502 | ; newIORef (SessionState (haveNone tc) def) |
@@ -468,10 +513,55 @@ withPeerSession ss @ SwarmSession {..} addr | |||
468 | findPieceCount :: PeerSession -> PieceCount | 513 | findPieceCount :: PeerSession -> PieceCount |
469 | findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | 514 | findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession |
470 | 515 | ||
471 | -- TODO use this type for broadcast messages instead of 'Message' | 516 | {----------------------------------------------------------------------- |
472 | --data Signal = | 517 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece |
473 | --nextBroadcast :: P2P (Maybe Signal) | 518 | -----------------------------------------------------------------------} |
474 | --nextBroadcast = | 519 | |
520 | -- here we should enqueue broadcast messages and keep in mind that: | ||
521 | -- | ||
522 | -- * We should enqueue broadcast events as they are appear. | ||
523 | -- * We should yield broadcast messages as fast as we get them. | ||
524 | -- | ||
525 | -- these 2 phases might differ in time significantly | ||
526 | |||
527 | -- TODO do this; but only when it'll be clean which other broadcast | ||
528 | -- messages & events we should send | ||
529 | |||
530 | -- 1. Update client have bitfield --\____ in one transaction; | ||
531 | -- 2. Update downloaded stats --/ | ||
532 | -- 3. Signal to the all other peer about this. | ||
533 | |||
534 | available :: Bitfield -> SwarmSession -> IO () | ||
535 | available bf se @ SwarmSession {..} = mark >> atomically broadcast | ||
536 | where | ||
537 | mark = do | ||
538 | let bytes = pieceLength se * BF.haveCount bf | ||
539 | atomically $ do | ||
540 | modifyTVar' clientBitfield (BF.union bf) | ||
541 | modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
542 | |||
543 | broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
544 | |||
545 | |||
546 | -- TODO compute size of messages: if it's faster to send Bitfield | ||
547 | -- instead many Have do that | ||
548 | -- | ||
549 | -- also if there is single Have message in queue then the | ||
550 | -- corresponding piece is likely still in memory or disc cache, | ||
551 | -- when we can send SuggestPiece | ||
552 | |||
553 | -- | Get pending messages queue appeared in result of asynchronously | ||
554 | -- changed client state. Resulting queue should be sent to a peer | ||
555 | -- immediately. | ||
556 | getPending :: PeerSession -> IO [Message] | ||
557 | getPending PeerSession {..} = atomically (readAvail pendingMessages) | ||
558 | |||
559 | readAvail :: TChan a -> STM [a] | ||
560 | readAvail chan = do | ||
561 | m <- tryReadTChan chan | ||
562 | case m of | ||
563 | Just a -> (:) <$> pure a <*> readAvail chan | ||
564 | Nothing -> return [] | ||
475 | 565 | ||
476 | {----------------------------------------------------------------------- | 566 | {----------------------------------------------------------------------- |
477 | Timeouts | 567 | Timeouts |
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index ea45b75d..bfe4182d 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs | |||
@@ -90,9 +90,9 @@ genericReq ses pr = TRequest { | |||
90 | , reqPeerID = tconnPeerID ses | 90 | , reqPeerID = tconnPeerID ses |
91 | , reqPort = tconnPort ses | 91 | , reqPort = tconnPort ses |
92 | 92 | ||
93 | , reqUploaded = prUploaded pr | 93 | , reqUploaded = _uploaded pr |
94 | , reqDownloaded = prDownloaded pr | 94 | , reqDownloaded = _downloaded pr |
95 | , reqLeft = prLeft pr | 95 | , reqLeft = _left pr |
96 | 96 | ||
97 | , reqIP = Nothing | 97 | , reqIP = Nothing |
98 | , reqNumWant = Nothing | 98 | , reqNumWant = Nothing |