summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-07-16 20:25:43 +0400
committerSam T <pxqr.sta@gmail.com>2013-07-16 20:25:43 +0400
commit412919e88e1d60303f7a14134e37f27becf5f959 (patch)
tree89711599f2ca1101c1d905e65516b2778c50fd07
parent8c6e5818ee6b901efd975392c54aff5cf2721ae4 (diff)
~ Move client bitfield to storage.
We localize bitfield mutation in storage module this way. Also fix some warnings.
-rw-r--r--src/Network/BitTorrent/Exchange.hs16
-rw-r--r--src/Network/BitTorrent/Sessions.hs19
-rw-r--r--src/Network/BitTorrent/Sessions/Types.lhs18
-rw-r--r--src/System/IO/MMap/Fixed.hs4
-rw-r--r--src/System/Torrent/Storage.hs32
5 files changed, 49 insertions, 40 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index dc1b2752..71be3f88 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -71,6 +71,7 @@ module Network.BitTorrent.Exchange
71 ) where 71 ) where
72 72
73import Control.Applicative 73import Control.Applicative
74import Control.Concurrent.STM
74import Control.Exception 75import Control.Exception
75import Control.Lens 76import Control.Lens
76import Control.Monad.Reader 77import Control.Monad.Reader
@@ -80,7 +81,7 @@ import Control.Monad.Trans.Resource
80import Data.IORef 81import Data.IORef
81import Data.Conduit as C 82import Data.Conduit as C
82import Data.Conduit.Cereal as S 83import Data.Conduit.Cereal as S
83import Data.Conduit.Serialization.Binary as B 84--import Data.Conduit.Serialization.Binary as B
84import Data.Conduit.Network 85import Data.Conduit.Network
85import Data.Serialize as S 86import Data.Serialize as S
86import Text.PrettyPrint as PP hiding (($$)) 87import Text.PrettyPrint as PP hiding (($$))
@@ -100,11 +101,11 @@ import System.Torrent.Storage
100type PeerWire = ConduitM Message Message IO 101type PeerWire = ConduitM Message Message IO
101 102
102runPeerWire :: Socket -> PeerWire () -> IO () 103runPeerWire :: Socket -> PeerWire () -> IO ()
103runPeerWire sock p2p = 104runPeerWire sock action =
104 sourceSocket sock $= 105 sourceSocket sock $=
105 S.conduitGet S.get $= 106 S.conduitGet S.get $=
106-- B.conduitDecode $= 107-- B.conduitDecode $=
107 p2p $= 108 action $=
108 S.conduitPut S.put $$ 109 S.conduitPut S.put $$
109-- B.conduitEncode $$ 110-- B.conduitEncode $$
110 sinkSocket sock 111 sinkSocket sock
@@ -153,9 +154,9 @@ instance MonadState SessionState P2P where
153 {-# INLINE put #-} 154 {-# INLINE put #-}
154 155
155runP2P :: (Socket, PeerSession) -> P2P () -> IO () 156runP2P :: (Socket, PeerSession) -> P2P () -> IO ()
156runP2P (sock, ses) p2p = 157runP2P (sock, ses) action =
157 handle isIOException $ 158 handle isIOException $
158 runPeerWire sock (runReaderT (unP2P p2p) ses) 159 runPeerWire sock (runReaderT (unP2P action) ses)
159 where 160 where
160 isIOException :: IOException -> IO () 161 isIOException :: IOException -> IO ()
161 isIOException _ = return () 162 isIOException _ = return ()
@@ -428,7 +429,10 @@ yieldEvent e = {-# SCC yieldEvent #-} do
428 go e 429 go e
429 flushPending 430 flushPending
430 where 431 where
431 go (Available ixs) = asks swarmSession >>= liftIO . available ixs 432 go (Available ixs) = do
433 ses <- asks swarmSession
434 liftIO $ atomically $ available ixs ses
435
432 go (Want bix) = do 436 go (Want bix) = do
433 offer <- peerOffer 437 offer <- peerOffer
434 if ixPiece bix `BF.member` offer 438 if ixPiece bix `BF.member` offer
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs
index 43a34df9..2118ccf0 100644
--- a/src/Network/BitTorrent/Sessions.hs
+++ b/src/Network/BitTorrent/Sessions.hs
@@ -202,9 +202,9 @@ discover swarm @ SwarmSession {..} = {-# SCC discover #-} do
202 forever $ do 202 forever $ do
203 addr <- getPeerAddr tses 203 addr <- getPeerAddr tses
204 forkThrottle swarm $ do 204 forkThrottle swarm $ do
205 initiatePeerSession swarm addr $ \conn -> do 205 initiatePeerSession swarm addr $ \pconn -> do
206 print addr 206 print addr
207 runP2P conn p2p 207 runP2P pconn p2p
208 208
209registerSwarmSession :: SwarmSession -> STM () 209registerSwarmSession :: SwarmSession -> STM ()
210registerSwarmSession ss @ SwarmSession {..} = 210registerSwarmSession ss @ SwarmSession {..} =
@@ -223,8 +223,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
223 223
224 ss <- SwarmSession t cs 224 ss <- SwarmSession t cs
225 <$> MSem.new defLeecherConns 225 <$> MSem.new defLeecherConns
226 <*> newTVarIO bf 226 <*> openStorage t dataDirPath bf
227 <*> openStorage t dataDirPath
228 <*> newTVarIO S.empty 227 <*> newTVarIO S.empty
229 <*> newBroadcastTChanIO 228 <*> newBroadcastTChanIO
230 229
@@ -232,7 +231,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
232 modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t 231 modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t
233 registerSwarmSession ss 232 registerSwarmSession ss
234 233
235 forkIO $ discover ss 234 _ <- forkIO $ discover ss
236 235
237 return ss 236 return ss
238 237
@@ -246,8 +245,8 @@ closeSwarmSession se @ SwarmSession {..} = do
246 245
247getSwarm :: ClientSession -> InfoHash -> IO SwarmSession 246getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
248getSwarm cs @ ClientSession {..} ih = do 247getSwarm cs @ ClientSession {..} ih = do
249 status <- torrentPresence cs ih 248 tstatus <- torrentPresence cs ih
250 case status of 249 case tstatus of
251 Unknown -> throw $ UnknownTorrent ih 250 Unknown -> throw $ UnknownTorrent ih
252 Active sw -> return sw 251 Active sw -> return sw
253 Registered loc -> openSwarmSession cs loc 252 Registered loc -> openSwarmSession cs loc
@@ -342,9 +341,11 @@ openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
342openSession ss @ SwarmSession {..} addr Handshake {..} = do 341openSession ss @ SwarmSession {..} addr Handshake {..} = do
343 let clientCaps = encodeExts $ allowedExtensions $ clientSession 342 let clientCaps = encodeExts $ allowedExtensions $ clientSession
344 let enabled = decodeExts (enabledCaps clientCaps hsReserved) 343 let enabled = decodeExts (enabledCaps clientCaps hsReserved)
344
345 bf <- getClientBitfield ss
345 ps <- PeerSession addr ss enabled 346 ps <- PeerSession addr ss enabled
346 <$> atomically (dupTChan broadcastMessages) 347 <$> atomically (dupTChan broadcastMessages)
347 <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) 348 <*> newIORef (initialSessionState (totalCount bf))
348 -- TODO we could implement more interesting throtling scheme 349 -- TODO we could implement more interesting throtling scheme
349 -- using connected peer information 350 -- using connected peer information
350 registerPeerSession ps 351 registerPeerSession ps
@@ -408,7 +409,7 @@ listener cs action serverPort = bracket openListener close loop
408 putStrLn "accepted" 409 putStrLn "accepted"
409 case addr of 410 case addr of
410 SockAddrInet port host -> do 411 SockAddrInet port host -> do
411 forkIO $ do 412 _ <- forkIO $ do
412 acceptPeerSession cs (PeerAddr Nothing host port) conn action 413 acceptPeerSession cs (PeerAddr Nothing host port) conn action
413 return () 414 return ()
414 _ -> return () 415 _ -> return ()
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs
index 69411d4e..af2a6755 100644
--- a/src/Network/BitTorrent/Sessions/Types.lhs
+++ b/src/Network/BitTorrent/Sessions/Types.lhs
@@ -299,7 +299,7 @@ So if client is a leecher then max sessions count depends on the
299number of unchoke slots. 299number of unchoke slots.
300 300
301> -- | Used to bound the number of simultaneous connections and, which 301> -- | Used to bound the number of simultaneous connections and, which
302> -- is the same, P2P sessions within the swarm session. 302> -- is the same, P2P sessions within the swarm session.
303> type SessionCount = Int 303> type SessionCount = Int
304 304
305However if client is a seeder then the value depends on . 305However if client is a seeder then the value depends on .
@@ -322,8 +322,6 @@ Throttling* section.
322Client bitfield is used to keep track "the client have" piece set. 322Client bitfield is used to keep track "the client have" piece set.
323Modify this carefully always updating global progress. 323Modify this carefully always updating global progress.
324 324
325> , clientBitfield :: !(TVar Bitfield)
326
327> , storage :: !Storage 325> , storage :: !Storage
328 326
329We keep set of the all connected peers for the each particular torrent 327We keep set of the all connected peers for the each particular torrent
@@ -371,7 +369,7 @@ INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers
371> compare = comparing (tInfoHash . torrentMeta) 369> compare = comparing (tInfoHash . torrentMeta)
372 370
373> getClientBitfield :: SwarmSession -> IO Bitfield 371> getClientBitfield :: SwarmSession -> IO Bitfield
374> getClientBitfield = readTVarIO . clientBitfield 372> getClientBitfield SwarmSession {..} = atomically $ getCompleteBitfield storage
375 373
376Peer sessions 374Peer sessions
377------------------------------------------------------------------------ 375------------------------------------------------------------------------
@@ -458,16 +456,14 @@ messages & events we should send.
4582. Update downloaded stats --/ 4562. Update downloaded stats --/
4593. Signal to the all other peer about this. 4573. Signal to the all other peer about this.
460 458
461> available :: Bitfield -> SwarmSession -> IO () 459> available :: Bitfield -> SwarmSession -> STM ()
462> available bf se @ SwarmSession {..} = {-# SCC available #-} do 460> available bf SwarmSession {..} = {-# SCC available #-} do
463> mark >> atomically broadcast 461> updateProgress >> broadcast
464> where 462> where
465> mark = do 463> updateProgress = do
466> let piLen = ciPieceLength $ tInfo $ torrentMeta 464> let piLen = ciPieceLength $ tInfo $ torrentMeta
467> let bytes = piLen * BF.haveCount bf 465> let bytes = piLen * BF.haveCount bf
468> atomically $ do 466> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
469> modifyTVar' clientBitfield (BF.union bf)
470> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
471> 467>
472> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) 468> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
473 469
diff --git a/src/System/IO/MMap/Fixed.hs b/src/System/IO/MMap/Fixed.hs
index df6a6603..1e83c350 100644
--- a/src/System/IO/MMap/Fixed.hs
+++ b/src/System/IO/MMap/Fixed.hs
@@ -151,8 +151,8 @@ mallocTo fi s = do
151lookupRegion :: FixedOffset -> Fixed -> Maybe B.ByteString 151lookupRegion :: FixedOffset -> Fixed -> Maybe B.ByteString
152lookupRegion offset Fixed {..} = 152lookupRegion offset Fixed {..} =
153 case intersecting imap $ IntervalCO offset (succ offset) of 153 case intersecting imap $ IntervalCO offset (succ offset) of
154 [(i, (fptr, off))] -> let s = max 0 $ upperBound i - lowerBound i 154 [(i, (fptr, off))] -> let s = upperBound i - lowerBound i
155 in Just $ fromForeignPtr fptr off s 155 in Just $ fromForeignPtr fptr off (max 0 s)
156 _ -> Nothing 156 _ -> Nothing
157 157
158-- | Note: this is unsafe operation. 158-- | Note: this is unsafe operation.
diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs
index dd7258a0..c355d697 100644
--- a/src/System/Torrent/Storage.hs
+++ b/src/System/Torrent/Storage.hs
@@ -24,6 +24,7 @@ module System.Torrent.Storage
24 24
25 -- * Construction 25 -- * Construction
26 , openStorage, closeStorage, withStorage 26 , openStorage, closeStorage, withStorage
27 , getCompleteBitfield
27 28
28 -- * Modification 29 -- * Modification
29 , getBlk, putBlk, selBlk 30 , getBlk, putBlk, selBlk
@@ -51,19 +52,20 @@ import Data.Bitfield as BF
51import Data.Torrent 52import Data.Torrent
52import Network.BitTorrent.Exchange.Protocol 53import Network.BitTorrent.Exchange.Protocol
53import System.IO.MMap.Fixed as Fixed 54import System.IO.MMap.Fixed as Fixed
54import Debug.Trace
55
56 55
56-- TODO merge piece validation and Sessions.available into one transaction.
57data Storage = Storage { 57data Storage = Storage {
58 -- | 58 -- |
59 metainfo:: !Torrent 59 metainfo :: !Torrent
60 60
61 -- | 61 -- | Bitmask of complete and verified _pieces_.
62 , blocks :: !(TVar Bitfield) 62 , complete :: !(TVar Bitfield)
63
64 -- | Bitmask of complete _blocks_.
65 , blocks :: !(TVar Bitfield)
63 -- TODO use bytestring for fast serialization 66 -- TODO use bytestring for fast serialization
64 -- because we need to write this bitmap to disc periodically 67 -- because we need to write this bitmap to disc periodically
65 68
66
67 , blockSize :: !Int 69 , blockSize :: !Int
68 70
69 -- | Used to map linear block addresses to disjoint 71 -- | Used to map linear block addresses to disjoint
@@ -76,19 +78,23 @@ ppStorage Storage {..} = pp <$> readTVarIO blocks
76 where 78 where
77 pp bf = int blockSize 79 pp bf = int blockSize
78 80
81getCompleteBitfield :: Storage -> STM Bitfield
82getCompleteBitfield Storage {..} = readTVar complete
83
79{----------------------------------------------------------------------- 84{-----------------------------------------------------------------------
80 Construction 85 Construction
81-----------------------------------------------------------------------} 86-----------------------------------------------------------------------}
82 87
83-- TODO doc args 88-- TODO doc args
84openStorage :: Torrent -> FilePath -> IO Storage 89openStorage :: Torrent -> FilePath -> Bitfield -> IO Storage
85openStorage t @ Torrent {..} contentPath = do 90openStorage t @ Torrent {..} contentPath bf = do
86 let content_paths = contentLayout contentPath tInfo 91 let content_paths = contentLayout contentPath tInfo
87 mapM_ (mkDir . fst) content_paths 92 mapM_ (mkDir . fst) content_paths
88 93
89 let blockSize = defaultBlockSize `min` ciPieceLength tInfo 94 let blockSize = defaultBlockSize `min` ciPieceLength tInfo
90 print $ "content length " ++ show (contentLength tInfo) 95 print $ "content length " ++ show (contentLength tInfo)
91 Storage t <$> newTVarIO (haveNone (blockCount blockSize tInfo)) 96 Storage t <$> newTVarIO bf
97 <*> newTVarIO (haveNone (blockCount blockSize tInfo))
92 <*> pure blockSize 98 <*> pure blockSize
93 <*> coalesceFiles content_paths 99 <*> coalesceFiles content_paths
94 where 100 where
@@ -103,8 +109,8 @@ closeStorage :: Storage -> IO ()
103closeStorage st = return () 109closeStorage st = return ()
104 110
105 111
106withStorage :: Torrent -> FilePath -> (Storage -> IO a) -> IO a 112withStorage :: Torrent -> FilePath -> Bitfield -> (Storage -> IO a) -> IO a
107withStorage se path = bracket (openStorage se path) closeStorage 113withStorage se path bf = bracket (openStorage se path bf) closeStorage
108 114
109{----------------------------------------------------------------------- 115{-----------------------------------------------------------------------
110 Modification 116 Modification
@@ -191,7 +197,9 @@ validatePiece pix st @ Storage {..} = {-# SCC validatePiece #-} do
191 else do 197 else do
192 piece <- getPiece pix st 198 piece <- getPiece pix st
193 if checkPiece (tInfo metainfo) pix piece 199 if checkPiece (tInfo metainfo) pix piece
194 then return True 200 then do
201 atomically $ modifyTVar' complete (BF.have pix)
202 return True
195 else do 203 else do
196 print $ "----------------------------- invalid " ++ show pix 204 print $ "----------------------------- invalid " ++ show pix
197-- resetPiece pix st 205-- resetPiece pix st