summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-07-16 20:25:43 +0400
committerSam T <pxqr.sta@gmail.com>2013-07-16 20:25:43 +0400
commit412919e88e1d60303f7a14134e37f27becf5f959 (patch)
tree89711599f2ca1101c1d905e65516b2778c50fd07 /src/Network
parent8c6e5818ee6b901efd975392c54aff5cf2721ae4 (diff)
~ Move client bitfield to storage.
We localize bitfield mutation in storage module this way. Also fix some warnings.
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/Exchange.hs16
-rw-r--r--src/Network/BitTorrent/Sessions.hs19
-rw-r--r--src/Network/BitTorrent/Sessions/Types.lhs18
3 files changed, 27 insertions, 26 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index dc1b2752..71be3f88 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -71,6 +71,7 @@ module Network.BitTorrent.Exchange
71 ) where 71 ) where
72 72
73import Control.Applicative 73import Control.Applicative
74import Control.Concurrent.STM
74import Control.Exception 75import Control.Exception
75import Control.Lens 76import Control.Lens
76import Control.Monad.Reader 77import Control.Monad.Reader
@@ -80,7 +81,7 @@ import Control.Monad.Trans.Resource
80import Data.IORef 81import Data.IORef
81import Data.Conduit as C 82import Data.Conduit as C
82import Data.Conduit.Cereal as S 83import Data.Conduit.Cereal as S
83import Data.Conduit.Serialization.Binary as B 84--import Data.Conduit.Serialization.Binary as B
84import Data.Conduit.Network 85import Data.Conduit.Network
85import Data.Serialize as S 86import Data.Serialize as S
86import Text.PrettyPrint as PP hiding (($$)) 87import Text.PrettyPrint as PP hiding (($$))
@@ -100,11 +101,11 @@ import System.Torrent.Storage
100type PeerWire = ConduitM Message Message IO 101type PeerWire = ConduitM Message Message IO
101 102
102runPeerWire :: Socket -> PeerWire () -> IO () 103runPeerWire :: Socket -> PeerWire () -> IO ()
103runPeerWire sock p2p = 104runPeerWire sock action =
104 sourceSocket sock $= 105 sourceSocket sock $=
105 S.conduitGet S.get $= 106 S.conduitGet S.get $=
106-- B.conduitDecode $= 107-- B.conduitDecode $=
107 p2p $= 108 action $=
108 S.conduitPut S.put $$ 109 S.conduitPut S.put $$
109-- B.conduitEncode $$ 110-- B.conduitEncode $$
110 sinkSocket sock 111 sinkSocket sock
@@ -153,9 +154,9 @@ instance MonadState SessionState P2P where
153 {-# INLINE put #-} 154 {-# INLINE put #-}
154 155
155runP2P :: (Socket, PeerSession) -> P2P () -> IO () 156runP2P :: (Socket, PeerSession) -> P2P () -> IO ()
156runP2P (sock, ses) p2p = 157runP2P (sock, ses) action =
157 handle isIOException $ 158 handle isIOException $
158 runPeerWire sock (runReaderT (unP2P p2p) ses) 159 runPeerWire sock (runReaderT (unP2P action) ses)
159 where 160 where
160 isIOException :: IOException -> IO () 161 isIOException :: IOException -> IO ()
161 isIOException _ = return () 162 isIOException _ = return ()
@@ -428,7 +429,10 @@ yieldEvent e = {-# SCC yieldEvent #-} do
428 go e 429 go e
429 flushPending 430 flushPending
430 where 431 where
431 go (Available ixs) = asks swarmSession >>= liftIO . available ixs 432 go (Available ixs) = do
433 ses <- asks swarmSession
434 liftIO $ atomically $ available ixs ses
435
432 go (Want bix) = do 436 go (Want bix) = do
433 offer <- peerOffer 437 offer <- peerOffer
434 if ixPiece bix `BF.member` offer 438 if ixPiece bix `BF.member` offer
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs
index 43a34df9..2118ccf0 100644
--- a/src/Network/BitTorrent/Sessions.hs
+++ b/src/Network/BitTorrent/Sessions.hs
@@ -202,9 +202,9 @@ discover swarm @ SwarmSession {..} = {-# SCC discover #-} do
202 forever $ do 202 forever $ do
203 addr <- getPeerAddr tses 203 addr <- getPeerAddr tses
204 forkThrottle swarm $ do 204 forkThrottle swarm $ do
205 initiatePeerSession swarm addr $ \conn -> do 205 initiatePeerSession swarm addr $ \pconn -> do
206 print addr 206 print addr
207 runP2P conn p2p 207 runP2P pconn p2p
208 208
209registerSwarmSession :: SwarmSession -> STM () 209registerSwarmSession :: SwarmSession -> STM ()
210registerSwarmSession ss @ SwarmSession {..} = 210registerSwarmSession ss @ SwarmSession {..} =
@@ -223,8 +223,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
223 223
224 ss <- SwarmSession t cs 224 ss <- SwarmSession t cs
225 <$> MSem.new defLeecherConns 225 <$> MSem.new defLeecherConns
226 <*> newTVarIO bf 226 <*> openStorage t dataDirPath bf
227 <*> openStorage t dataDirPath
228 <*> newTVarIO S.empty 227 <*> newTVarIO S.empty
229 <*> newBroadcastTChanIO 228 <*> newBroadcastTChanIO
230 229
@@ -232,7 +231,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
232 modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t 231 modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t
233 registerSwarmSession ss 232 registerSwarmSession ss
234 233
235 forkIO $ discover ss 234 _ <- forkIO $ discover ss
236 235
237 return ss 236 return ss
238 237
@@ -246,8 +245,8 @@ closeSwarmSession se @ SwarmSession {..} = do
246 245
247getSwarm :: ClientSession -> InfoHash -> IO SwarmSession 246getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
248getSwarm cs @ ClientSession {..} ih = do 247getSwarm cs @ ClientSession {..} ih = do
249 status <- torrentPresence cs ih 248 tstatus <- torrentPresence cs ih
250 case status of 249 case tstatus of
251 Unknown -> throw $ UnknownTorrent ih 250 Unknown -> throw $ UnknownTorrent ih
252 Active sw -> return sw 251 Active sw -> return sw
253 Registered loc -> openSwarmSession cs loc 252 Registered loc -> openSwarmSession cs loc
@@ -342,9 +341,11 @@ openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
342openSession ss @ SwarmSession {..} addr Handshake {..} = do 341openSession ss @ SwarmSession {..} addr Handshake {..} = do
343 let clientCaps = encodeExts $ allowedExtensions $ clientSession 342 let clientCaps = encodeExts $ allowedExtensions $ clientSession
344 let enabled = decodeExts (enabledCaps clientCaps hsReserved) 343 let enabled = decodeExts (enabledCaps clientCaps hsReserved)
344
345 bf <- getClientBitfield ss
345 ps <- PeerSession addr ss enabled 346 ps <- PeerSession addr ss enabled
346 <$> atomically (dupTChan broadcastMessages) 347 <$> atomically (dupTChan broadcastMessages)
347 <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) 348 <*> newIORef (initialSessionState (totalCount bf))
348 -- TODO we could implement more interesting throtling scheme 349 -- TODO we could implement more interesting throtling scheme
349 -- using connected peer information 350 -- using connected peer information
350 registerPeerSession ps 351 registerPeerSession ps
@@ -408,7 +409,7 @@ listener cs action serverPort = bracket openListener close loop
408 putStrLn "accepted" 409 putStrLn "accepted"
409 case addr of 410 case addr of
410 SockAddrInet port host -> do 411 SockAddrInet port host -> do
411 forkIO $ do 412 _ <- forkIO $ do
412 acceptPeerSession cs (PeerAddr Nothing host port) conn action 413 acceptPeerSession cs (PeerAddr Nothing host port) conn action
413 return () 414 return ()
414 _ -> return () 415 _ -> return ()
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs
index 69411d4e..af2a6755 100644
--- a/src/Network/BitTorrent/Sessions/Types.lhs
+++ b/src/Network/BitTorrent/Sessions/Types.lhs
@@ -299,7 +299,7 @@ So if client is a leecher then max sessions count depends on the
299number of unchoke slots. 299number of unchoke slots.
300 300
301> -- | Used to bound the number of simultaneous connections and, which 301> -- | Used to bound the number of simultaneous connections and, which
302> -- is the same, P2P sessions within the swarm session. 302> -- is the same, P2P sessions within the swarm session.
303> type SessionCount = Int 303> type SessionCount = Int
304 304
305However if client is a seeder then the value depends on . 305However if client is a seeder then the value depends on .
@@ -322,8 +322,6 @@ Throttling* section.
322Client bitfield is used to keep track "the client have" piece set. 322Client bitfield is used to keep track "the client have" piece set.
323Modify this carefully always updating global progress. 323Modify this carefully always updating global progress.
324 324
325> , clientBitfield :: !(TVar Bitfield)
326
327> , storage :: !Storage 325> , storage :: !Storage
328 326
329We keep set of the all connected peers for the each particular torrent 327We keep set of the all connected peers for the each particular torrent
@@ -371,7 +369,7 @@ INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers
371> compare = comparing (tInfoHash . torrentMeta) 369> compare = comparing (tInfoHash . torrentMeta)
372 370
373> getClientBitfield :: SwarmSession -> IO Bitfield 371> getClientBitfield :: SwarmSession -> IO Bitfield
374> getClientBitfield = readTVarIO . clientBitfield 372> getClientBitfield SwarmSession {..} = atomically $ getCompleteBitfield storage
375 373
376Peer sessions 374Peer sessions
377------------------------------------------------------------------------ 375------------------------------------------------------------------------
@@ -458,16 +456,14 @@ messages & events we should send.
4582. Update downloaded stats --/ 4562. Update downloaded stats --/
4593. Signal to the all other peer about this. 4573. Signal to the all other peer about this.
460 458
461> available :: Bitfield -> SwarmSession -> IO () 459> available :: Bitfield -> SwarmSession -> STM ()
462> available bf se @ SwarmSession {..} = {-# SCC available #-} do 460> available bf SwarmSession {..} = {-# SCC available #-} do
463> mark >> atomically broadcast 461> updateProgress >> broadcast
464> where 462> where
465> mark = do 463> updateProgress = do
466> let piLen = ciPieceLength $ tInfo $ torrentMeta 464> let piLen = ciPieceLength $ tInfo $ torrentMeta
467> let bytes = piLen * BF.haveCount bf 465> let bytes = piLen * BF.haveCount bf
468> atomically $ do 466> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
469> modifyTVar' clientBitfield (BF.union bf)
470> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
471> 467>
472> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) 468> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
473 469