summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-06-29 23:22:25 +0400
committerSam T <pxqr.sta@gmail.com>2013-06-29 23:22:25 +0400
commitf556bf196bf07308f024cc43c1a51dfd4c21188c (patch)
tree228de5a632e8b758d507df7ddabf7fd85d113694 /src/Network/BitTorrent
parentd4ada1b8a392d67f2835935084c5f0f3ecef2ab5 (diff)
+ Scetch basic broadcasting.
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r--src/Network/BitTorrent/Exchange.hs18
-rw-r--r--src/Network/BitTorrent/Internal.hs124
-rw-r--r--src/Network/BitTorrent/Tracker.hs6
3 files changed, 119 insertions, 29 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 3d05f7fc..505360a4 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -109,15 +109,18 @@ runPeerWire sock p2p =
109 sinkSocket sock 109 sinkSocket sock
110 110
111awaitMessage :: P2P Message 111awaitMessage :: P2P Message
112awaitMessage = P2P (ReaderT (const go)) 112awaitMessage = P2P $ ReaderT $ const go
113 where 113 where
114 go = await >>= maybe (monadThrow PeerDisconnected) return 114 go = await >>= maybe (monadThrow PeerDisconnected) return
115{-# INLINE awaitMessage #-} 115{-# INLINE awaitMessage #-}
116 116
117yieldMessage :: Message -> P2P () 117yieldMessage :: Message -> P2P ()
118yieldMessage msg = P2P $ ReaderT $ \se -> C.yield msg 118yieldMessage msg = P2P $ ReaderT $ const (C.yield msg)
119{-# INLINE yieldMessage #-} 119{-# INLINE yieldMessage #-}
120 120
121flushPending :: P2P ()
122flushPending = ask >>= liftIO . getPending >>= mapM_ yieldMessage
123
121{----------------------------------------------------------------------- 124{-----------------------------------------------------------------------
122 P2P monad 125 P2P monad
123-----------------------------------------------------------------------} 126-----------------------------------------------------------------------}
@@ -313,7 +316,9 @@ data Event
313-- forall (Fragment block). isPiece block == True 316-- forall (Fragment block). isPiece block == True
314-- 317--
315awaitEvent :: P2P Event 318awaitEvent :: P2P Event
316awaitEvent = awaitMessage >>= go 319awaitEvent = do
320
321 awaitMessage >>= go
317 where 322 where
318 go KeepAlive = awaitEvent 323 go KeepAlive = awaitEvent
319 go Choke = do 324 go Choke = do
@@ -432,7 +437,7 @@ awaitEvent = awaitMessage >>= go
432-- most likely will be ignored without any network IO. 437-- most likely will be ignored without any network IO.
433-- 438--
434yieldEvent :: Event -> P2P () 439yieldEvent :: Event -> P2P ()
435yieldEvent (Available _ ) = undefined 440yieldEvent (Available ixs) = asks swarmSession >>= liftIO . available ixs
436yieldEvent (Want bix) = do 441yieldEvent (Want bix) = do
437 offer <- peerOffer 442 offer <- peerOffer
438 if ixPiece bix `BF.member` offer 443 if ixPiece bix `BF.member` offer
@@ -449,10 +454,5 @@ yieldEvent (Fragment blk) = do
449handleEvent :: (Event -> P2P Event) -> P2P () 454handleEvent :: (Event -> P2P Event) -> P2P ()
450handleEvent action = awaitEvent >>= action >>= yieldEvent 455handleEvent action = awaitEvent >>= action >>= yieldEvent
451 456
452--flushBroadcast :: P2P ()
453--flushBroadcast = nextBroadcast >>= maybe (return ()) go
454-- where
455-- go = undefined
456
457checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool 457checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool
458checkPiece = undefined 458checkPiece = undefined
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
index f163cadb..38388b9a 100644
--- a/src/Network/BitTorrent/Internal.hs
+++ b/src/Network/BitTorrent/Internal.hs
@@ -15,6 +15,7 @@
15-- 15--
16{-# LANGUAGE OverloadedStrings #-} 16{-# LANGUAGE OverloadedStrings #-}
17{-# LANGUAGE RecordWildCards #-} 17{-# LANGUAGE RecordWildCards #-}
18{-# LANGUAGE ViewPatterns #-}
18{-# LANGUAGE TemplateHaskell #-} 19{-# LANGUAGE TemplateHaskell #-}
19{-# LANGUAGE DeriveDataTypeable #-} 20{-# LANGUAGE DeriveDataTypeable #-}
20module Network.BitTorrent.Internal 21module Network.BitTorrent.Internal
@@ -47,6 +48,8 @@ module Network.BitTorrent.Internal
47 , leaveSwarm 48 , leaveSwarm
48 , waitVacancy 49 , waitVacancy
49 50
51 , available
52
50 -- * Peer 53 -- * Peer
51 , PeerSession( PeerSession, connectedPeerAddr 54 , PeerSession( PeerSession, connectedPeerAddr
52 , swarmSession, enabledExtensions 55 , swarmSession, enabledExtensions
@@ -54,6 +57,7 @@ module Network.BitTorrent.Internal
54 ) 57 )
55 , SessionState 58 , SessionState
56 , withPeerSession 59 , withPeerSession
60 , getPending
57 61
58 -- ** Exceptions 62 -- ** Exceptions
59 , SessionException(..) 63 , SessionException(..)
@@ -68,6 +72,8 @@ module Network.BitTorrent.Internal
68 , updateIncoming, updateOutcoming 72 , updateIncoming, updateOutcoming
69 ) where 73 ) where
70 74
75import Prelude hiding (mapM_)
76
71import Control.Applicative 77import Control.Applicative
72import Control.Concurrent 78import Control.Concurrent
73import Control.Concurrent.STM 79import Control.Concurrent.STM
@@ -79,6 +85,7 @@ import Control.Monad.Trans
79import Data.IORef 85import Data.IORef
80import Data.Default 86import Data.Default
81import Data.Function 87import Data.Function
88import Data.Foldable (mapM_)
82import Data.Ord 89import Data.Ord
83import Data.Set as S 90import Data.Set as S
84import Data.Typeable 91import Data.Typeable
@@ -104,7 +111,7 @@ import Network.BitTorrent.Tracker.Protocol as BT
104-----------------------------------------------------------------------} 111-----------------------------------------------------------------------}
105 112
106-- | 'Progress' contains upload/download/left stats about 113-- | 'Progress' contains upload/download/left stats about
107-- current client state and used to notify the tracker 114-- current client state and used to notify the tracker.
108-- 115--
109-- This data is considered as dynamic within one client 116-- This data is considered as dynamic within one client
110-- session. This data also should be shared across client 117-- session. This data also should be shared across client
@@ -112,12 +119,14 @@ import Network.BitTorrent.Tracker.Protocol as BT
112-- to get initial 'Progress'. 119-- to get initial 'Progress'.
113-- 120--
114data Progress = Progress { 121data Progress = Progress {
115 prUploaded :: !Integer -- ^ Total amount of bytes uploaded. 122 _uploaded :: !Integer -- ^ Total amount of bytes uploaded.
116 , prDownloaded :: !Integer -- ^ Total amount of bytes downloaded. 123 , _downloaded :: !Integer -- ^ Total amount of bytes downloaded.
117 , prLeft :: !Integer -- ^ Total amount of bytes left. 124 , _left :: !Integer -- ^ Total amount of bytes left.
118 } deriving (Show, Read, Eq) 125 } deriving (Show, Read, Eq)
119 126
120-- TODO make lenses 127-- TODO use atomic bits and Word64
128
129$(makeLenses ''Progress)
121 130
122-- | Initial progress is used when there are no session before. 131-- | Initial progress is used when there are no session before.
123-- 132--
@@ -128,6 +137,29 @@ data Progress = Progress {
128startProgress :: Integer -> Progress 137startProgress :: Integer -> Progress
129startProgress = Progress 0 0 138startProgress = Progress 0 0
130 139
140-- | Used when the client download some data from /any/ peer.
141downloadedProgress :: Int -> Progress -> Progress
142downloadedProgress (fromIntegral -> amount)
143 = (left -~ amount)
144 . (downloaded +~ amount)
145{-# INLINE downloadedProgress #-}
146
147-- | Used when the client upload some data to /any/ peer.
148uploadedProgress :: Int -> Progress -> Progress
149uploadedProgress (fromIntegral -> amount) = uploaded +~ amount
150{-# INLINE uploadedProgress #-}
151
152-- | Used when leecher join client session.
153enqueuedProgress :: Int -> Progress -> Progress
154enqueuedProgress (fromIntegral -> amount) = left +~ amount
155{-# INLINE enqueuedProgress #-}
156
157-- | Used when leecher leave client session.
158-- (e.g. user deletes not completed torrent)
159dequeuedProgress :: Int -> Progress -> Progress
160dequeuedProgress (fromIntegral -> amount) = left -~ amount
161{-# INLINE dequeuedProgress #-}
162
131{----------------------------------------------------------------------- 163{-----------------------------------------------------------------------
132 Client session 164 Client session
133-----------------------------------------------------------------------} 165-----------------------------------------------------------------------}
@@ -193,6 +225,9 @@ data ClientSession = ClientSession {
193 , currentProgress :: !(TVar Progress) 225 , currentProgress :: !(TVar Progress)
194 } 226 }
195 227
228-- currentProgress field is reduntant: progress depends on the all swarm bitfields
229-- maybe we can remove the 'currentProgress' and compute it on demand?
230
196instance Eq ClientSession where 231instance Eq ClientSession where
197 (==) = (==) `on` clientPeerID 232 (==) = (==) `on` clientPeerID
198 233
@@ -274,7 +309,15 @@ data SwarmSession = SwarmSession {
274 309
275 -- | Modify this carefully updating global progress. 310 -- | Modify this carefully updating global progress.
276 , clientBitfield :: !(TVar Bitfield) 311 , clientBitfield :: !(TVar Bitfield)
312
277 , connectedPeers :: !(TVar (Set PeerSession)) 313 , connectedPeers :: !(TVar (Set PeerSession))
314
315 -- TODO use bounded broadcast chan with priority queue and drop old entries
316 -- | Channel used for replicate messages across all peers in
317 -- swarm. For exsample if we get some piece we should sent to all
318 -- connected (and interested in) peers HAVE message.
319 --
320 , broadcastMessages :: !(TChan Message)
278 } 321 }
279 322
280-- INVARIANT: 323-- INVARIANT:
@@ -294,6 +337,7 @@ newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
294 <*> MSem.new n 337 <*> MSem.new n
295 <*> newTVarIO bf 338 <*> newTVarIO bf
296 <*> newTVarIO S.empty 339 <*> newTVarIO S.empty
340 <*> newBroadcastTChanIO
297 341
298-- | New swarm session in which the client allowed to upload only. 342-- | New swarm session in which the client allowed to upload only.
299newSeeder :: ClientSession -> Torrent -> IO SwarmSession 343newSeeder :: ClientSession -> Torrent -> IO SwarmSession
@@ -342,6 +386,9 @@ waitVacancy se =
342 bracket (enterSwarm se) (const (leaveSwarm se)) 386 bracket (enterSwarm se) (const (leaveSwarm se))
343 . const 387 . const
344 388
389pieceLength :: SwarmSession -> Int
390pieceLength = ciPieceLength . tInfo . torrentMeta
391
345{----------------------------------------------------------------------- 392{-----------------------------------------------------------------------
346 Peer session 393 Peer session
347-----------------------------------------------------------------------} 394-----------------------------------------------------------------------}
@@ -380,17 +427,15 @@ data PeerSession = PeerSession {
380 , outcomingTimeout :: !TimeoutKey 427 , outcomingTimeout :: !TimeoutKey
381 428
382 -- TODO use dupChan for broadcasting 429 -- TODO use dupChan for broadcasting
383 430 -- | Broadcast messages waiting to be sent to peer.
384 -- | Channel used for replicate messages across all peers in 431 , pendingMessages :: !(TChan Message)
385 -- swarm. For exsample if we get some piece we should sent to all
386 -- connected (and interested in) peers HAVE message.
387 --
388 , broadcastMessages :: !(Chan [Message])
389 432
390 -- | Dymanic P2P data. 433 -- | Dymanic P2P data.
391 , sessionState :: !(IORef SessionState) 434 , sessionState :: !(IORef SessionState)
392 } 435 }
393 436
437-- TODO unpack some fields
438
394data SessionState = SessionState { 439data SessionState = SessionState {
395 _bitfield :: !Bitfield -- ^ Other peer Have bitfield. 440 _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
396 , _status :: !SessionStatus -- ^ Status of both peers. 441 , _status :: !SessionStatus -- ^ Status of both peers.
@@ -451,7 +496,7 @@ withPeerSession ss @ SwarmSession {..} addr
451 maxIncomingTime (return ()) 496 maxIncomingTime (return ())
452 <*> registerTimeout (eventManager clientSession) 497 <*> registerTimeout (eventManager clientSession)
453 maxOutcomingTime (sendKA sock) 498 maxOutcomingTime (sendKA sock)
454 <*> newChan 499 <*> atomically (dupTChan broadcastMessages)
455 <*> do { 500 <*> do {
456 ; tc <- totalCount <$> readTVarIO clientBitfield 501 ; tc <- totalCount <$> readTVarIO clientBitfield
457 ; newIORef (SessionState (haveNone tc) def) 502 ; newIORef (SessionState (haveNone tc) def)
@@ -468,10 +513,55 @@ withPeerSession ss @ SwarmSession {..} addr
468findPieceCount :: PeerSession -> PieceCount 513findPieceCount :: PeerSession -> PieceCount
469findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession 514findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
470 515
471-- TODO use this type for broadcast messages instead of 'Message' 516{-----------------------------------------------------------------------
472--data Signal = 517 Broadcasting: Have, Cancel, Bitfield, SuggestPiece
473--nextBroadcast :: P2P (Maybe Signal) 518-----------------------------------------------------------------------}
474--nextBroadcast = 519
520-- here we should enqueue broadcast messages and keep in mind that:
521--
522-- * We should enqueue broadcast events as they are appear.
523-- * We should yield broadcast messages as fast as we get them.
524--
525-- these 2 phases might differ in time significantly
526
527-- TODO do this; but only when it'll be clean which other broadcast
528-- messages & events we should send
529
530-- 1. Update client have bitfield --\____ in one transaction;
531-- 2. Update downloaded stats --/
532-- 3. Signal to the all other peer about this.
533
534available :: Bitfield -> SwarmSession -> IO ()
535available bf se @ SwarmSession {..} = mark >> atomically broadcast
536 where
537 mark = do
538 let bytes = pieceLength se * BF.haveCount bf
539 atomically $ do
540 modifyTVar' clientBitfield (BF.union bf)
541 modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
542
543 broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
544
545
546-- TODO compute size of messages: if it's faster to send Bitfield
547-- instead many Have do that
548--
549-- also if there is single Have message in queue then the
550-- corresponding piece is likely still in memory or disc cache,
551-- when we can send SuggestPiece
552
553-- | Get pending messages queue appeared in result of asynchronously
554-- changed client state. Resulting queue should be sent to a peer
555-- immediately.
556getPending :: PeerSession -> IO [Message]
557getPending PeerSession {..} = atomically (readAvail pendingMessages)
558
559readAvail :: TChan a -> STM [a]
560readAvail chan = do
561 m <- tryReadTChan chan
562 case m of
563 Just a -> (:) <$> pure a <*> readAvail chan
564 Nothing -> return []
475 565
476{----------------------------------------------------------------------- 566{-----------------------------------------------------------------------
477 Timeouts 567 Timeouts
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs
index ea45b75d..bfe4182d 100644
--- a/src/Network/BitTorrent/Tracker.hs
+++ b/src/Network/BitTorrent/Tracker.hs
@@ -90,9 +90,9 @@ genericReq ses pr = TRequest {
90 , reqPeerID = tconnPeerID ses 90 , reqPeerID = tconnPeerID ses
91 , reqPort = tconnPort ses 91 , reqPort = tconnPort ses
92 92
93 , reqUploaded = prUploaded pr 93 , reqUploaded = _uploaded pr
94 , reqDownloaded = prDownloaded pr 94 , reqDownloaded = _downloaded pr
95 , reqLeft = prLeft pr 95 , reqLeft = _left pr
96 96
97 , reqIP = Nothing 97 , reqIP = Nothing
98 , reqNumWant = Nothing 98 , reqNumWant = Nothing