diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 16 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 19 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions/Types.lhs | 18 |
3 files changed, 27 insertions, 26 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index dc1b2752..71be3f88 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -71,6 +71,7 @@ module Network.BitTorrent.Exchange | |||
71 | ) where | 71 | ) where |
72 | 72 | ||
73 | import Control.Applicative | 73 | import Control.Applicative |
74 | import Control.Concurrent.STM | ||
74 | import Control.Exception | 75 | import Control.Exception |
75 | import Control.Lens | 76 | import Control.Lens |
76 | import Control.Monad.Reader | 77 | import Control.Monad.Reader |
@@ -80,7 +81,7 @@ import Control.Monad.Trans.Resource | |||
80 | import Data.IORef | 81 | import Data.IORef |
81 | import Data.Conduit as C | 82 | import Data.Conduit as C |
82 | import Data.Conduit.Cereal as S | 83 | import Data.Conduit.Cereal as S |
83 | import Data.Conduit.Serialization.Binary as B | 84 | --import Data.Conduit.Serialization.Binary as B |
84 | import Data.Conduit.Network | 85 | import Data.Conduit.Network |
85 | import Data.Serialize as S | 86 | import Data.Serialize as S |
86 | import Text.PrettyPrint as PP hiding (($$)) | 87 | import Text.PrettyPrint as PP hiding (($$)) |
@@ -100,11 +101,11 @@ import System.Torrent.Storage | |||
100 | type PeerWire = ConduitM Message Message IO | 101 | type PeerWire = ConduitM Message Message IO |
101 | 102 | ||
102 | runPeerWire :: Socket -> PeerWire () -> IO () | 103 | runPeerWire :: Socket -> PeerWire () -> IO () |
103 | runPeerWire sock p2p = | 104 | runPeerWire sock action = |
104 | sourceSocket sock $= | 105 | sourceSocket sock $= |
105 | S.conduitGet S.get $= | 106 | S.conduitGet S.get $= |
106 | -- B.conduitDecode $= | 107 | -- B.conduitDecode $= |
107 | p2p $= | 108 | action $= |
108 | S.conduitPut S.put $$ | 109 | S.conduitPut S.put $$ |
109 | -- B.conduitEncode $$ | 110 | -- B.conduitEncode $$ |
110 | sinkSocket sock | 111 | sinkSocket sock |
@@ -153,9 +154,9 @@ instance MonadState SessionState P2P where | |||
153 | {-# INLINE put #-} | 154 | {-# INLINE put #-} |
154 | 155 | ||
155 | runP2P :: (Socket, PeerSession) -> P2P () -> IO () | 156 | runP2P :: (Socket, PeerSession) -> P2P () -> IO () |
156 | runP2P (sock, ses) p2p = | 157 | runP2P (sock, ses) action = |
157 | handle isIOException $ | 158 | handle isIOException $ |
158 | runPeerWire sock (runReaderT (unP2P p2p) ses) | 159 | runPeerWire sock (runReaderT (unP2P action) ses) |
159 | where | 160 | where |
160 | isIOException :: IOException -> IO () | 161 | isIOException :: IOException -> IO () |
161 | isIOException _ = return () | 162 | isIOException _ = return () |
@@ -428,7 +429,10 @@ yieldEvent e = {-# SCC yieldEvent #-} do | |||
428 | go e | 429 | go e |
429 | flushPending | 430 | flushPending |
430 | where | 431 | where |
431 | go (Available ixs) = asks swarmSession >>= liftIO . available ixs | 432 | go (Available ixs) = do |
433 | ses <- asks swarmSession | ||
434 | liftIO $ atomically $ available ixs ses | ||
435 | |||
432 | go (Want bix) = do | 436 | go (Want bix) = do |
433 | offer <- peerOffer | 437 | offer <- peerOffer |
434 | if ixPiece bix `BF.member` offer | 438 | if ixPiece bix `BF.member` offer |
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs index 43a34df9..2118ccf0 100644 --- a/src/Network/BitTorrent/Sessions.hs +++ b/src/Network/BitTorrent/Sessions.hs | |||
@@ -202,9 +202,9 @@ discover swarm @ SwarmSession {..} = {-# SCC discover #-} do | |||
202 | forever $ do | 202 | forever $ do |
203 | addr <- getPeerAddr tses | 203 | addr <- getPeerAddr tses |
204 | forkThrottle swarm $ do | 204 | forkThrottle swarm $ do |
205 | initiatePeerSession swarm addr $ \conn -> do | 205 | initiatePeerSession swarm addr $ \pconn -> do |
206 | print addr | 206 | print addr |
207 | runP2P conn p2p | 207 | runP2P pconn p2p |
208 | 208 | ||
209 | registerSwarmSession :: SwarmSession -> STM () | 209 | registerSwarmSession :: SwarmSession -> STM () |
210 | registerSwarmSession ss @ SwarmSession {..} = | 210 | registerSwarmSession ss @ SwarmSession {..} = |
@@ -223,8 +223,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do | |||
223 | 223 | ||
224 | ss <- SwarmSession t cs | 224 | ss <- SwarmSession t cs |
225 | <$> MSem.new defLeecherConns | 225 | <$> MSem.new defLeecherConns |
226 | <*> newTVarIO bf | 226 | <*> openStorage t dataDirPath bf |
227 | <*> openStorage t dataDirPath | ||
228 | <*> newTVarIO S.empty | 227 | <*> newTVarIO S.empty |
229 | <*> newBroadcastTChanIO | 228 | <*> newBroadcastTChanIO |
230 | 229 | ||
@@ -232,7 +231,7 @@ openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do | |||
232 | modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t | 231 | modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t |
233 | registerSwarmSession ss | 232 | registerSwarmSession ss |
234 | 233 | ||
235 | forkIO $ discover ss | 234 | _ <- forkIO $ discover ss |
236 | 235 | ||
237 | return ss | 236 | return ss |
238 | 237 | ||
@@ -246,8 +245,8 @@ closeSwarmSession se @ SwarmSession {..} = do | |||
246 | 245 | ||
247 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | 246 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession |
248 | getSwarm cs @ ClientSession {..} ih = do | 247 | getSwarm cs @ ClientSession {..} ih = do |
249 | status <- torrentPresence cs ih | 248 | tstatus <- torrentPresence cs ih |
250 | case status of | 249 | case tstatus of |
251 | Unknown -> throw $ UnknownTorrent ih | 250 | Unknown -> throw $ UnknownTorrent ih |
252 | Active sw -> return sw | 251 | Active sw -> return sw |
253 | Registered loc -> openSwarmSession cs loc | 252 | Registered loc -> openSwarmSession cs loc |
@@ -342,9 +341,11 @@ openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | |||
342 | openSession ss @ SwarmSession {..} addr Handshake {..} = do | 341 | openSession ss @ SwarmSession {..} addr Handshake {..} = do |
343 | let clientCaps = encodeExts $ allowedExtensions $ clientSession | 342 | let clientCaps = encodeExts $ allowedExtensions $ clientSession |
344 | let enabled = decodeExts (enabledCaps clientCaps hsReserved) | 343 | let enabled = decodeExts (enabledCaps clientCaps hsReserved) |
344 | |||
345 | bf <- getClientBitfield ss | ||
345 | ps <- PeerSession addr ss enabled | 346 | ps <- PeerSession addr ss enabled |
346 | <$> atomically (dupTChan broadcastMessages) | 347 | <$> atomically (dupTChan broadcastMessages) |
347 | <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) | 348 | <*> newIORef (initialSessionState (totalCount bf)) |
348 | -- TODO we could implement more interesting throtling scheme | 349 | -- TODO we could implement more interesting throtling scheme |
349 | -- using connected peer information | 350 | -- using connected peer information |
350 | registerPeerSession ps | 351 | registerPeerSession ps |
@@ -408,7 +409,7 @@ listener cs action serverPort = bracket openListener close loop | |||
408 | putStrLn "accepted" | 409 | putStrLn "accepted" |
409 | case addr of | 410 | case addr of |
410 | SockAddrInet port host -> do | 411 | SockAddrInet port host -> do |
411 | forkIO $ do | 412 | _ <- forkIO $ do |
412 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | 413 | acceptPeerSession cs (PeerAddr Nothing host port) conn action |
413 | return () | 414 | return () |
414 | _ -> return () | 415 | _ -> return () |
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs index 69411d4e..af2a6755 100644 --- a/src/Network/BitTorrent/Sessions/Types.lhs +++ b/src/Network/BitTorrent/Sessions/Types.lhs | |||
@@ -299,7 +299,7 @@ So if client is a leecher then max sessions count depends on the | |||
299 | number of unchoke slots. | 299 | number of unchoke slots. |
300 | 300 | ||
301 | > -- | Used to bound the number of simultaneous connections and, which | 301 | > -- | Used to bound the number of simultaneous connections and, which |
302 | > -- is the same, P2P sessions within the swarm session. | 302 | > -- is the same, P2P sessions within the swarm session. |
303 | > type SessionCount = Int | 303 | > type SessionCount = Int |
304 | 304 | ||
305 | However if client is a seeder then the value depends on . | 305 | However if client is a seeder then the value depends on . |
@@ -322,8 +322,6 @@ Throttling* section. | |||
322 | Client bitfield is used to keep track "the client have" piece set. | 322 | Client bitfield is used to keep track "the client have" piece set. |
323 | Modify this carefully always updating global progress. | 323 | Modify this carefully always updating global progress. |
324 | 324 | ||
325 | > , clientBitfield :: !(TVar Bitfield) | ||
326 | |||
327 | > , storage :: !Storage | 325 | > , storage :: !Storage |
328 | 326 | ||
329 | We keep set of the all connected peers for the each particular torrent | 327 | We keep set of the all connected peers for the each particular torrent |
@@ -371,7 +369,7 @@ INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers | |||
371 | > compare = comparing (tInfoHash . torrentMeta) | 369 | > compare = comparing (tInfoHash . torrentMeta) |
372 | 370 | ||
373 | > getClientBitfield :: SwarmSession -> IO Bitfield | 371 | > getClientBitfield :: SwarmSession -> IO Bitfield |
374 | > getClientBitfield = readTVarIO . clientBitfield | 372 | > getClientBitfield SwarmSession {..} = atomically $ getCompleteBitfield storage |
375 | 373 | ||
376 | Peer sessions | 374 | Peer sessions |
377 | ------------------------------------------------------------------------ | 375 | ------------------------------------------------------------------------ |
@@ -458,16 +456,14 @@ messages & events we should send. | |||
458 | 2. Update downloaded stats --/ | 456 | 2. Update downloaded stats --/ |
459 | 3. Signal to the all other peer about this. | 457 | 3. Signal to the all other peer about this. |
460 | 458 | ||
461 | > available :: Bitfield -> SwarmSession -> IO () | 459 | > available :: Bitfield -> SwarmSession -> STM () |
462 | > available bf se @ SwarmSession {..} = {-# SCC available #-} do | 460 | > available bf SwarmSession {..} = {-# SCC available #-} do |
463 | > mark >> atomically broadcast | 461 | > updateProgress >> broadcast |
464 | > where | 462 | > where |
465 | > mark = do | 463 | > updateProgress = do |
466 | > let piLen = ciPieceLength $ tInfo $ torrentMeta | 464 | > let piLen = ciPieceLength $ tInfo $ torrentMeta |
467 | > let bytes = piLen * BF.haveCount bf | 465 | > let bytes = piLen * BF.haveCount bf |
468 | > atomically $ do | 466 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) |
469 | > modifyTVar' clientBitfield (BF.union bf) | ||
470 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
471 | > | 467 | > |
472 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | 468 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) |
473 | 469 | ||