diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 16 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 19 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions/Types.lhs | 18 | ||||
-rw-r--r-- | src/System/IO/MMap/Fixed.hs | 4 | ||||
-rw-r--r-- | src/System/Torrent/Storage.hs | 32 |
5 files changed, 49 insertions, 40 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index dc1b2752..71be3f88 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -71,6 +71,7 @@ module Network.BitTorrent.Exchange | |||
71 | ) where | 71 | ) where |
72 | 72 | ||
73 | import Control.Applicative | 73 | import Control.Applicative |
74 | import Control.Concurrent.STM | ||
74 | import Control.Exception | 75 | import Control.Exception |
75 | import Control.Lens | 76 | import Control.Lens |
76 | import Control.Monad.Reader | 77 | import Control.Monad.Reader |
@@ -80,7 +81,7 @@ import Control.Monad.Trans.Resource | |||
80 | import Data.IORef | 81 | import Data.IORef |
81 | import Data.Conduit as C | 82 | import Data.Conduit as C |
82 | import Data.Conduit.Cereal as S | 83 | import Data.Conduit.Cereal as S |
83 | import Data.Conduit.Serialization.Binary as B | 84 | --import Data.Conduit.Serialization.Binary as B |
84 | import Data.Conduit.Network | 85 | import Data.Conduit.Network |
85 | import Data.Serialize as S | 86 | import Data.Serialize as S |
86 | import Text.PrettyPrint as PP hiding (($$)) | 87 | import Text.PrettyPrint as PP hiding (($$)) |
@@ -100,11 +101,11 @@ import System.Torrent.Storage | |||
100 | type PeerWire = ConduitM Message Message IO | 101 | type PeerWire = ConduitM Message Message IO |
101 | 102 | ||
102 | runPeerWire :: Socket -> PeerWire () -> IO () | 103 | runPeerWire :: Socket -> PeerWire () -> IO () |
103 | runPeerWire sock p2p = | 104 | runPeerWire sock action = |
104 | sourceSocket sock $= | 105 | sourceSocket sock $= |
105 | S.conduitGet S.get $= | 106 | S.conduitGet S.get $= |
106 | -- B.conduitDecode $= | 107 | -- B.conduitDecode $= |
107 | p2p $= | 108 | action $= |
108 | S.conduitPut S.put $$ | 109 | S.conduitPut S.put $$ |
109 | -- B.conduitEncode $$ | 110 | -- B.conduitEncode $$ |
110 | sinkSocket sock | 111 | sinkSocket sock |
@@ -153,9 +154,9 @@ instance MonadState SessionState P2P where | |||
153 | {-# INLINE put #-} | 154 | {-# INLINE put #-} |
154 | 155 | ||
155 | runP2P :: (Socket, PeerSession) -> P2P () -> IO () | 156 | runP2P :: (Socket, PeerSession) -> P2P () -> IO () |
156 | runP2P (sock, ses) p2p = | 157 | runP2P (sock, ses) action = |
157 | handle isIOException $ | 158 | handle isIOException $ |
158 | runPeerWire sock (runReaderT (unP2P p2p) ses) | 159 | runPeerWire sock (runReaderT (unP2P action) ses) |
159 | where | 160 | where |
160 | isIOException :: IOException -> IO () | 161 | isIOException :: IOException -> IO () |
161 | isIOException _ = return () | 162 | isIOException _ = return () |
@@ -428,7 +429,10 @@ yieldEvent e = {-# SCC yieldEvent #-} do | |||
428 | go e | 429 | go e |
429 | flushPending | 430 | flushPending |
430 | where | 431 | where |
431 | go (Available ixs) = asks swarmSession >>= liftIO . available ixs | 432 | go (Available ixs) = do |
433 | ses <- asks swarmSession | ||
434 | liftIO $ atomically $ available ixs ses | ||
435 | |||
432 | go (Want bix) = do | 436 | go (Want bix) = do |
433 | offer <- peerOffer | 437 | offer <- peerOffer |
434 | if ixPiece bix `BF.member` offer | 438 | if ixPiece bix `BF.member` offer |
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs index 43a34df9..2118ccf0 100644 --- a/src/Network/BitTorrent/Sessions.hs +++ b/src/Network/BitTorrent/Sessions.hs | |||
@@ -202,9 +202,9 @@ discover swarm @ SwarmSession {..} = {-# SCC discover #-} do | |||
202 | forever $ do | 202 | forever $ do |
203 | addr <- getPeerAddr tses | 203 | addr <- getPeerAddr tses |
204 | forkThrottle swarm $ do | 204 | forkThrottle swarm $ do |
205 | initiatePeerSession swarm addr $ \conn -> do | 205 | initiatePeerSession swarm addr $ \pconn -> do |
206 | print addr | 206 | print addr |
207 | runP2P conn p2p | 207 | runP2P pconn p2p |
208 | 208 | ||
209 | registerSwarmSession :: SwarmSession -> STM () | 209 | registerSwarmSession :: SwarmSession -> STM () |
210 | registerSwarmSession ss @ SwarmSession {..} = | 210 | registerSwarmSession ss @ SwarmSession {..} = |
@@ -223,8 +223,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do | |||
223 | 223 | ||
224 | ss <- SwarmSession t cs | 224 | ss <- SwarmSession t cs |
225 | <$> MSem.new defLeecherConns | 225 | <$> MSem.new defLeecherConns |
226 | <*> newTVarIO bf | 226 | <*> openStorage t dataDirPath bf |
227 | <*> openStorage t dataDirPath | ||
228 | <*> newTVarIO S.empty | 227 | <*> newTVarIO S.empty |
229 | <*> newBroadcastTChanIO | 228 | <*> newBroadcastTChanIO |
230 | 229 | ||
@@ -232,7 +231,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do | |||
232 | modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t | 231 | modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t |
233 | registerSwarmSession ss | 232 | registerSwarmSession ss |
234 | 233 | ||
235 | forkIO $ discover ss | 234 | _ <- forkIO $ discover ss |
236 | 235 | ||
237 | return ss | 236 | return ss |
238 | 237 | ||
@@ -246,8 +245,8 @@ closeSwarmSession se @ SwarmSession {..} = do | |||
246 | 245 | ||
247 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | 246 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession |
248 | getSwarm cs @ ClientSession {..} ih = do | 247 | getSwarm cs @ ClientSession {..} ih = do |
249 | status <- torrentPresence cs ih | 248 | tstatus <- torrentPresence cs ih |
250 | case status of | 249 | case tstatus of |
251 | Unknown -> throw $ UnknownTorrent ih | 250 | Unknown -> throw $ UnknownTorrent ih |
252 | Active sw -> return sw | 251 | Active sw -> return sw |
253 | Registered loc -> openSwarmSession cs loc | 252 | Registered loc -> openSwarmSession cs loc |
@@ -342,9 +341,11 @@ openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | |||
342 | openSession ss @ SwarmSession {..} addr Handshake {..} = do | 341 | openSession ss @ SwarmSession {..} addr Handshake {..} = do |
343 | let clientCaps = encodeExts $ allowedExtensions $ clientSession | 342 | let clientCaps = encodeExts $ allowedExtensions $ clientSession |
344 | let enabled = decodeExts (enabledCaps clientCaps hsReserved) | 343 | let enabled = decodeExts (enabledCaps clientCaps hsReserved) |
344 | |||
345 | bf <- getClientBitfield ss | ||
345 | ps <- PeerSession addr ss enabled | 346 | ps <- PeerSession addr ss enabled |
346 | <$> atomically (dupTChan broadcastMessages) | 347 | <$> atomically (dupTChan broadcastMessages) |
347 | <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) | 348 | <*> newIORef (initialSessionState (totalCount bf)) |
348 | -- TODO we could implement more interesting throtling scheme | 349 | -- TODO we could implement more interesting throtling scheme |
349 | -- using connected peer information | 350 | -- using connected peer information |
350 | registerPeerSession ps | 351 | registerPeerSession ps |
@@ -408,7 +409,7 @@ listener cs action serverPort = bracket openListener close loop | |||
408 | putStrLn "accepted" | 409 | putStrLn "accepted" |
409 | case addr of | 410 | case addr of |
410 | SockAddrInet port host -> do | 411 | SockAddrInet port host -> do |
411 | forkIO $ do | 412 | _ <- forkIO $ do |
412 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | 413 | acceptPeerSession cs (PeerAddr Nothing host port) conn action |
413 | return () | 414 | return () |
414 | _ -> return () | 415 | _ -> return () |
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs index 69411d4e..af2a6755 100644 --- a/src/Network/BitTorrent/Sessions/Types.lhs +++ b/src/Network/BitTorrent/Sessions/Types.lhs | |||
@@ -299,7 +299,7 @@ So if client is a leecher then max sessions count depends on the | |||
299 | number of unchoke slots. | 299 | number of unchoke slots. |
300 | 300 | ||
301 | > -- | Used to bound the number of simultaneous connections and, which | 301 | > -- | Used to bound the number of simultaneous connections and, which |
302 | > -- is the same, P2P sessions within the swarm session. | 302 | > -- is the same, P2P sessions within the swarm session. |
303 | > type SessionCount = Int | 303 | > type SessionCount = Int |
304 | 304 | ||
305 | However if client is a seeder then the value depends on . | 305 | However if client is a seeder then the value depends on . |
@@ -322,8 +322,6 @@ Throttling* section. | |||
322 | Client bitfield is used to keep track "the client have" piece set. | 322 | Client bitfield is used to keep track "the client have" piece set. |
323 | Modify this carefully always updating global progress. | 323 | Modify this carefully always updating global progress. |
324 | 324 | ||
325 | > , clientBitfield :: !(TVar Bitfield) | ||
326 | |||
327 | > , storage :: !Storage | 325 | > , storage :: !Storage |
328 | 326 | ||
329 | We keep set of the all connected peers for the each particular torrent | 327 | We keep set of the all connected peers for the each particular torrent |
@@ -371,7 +369,7 @@ INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers | |||
371 | > compare = comparing (tInfoHash . torrentMeta) | 369 | > compare = comparing (tInfoHash . torrentMeta) |
372 | 370 | ||
373 | > getClientBitfield :: SwarmSession -> IO Bitfield | 371 | > getClientBitfield :: SwarmSession -> IO Bitfield |
374 | > getClientBitfield = readTVarIO . clientBitfield | 372 | > getClientBitfield SwarmSession {..} = atomically $ getCompleteBitfield storage |
375 | 373 | ||
376 | Peer sessions | 374 | Peer sessions |
377 | ------------------------------------------------------------------------ | 375 | ------------------------------------------------------------------------ |
@@ -458,16 +456,14 @@ messages & events we should send. | |||
458 | 2. Update downloaded stats --/ | 456 | 2. Update downloaded stats --/ |
459 | 3. Signal to the all other peer about this. | 457 | 3. Signal to the all other peer about this. |
460 | 458 | ||
461 | > available :: Bitfield -> SwarmSession -> IO () | 459 | > available :: Bitfield -> SwarmSession -> STM () |
462 | > available bf se @ SwarmSession {..} = {-# SCC available #-} do | 460 | > available bf SwarmSession {..} = {-# SCC available #-} do |
463 | > mark >> atomically broadcast | 461 | > updateProgress >> broadcast |
464 | > where | 462 | > where |
465 | > mark = do | 463 | > updateProgress = do |
466 | > let piLen = ciPieceLength $ tInfo $ torrentMeta | 464 | > let piLen = ciPieceLength $ tInfo $ torrentMeta |
467 | > let bytes = piLen * BF.haveCount bf | 465 | > let bytes = piLen * BF.haveCount bf |
468 | > atomically $ do | 466 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) |
469 | > modifyTVar' clientBitfield (BF.union bf) | ||
470 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
471 | > | 467 | > |
472 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | 468 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) |
473 | 469 | ||
diff --git a/src/System/IO/MMap/Fixed.hs b/src/System/IO/MMap/Fixed.hs index df6a6603..1e83c350 100644 --- a/src/System/IO/MMap/Fixed.hs +++ b/src/System/IO/MMap/Fixed.hs | |||
@@ -151,8 +151,8 @@ mallocTo fi s = do | |||
151 | lookupRegion :: FixedOffset -> Fixed -> Maybe B.ByteString | 151 | lookupRegion :: FixedOffset -> Fixed -> Maybe B.ByteString |
152 | lookupRegion offset Fixed {..} = | 152 | lookupRegion offset Fixed {..} = |
153 | case intersecting imap $ IntervalCO offset (succ offset) of | 153 | case intersecting imap $ IntervalCO offset (succ offset) of |
154 | [(i, (fptr, off))] -> let s = max 0 $ upperBound i - lowerBound i | 154 | [(i, (fptr, off))] -> let s = upperBound i - lowerBound i |
155 | in Just $ fromForeignPtr fptr off s | 155 | in Just $ fromForeignPtr fptr off (max 0 s) |
156 | _ -> Nothing | 156 | _ -> Nothing |
157 | 157 | ||
158 | -- | Note: this is unsafe operation. | 158 | -- | Note: this is unsafe operation. |
diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs index dd7258a0..c355d697 100644 --- a/src/System/Torrent/Storage.hs +++ b/src/System/Torrent/Storage.hs | |||
@@ -24,6 +24,7 @@ module System.Torrent.Storage | |||
24 | 24 | ||
25 | -- * Construction | 25 | -- * Construction |
26 | , openStorage, closeStorage, withStorage | 26 | , openStorage, closeStorage, withStorage |
27 | , getCompleteBitfield | ||
27 | 28 | ||
28 | -- * Modification | 29 | -- * Modification |
29 | , getBlk, putBlk, selBlk | 30 | , getBlk, putBlk, selBlk |
@@ -51,19 +52,20 @@ import Data.Bitfield as BF | |||
51 | import Data.Torrent | 52 | import Data.Torrent |
52 | import Network.BitTorrent.Exchange.Protocol | 53 | import Network.BitTorrent.Exchange.Protocol |
53 | import System.IO.MMap.Fixed as Fixed | 54 | import System.IO.MMap.Fixed as Fixed |
54 | import Debug.Trace | ||
55 | |||
56 | 55 | ||
56 | -- TODO merge piece validation and Sessions.available into one transaction. | ||
57 | data Storage = Storage { | 57 | data Storage = Storage { |
58 | -- | | 58 | -- | |
59 | metainfo:: !Torrent | 59 | metainfo :: !Torrent |
60 | 60 | ||
61 | -- | | 61 | -- | Bitmask of complete and verified _pieces_. |
62 | , blocks :: !(TVar Bitfield) | 62 | , complete :: !(TVar Bitfield) |
63 | |||
64 | -- | Bitmask of complete _blocks_. | ||
65 | , blocks :: !(TVar Bitfield) | ||
63 | -- TODO use bytestring for fast serialization | 66 | -- TODO use bytestring for fast serialization |
64 | -- because we need to write this bitmap to disc periodically | 67 | -- because we need to write this bitmap to disc periodically |
65 | 68 | ||
66 | |||
67 | , blockSize :: !Int | 69 | , blockSize :: !Int |
68 | 70 | ||
69 | -- | Used to map linear block addresses to disjoint | 71 | -- | Used to map linear block addresses to disjoint |
@@ -76,19 +78,23 @@ ppStorage Storage {..} = pp <$> readTVarIO blocks | |||
76 | where | 78 | where |
77 | pp bf = int blockSize | 79 | pp bf = int blockSize |
78 | 80 | ||
81 | getCompleteBitfield :: Storage -> STM Bitfield | ||
82 | getCompleteBitfield Storage {..} = readTVar complete | ||
83 | |||
79 | {----------------------------------------------------------------------- | 84 | {----------------------------------------------------------------------- |
80 | Construction | 85 | Construction |
81 | -----------------------------------------------------------------------} | 86 | -----------------------------------------------------------------------} |
82 | 87 | ||
83 | -- TODO doc args | 88 | -- TODO doc args |
84 | openStorage :: Torrent -> FilePath -> IO Storage | 89 | openStorage :: Torrent -> FilePath -> Bitfield -> IO Storage |
85 | openStorage t @ Torrent {..} contentPath = do | 90 | openStorage t @ Torrent {..} contentPath bf = do |
86 | let content_paths = contentLayout contentPath tInfo | 91 | let content_paths = contentLayout contentPath tInfo |
87 | mapM_ (mkDir . fst) content_paths | 92 | mapM_ (mkDir . fst) content_paths |
88 | 93 | ||
89 | let blockSize = defaultBlockSize `min` ciPieceLength tInfo | 94 | let blockSize = defaultBlockSize `min` ciPieceLength tInfo |
90 | print $ "content length " ++ show (contentLength tInfo) | 95 | print $ "content length " ++ show (contentLength tInfo) |
91 | Storage t <$> newTVarIO (haveNone (blockCount blockSize tInfo)) | 96 | Storage t <$> newTVarIO bf |
97 | <*> newTVarIO (haveNone (blockCount blockSize tInfo)) | ||
92 | <*> pure blockSize | 98 | <*> pure blockSize |
93 | <*> coalesceFiles content_paths | 99 | <*> coalesceFiles content_paths |
94 | where | 100 | where |
@@ -103,8 +109,8 @@ closeStorage :: Storage -> IO () | |||
103 | closeStorage st = return () | 109 | closeStorage st = return () |
104 | 110 | ||
105 | 111 | ||
106 | withStorage :: Torrent -> FilePath -> (Storage -> IO a) -> IO a | 112 | withStorage :: Torrent -> FilePath -> Bitfield -> (Storage -> IO a) -> IO a |
107 | withStorage se path = bracket (openStorage se path) closeStorage | 113 | withStorage se path bf = bracket (openStorage se path bf) closeStorage |
108 | 114 | ||
109 | {----------------------------------------------------------------------- | 115 | {----------------------------------------------------------------------- |
110 | Modification | 116 | Modification |
@@ -191,7 +197,9 @@ validatePiece pix st @ Storage {..} = {-# SCC validatePiece #-} do | |||
191 | else do | 197 | else do |
192 | piece <- getPiece pix st | 198 | piece <- getPiece pix st |
193 | if checkPiece (tInfo metainfo) pix piece | 199 | if checkPiece (tInfo metainfo) pix piece |
194 | then return True | 200 | then do |
201 | atomically $ modifyTVar' complete (BF.have pix) | ||
202 | return True | ||
195 | else do | 203 | else do |
196 | print $ "----------------------------- invalid " ++ show pix | 204 | print $ "----------------------------- invalid " ++ show pix |
197 | -- resetPiece pix st | 205 | -- resetPiece pix st |