diff options
-rw-r--r-- | bittorrent.cabal | 1 | ||||
-rw-r--r-- | src/Data/Bitfield.hs | 5 | ||||
-rw-r--r-- | src/Data/Torrent.hs | 16 | ||||
-rw-r--r-- | src/Network/BitTorrent.hs | 12 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 18 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 124 | ||||
-rw-r--r-- | src/Network/BitTorrent/Tracker.hs | 6 | ||||
-rw-r--r-- | src/System/Torrent/Storage.hs | 48 |
8 files changed, 194 insertions, 36 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal index 7f1a9be5..b3ffbe49 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal | |||
@@ -41,6 +41,7 @@ library | |||
41 | , Network.BitTorrent.Peer | 41 | , Network.BitTorrent.Peer |
42 | , Network.BitTorrent.Tracker | 42 | , Network.BitTorrent.Tracker |
43 | , Network.BitTorrent.Exchange | 43 | , Network.BitTorrent.Exchange |
44 | , System.Torrent.Storage | ||
44 | 45 | ||
45 | other-modules: Network.BitTorrent.Internal | 46 | other-modules: Network.BitTorrent.Internal |
46 | 47 | ||
diff --git a/src/Data/Bitfield.hs b/src/Data/Bitfield.hs index 11897786..46e0a71f 100644 --- a/src/Data/Bitfield.hs +++ b/src/Data/Bitfield.hs | |||
@@ -52,6 +52,7 @@ module Data.Bitfield | |||
52 | 52 | ||
53 | -- * Serialization | 53 | -- * Serialization |
54 | , fromBitmap, toBitmap | 54 | , fromBitmap, toBitmap |
55 | , toList | ||
55 | 56 | ||
56 | -- * Selection | 57 | -- * Selection |
57 | , Selector | 58 | , Selector |
@@ -259,6 +260,10 @@ unions = foldl' union (haveNone 0) | |||
259 | Serialization | 260 | Serialization |
260 | -----------------------------------------------------------------------} | 261 | -----------------------------------------------------------------------} |
261 | 262 | ||
263 | -- | List all have indexes. | ||
264 | toList :: Bitfield -> [PieceIx] | ||
265 | toList Bitfield {..} = S.toList bfSet | ||
266 | |||
262 | -- | Unpack 'Bitfield' from tightly packed bit array. Note resulting | 267 | -- | Unpack 'Bitfield' from tightly packed bit array. Note resulting |
263 | -- size might be more than real bitfield size, use 'adjustSize'. | 268 | -- size might be more than real bitfield size, use 'adjustSize'. |
264 | fromBitmap :: ByteString -> Bitfield | 269 | fromBitmap :: ByteString -> Bitfield |
diff --git a/src/Data/Torrent.hs b/src/Data/Torrent.hs index 368ec77c..551a260c 100644 --- a/src/Data/Torrent.hs +++ b/src/Data/Torrent.hs | |||
@@ -356,6 +356,22 @@ isMultiFile :: ContentInfo -> Bool | |||
356 | isMultiFile MultiFile {} = True | 356 | isMultiFile MultiFile {} = True |
357 | isMultiFile _ = False | 357 | isMultiFile _ = False |
358 | 358 | ||
359 | slice :: Int -> Int -> ByteString -> ByteString | ||
360 | slice from to = B.take to . B.drop from | ||
361 | |||
362 | -- | Extract validation hash by specified piece index. | ||
363 | pieceHash :: ContentInfo -> Int -> ByteString | ||
364 | pieceHash ci ix = slice offset size (ciPieces ci) | ||
365 | where | ||
366 | offset = ciPieceLength ci * ix | ||
367 | size = ciPieceLength ci | ||
368 | |||
369 | -- | Validate piece with metainfo hash. | ||
370 | checkPiece :: ContentInfo -> Int -> ByteString -> Bool | ||
371 | checkPiece ci ix piece | ||
372 | = B.length piece == ciPieceLength ci | ||
373 | && hash piece == InfoHash (pieceHash ci ix) | ||
374 | |||
359 | -- | Read and decode a .torrent file. | 375 | -- | Read and decode a .torrent file. |
360 | fromFile :: FilePath -> IO Torrent | 376 | fromFile :: FilePath -> IO Torrent |
361 | fromFile filepath = do | 377 | fromFile filepath = do |
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index b97db4b0..86c7802b 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs | |||
@@ -33,8 +33,15 @@ module Network.BitTorrent | |||
33 | , SessionCount | 33 | , SessionCount |
34 | , getSessionCount | 34 | , getSessionCount |
35 | 35 | ||
36 | -- * Storage | ||
37 | , Storage | ||
38 | , bindTo | ||
39 | , unbind | ||
40 | |||
36 | -- * Discovery | 41 | -- * Discovery |
37 | , discover | 42 | , discover |
43 | , exchange | ||
44 | |||
38 | 45 | ||
39 | -- * Peer to Peer | 46 | -- * Peer to Peer |
40 | , P2P | 47 | , P2P |
@@ -122,12 +129,13 @@ discover swarm action = do | |||
122 | -- \---------------------------/ | 129 | -- \---------------------------/ |
123 | -- | 130 | -- |
124 | 131 | ||
132 | |||
125 | -- | Default P2P action. | 133 | -- | Default P2P action. |
126 | exchange :: Storage -> P2P () | 134 | exchange :: Storage -> P2P () |
127 | exchange storage = handleEvent handler | 135 | exchange storage = handleEvent (\msg -> liftIO (print msg) >> handler msg) |
128 | where | 136 | where |
129 | handler (Available bf) | 137 | handler (Available bf) |
130 | | Just m <- findMin bf = return (Want (BlockIx m 0 10)) | 138 | | Just m <- findMin bf = return (Want (BlockIx m 0 262144)) |
131 | | otherwise = error "impossible" | 139 | | otherwise = error "impossible" |
132 | -- TODO findMin :: Bitfield -> PieceIx | 140 | -- TODO findMin :: Bitfield -> PieceIx |
133 | 141 | ||
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 3d05f7fc..505360a4 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -109,15 +109,18 @@ runPeerWire sock p2p = | |||
109 | sinkSocket sock | 109 | sinkSocket sock |
110 | 110 | ||
111 | awaitMessage :: P2P Message | 111 | awaitMessage :: P2P Message |
112 | awaitMessage = P2P (ReaderT (const go)) | 112 | awaitMessage = P2P $ ReaderT $ const go |
113 | where | 113 | where |
114 | go = await >>= maybe (monadThrow PeerDisconnected) return | 114 | go = await >>= maybe (monadThrow PeerDisconnected) return |
115 | {-# INLINE awaitMessage #-} | 115 | {-# INLINE awaitMessage #-} |
116 | 116 | ||
117 | yieldMessage :: Message -> P2P () | 117 | yieldMessage :: Message -> P2P () |
118 | yieldMessage msg = P2P $ ReaderT $ \se -> C.yield msg | 118 | yieldMessage msg = P2P $ ReaderT $ const (C.yield msg) |
119 | {-# INLINE yieldMessage #-} | 119 | {-# INLINE yieldMessage #-} |
120 | 120 | ||
121 | flushPending :: P2P () | ||
122 | flushPending = ask >>= liftIO . getPending >>= mapM_ yieldMessage | ||
123 | |||
121 | {----------------------------------------------------------------------- | 124 | {----------------------------------------------------------------------- |
122 | P2P monad | 125 | P2P monad |
123 | -----------------------------------------------------------------------} | 126 | -----------------------------------------------------------------------} |
@@ -313,7 +316,9 @@ data Event | |||
313 | -- forall (Fragment block). isPiece block == True | 316 | -- forall (Fragment block). isPiece block == True |
314 | -- | 317 | -- |
315 | awaitEvent :: P2P Event | 318 | awaitEvent :: P2P Event |
316 | awaitEvent = awaitMessage >>= go | 319 | awaitEvent = do |
320 | |||
321 | awaitMessage >>= go | ||
317 | where | 322 | where |
318 | go KeepAlive = awaitEvent | 323 | go KeepAlive = awaitEvent |
319 | go Choke = do | 324 | go Choke = do |
@@ -432,7 +437,7 @@ awaitEvent = awaitMessage >>= go | |||
432 | -- most likely will be ignored without any network IO. | 437 | -- most likely will be ignored without any network IO. |
433 | -- | 438 | -- |
434 | yieldEvent :: Event -> P2P () | 439 | yieldEvent :: Event -> P2P () |
435 | yieldEvent (Available _ ) = undefined | 440 | yieldEvent (Available ixs) = asks swarmSession >>= liftIO . available ixs |
436 | yieldEvent (Want bix) = do | 441 | yieldEvent (Want bix) = do |
437 | offer <- peerOffer | 442 | offer <- peerOffer |
438 | if ixPiece bix `BF.member` offer | 443 | if ixPiece bix `BF.member` offer |
@@ -449,10 +454,5 @@ yieldEvent (Fragment blk) = do | |||
449 | handleEvent :: (Event -> P2P Event) -> P2P () | 454 | handleEvent :: (Event -> P2P Event) -> P2P () |
450 | handleEvent action = awaitEvent >>= action >>= yieldEvent | 455 | handleEvent action = awaitEvent >>= action >>= yieldEvent |
451 | 456 | ||
452 | --flushBroadcast :: P2P () | ||
453 | --flushBroadcast = nextBroadcast >>= maybe (return ()) go | ||
454 | -- where | ||
455 | -- go = undefined | ||
456 | |||
457 | checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool | 457 | checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool |
458 | checkPiece = undefined | 458 | checkPiece = undefined |
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index f163cadb..38388b9a 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -15,6 +15,7 @@ | |||
15 | -- | 15 | -- |
16 | {-# LANGUAGE OverloadedStrings #-} | 16 | {-# LANGUAGE OverloadedStrings #-} |
17 | {-# LANGUAGE RecordWildCards #-} | 17 | {-# LANGUAGE RecordWildCards #-} |
18 | {-# LANGUAGE ViewPatterns #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | 19 | {-# LANGUAGE TemplateHaskell #-} |
19 | {-# LANGUAGE DeriveDataTypeable #-} | 20 | {-# LANGUAGE DeriveDataTypeable #-} |
20 | module Network.BitTorrent.Internal | 21 | module Network.BitTorrent.Internal |
@@ -47,6 +48,8 @@ module Network.BitTorrent.Internal | |||
47 | , leaveSwarm | 48 | , leaveSwarm |
48 | , waitVacancy | 49 | , waitVacancy |
49 | 50 | ||
51 | , available | ||
52 | |||
50 | -- * Peer | 53 | -- * Peer |
51 | , PeerSession( PeerSession, connectedPeerAddr | 54 | , PeerSession( PeerSession, connectedPeerAddr |
52 | , swarmSession, enabledExtensions | 55 | , swarmSession, enabledExtensions |
@@ -54,6 +57,7 @@ module Network.BitTorrent.Internal | |||
54 | ) | 57 | ) |
55 | , SessionState | 58 | , SessionState |
56 | , withPeerSession | 59 | , withPeerSession |
60 | , getPending | ||
57 | 61 | ||
58 | -- ** Exceptions | 62 | -- ** Exceptions |
59 | , SessionException(..) | 63 | , SessionException(..) |
@@ -68,6 +72,8 @@ module Network.BitTorrent.Internal | |||
68 | , updateIncoming, updateOutcoming | 72 | , updateIncoming, updateOutcoming |
69 | ) where | 73 | ) where |
70 | 74 | ||
75 | import Prelude hiding (mapM_) | ||
76 | |||
71 | import Control.Applicative | 77 | import Control.Applicative |
72 | import Control.Concurrent | 78 | import Control.Concurrent |
73 | import Control.Concurrent.STM | 79 | import Control.Concurrent.STM |
@@ -79,6 +85,7 @@ import Control.Monad.Trans | |||
79 | import Data.IORef | 85 | import Data.IORef |
80 | import Data.Default | 86 | import Data.Default |
81 | import Data.Function | 87 | import Data.Function |
88 | import Data.Foldable (mapM_) | ||
82 | import Data.Ord | 89 | import Data.Ord |
83 | import Data.Set as S | 90 | import Data.Set as S |
84 | import Data.Typeable | 91 | import Data.Typeable |
@@ -104,7 +111,7 @@ import Network.BitTorrent.Tracker.Protocol as BT | |||
104 | -----------------------------------------------------------------------} | 111 | -----------------------------------------------------------------------} |
105 | 112 | ||
106 | -- | 'Progress' contains upload/download/left stats about | 113 | -- | 'Progress' contains upload/download/left stats about |
107 | -- current client state and used to notify the tracker | 114 | -- current client state and used to notify the tracker. |
108 | -- | 115 | -- |
109 | -- This data is considered as dynamic within one client | 116 | -- This data is considered as dynamic within one client |
110 | -- session. This data also should be shared across client | 117 | -- session. This data also should be shared across client |
@@ -112,12 +119,14 @@ import Network.BitTorrent.Tracker.Protocol as BT | |||
112 | -- to get initial 'Progress'. | 119 | -- to get initial 'Progress'. |
113 | -- | 120 | -- |
114 | data Progress = Progress { | 121 | data Progress = Progress { |
115 | prUploaded :: !Integer -- ^ Total amount of bytes uploaded. | 122 | _uploaded :: !Integer -- ^ Total amount of bytes uploaded. |
116 | , prDownloaded :: !Integer -- ^ Total amount of bytes downloaded. | 123 | , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. |
117 | , prLeft :: !Integer -- ^ Total amount of bytes left. | 124 | , _left :: !Integer -- ^ Total amount of bytes left. |
118 | } deriving (Show, Read, Eq) | 125 | } deriving (Show, Read, Eq) |
119 | 126 | ||
120 | -- TODO make lenses | 127 | -- TODO use atomic bits and Word64 |
128 | |||
129 | $(makeLenses ''Progress) | ||
121 | 130 | ||
122 | -- | Initial progress is used when there are no session before. | 131 | -- | Initial progress is used when there are no session before. |
123 | -- | 132 | -- |
@@ -128,6 +137,29 @@ data Progress = Progress { | |||
128 | startProgress :: Integer -> Progress | 137 | startProgress :: Integer -> Progress |
129 | startProgress = Progress 0 0 | 138 | startProgress = Progress 0 0 |
130 | 139 | ||
140 | -- | Used when the client download some data from /any/ peer. | ||
141 | downloadedProgress :: Int -> Progress -> Progress | ||
142 | downloadedProgress (fromIntegral -> amount) | ||
143 | = (left -~ amount) | ||
144 | . (downloaded +~ amount) | ||
145 | {-# INLINE downloadedProgress #-} | ||
146 | |||
147 | -- | Used when the client upload some data to /any/ peer. | ||
148 | uploadedProgress :: Int -> Progress -> Progress | ||
149 | uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
150 | {-# INLINE uploadedProgress #-} | ||
151 | |||
152 | -- | Used when leecher join client session. | ||
153 | enqueuedProgress :: Int -> Progress -> Progress | ||
154 | enqueuedProgress (fromIntegral -> amount) = left +~ amount | ||
155 | {-# INLINE enqueuedProgress #-} | ||
156 | |||
157 | -- | Used when leecher leave client session. | ||
158 | -- (e.g. user deletes not completed torrent) | ||
159 | dequeuedProgress :: Int -> Progress -> Progress | ||
160 | dequeuedProgress (fromIntegral -> amount) = left -~ amount | ||
161 | {-# INLINE dequeuedProgress #-} | ||
162 | |||
131 | {----------------------------------------------------------------------- | 163 | {----------------------------------------------------------------------- |
132 | Client session | 164 | Client session |
133 | -----------------------------------------------------------------------} | 165 | -----------------------------------------------------------------------} |
@@ -193,6 +225,9 @@ data ClientSession = ClientSession { | |||
193 | , currentProgress :: !(TVar Progress) | 225 | , currentProgress :: !(TVar Progress) |
194 | } | 226 | } |
195 | 227 | ||
228 | -- currentProgress field is reduntant: progress depends on the all swarm bitfields | ||
229 | -- maybe we can remove the 'currentProgress' and compute it on demand? | ||
230 | |||
196 | instance Eq ClientSession where | 231 | instance Eq ClientSession where |
197 | (==) = (==) `on` clientPeerID | 232 | (==) = (==) `on` clientPeerID |
198 | 233 | ||
@@ -274,7 +309,15 @@ data SwarmSession = SwarmSession { | |||
274 | 309 | ||
275 | -- | Modify this carefully updating global progress. | 310 | -- | Modify this carefully updating global progress. |
276 | , clientBitfield :: !(TVar Bitfield) | 311 | , clientBitfield :: !(TVar Bitfield) |
312 | |||
277 | , connectedPeers :: !(TVar (Set PeerSession)) | 313 | , connectedPeers :: !(TVar (Set PeerSession)) |
314 | |||
315 | -- TODO use bounded broadcast chan with priority queue and drop old entries | ||
316 | -- | Channel used for replicate messages across all peers in | ||
317 | -- swarm. For exsample if we get some piece we should sent to all | ||
318 | -- connected (and interested in) peers HAVE message. | ||
319 | -- | ||
320 | , broadcastMessages :: !(TChan Message) | ||
278 | } | 321 | } |
279 | 322 | ||
280 | -- INVARIANT: | 323 | -- INVARIANT: |
@@ -294,6 +337,7 @@ newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | |||
294 | <*> MSem.new n | 337 | <*> MSem.new n |
295 | <*> newTVarIO bf | 338 | <*> newTVarIO bf |
296 | <*> newTVarIO S.empty | 339 | <*> newTVarIO S.empty |
340 | <*> newBroadcastTChanIO | ||
297 | 341 | ||
298 | -- | New swarm session in which the client allowed to upload only. | 342 | -- | New swarm session in which the client allowed to upload only. |
299 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | 343 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession |
@@ -342,6 +386,9 @@ waitVacancy se = | |||
342 | bracket (enterSwarm se) (const (leaveSwarm se)) | 386 | bracket (enterSwarm se) (const (leaveSwarm se)) |
343 | . const | 387 | . const |
344 | 388 | ||
389 | pieceLength :: SwarmSession -> Int | ||
390 | pieceLength = ciPieceLength . tInfo . torrentMeta | ||
391 | |||
345 | {----------------------------------------------------------------------- | 392 | {----------------------------------------------------------------------- |
346 | Peer session | 393 | Peer session |
347 | -----------------------------------------------------------------------} | 394 | -----------------------------------------------------------------------} |
@@ -380,17 +427,15 @@ data PeerSession = PeerSession { | |||
380 | , outcomingTimeout :: !TimeoutKey | 427 | , outcomingTimeout :: !TimeoutKey |
381 | 428 | ||
382 | -- TODO use dupChan for broadcasting | 429 | -- TODO use dupChan for broadcasting |
383 | 430 | -- | Broadcast messages waiting to be sent to peer. | |
384 | -- | Channel used for replicate messages across all peers in | 431 | , pendingMessages :: !(TChan Message) |
385 | -- swarm. For exsample if we get some piece we should sent to all | ||
386 | -- connected (and interested in) peers HAVE message. | ||
387 | -- | ||
388 | , broadcastMessages :: !(Chan [Message]) | ||
389 | 432 | ||
390 | -- | Dymanic P2P data. | 433 | -- | Dymanic P2P data. |
391 | , sessionState :: !(IORef SessionState) | 434 | , sessionState :: !(IORef SessionState) |
392 | } | 435 | } |
393 | 436 | ||
437 | -- TODO unpack some fields | ||
438 | |||
394 | data SessionState = SessionState { | 439 | data SessionState = SessionState { |
395 | _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | 440 | _bitfield :: !Bitfield -- ^ Other peer Have bitfield. |
396 | , _status :: !SessionStatus -- ^ Status of both peers. | 441 | , _status :: !SessionStatus -- ^ Status of both peers. |
@@ -451,7 +496,7 @@ withPeerSession ss @ SwarmSession {..} addr | |||
451 | maxIncomingTime (return ()) | 496 | maxIncomingTime (return ()) |
452 | <*> registerTimeout (eventManager clientSession) | 497 | <*> registerTimeout (eventManager clientSession) |
453 | maxOutcomingTime (sendKA sock) | 498 | maxOutcomingTime (sendKA sock) |
454 | <*> newChan | 499 | <*> atomically (dupTChan broadcastMessages) |
455 | <*> do { | 500 | <*> do { |
456 | ; tc <- totalCount <$> readTVarIO clientBitfield | 501 | ; tc <- totalCount <$> readTVarIO clientBitfield |
457 | ; newIORef (SessionState (haveNone tc) def) | 502 | ; newIORef (SessionState (haveNone tc) def) |
@@ -468,10 +513,55 @@ withPeerSession ss @ SwarmSession {..} addr | |||
468 | findPieceCount :: PeerSession -> PieceCount | 513 | findPieceCount :: PeerSession -> PieceCount |
469 | findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | 514 | findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession |
470 | 515 | ||
471 | -- TODO use this type for broadcast messages instead of 'Message' | 516 | {----------------------------------------------------------------------- |
472 | --data Signal = | 517 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece |
473 | --nextBroadcast :: P2P (Maybe Signal) | 518 | -----------------------------------------------------------------------} |
474 | --nextBroadcast = | 519 | |
520 | -- here we should enqueue broadcast messages and keep in mind that: | ||
521 | -- | ||
522 | -- * We should enqueue broadcast events as they are appear. | ||
523 | -- * We should yield broadcast messages as fast as we get them. | ||
524 | -- | ||
525 | -- these 2 phases might differ in time significantly | ||
526 | |||
527 | -- TODO do this; but only when it'll be clean which other broadcast | ||
528 | -- messages & events we should send | ||
529 | |||
530 | -- 1. Update client have bitfield --\____ in one transaction; | ||
531 | -- 2. Update downloaded stats --/ | ||
532 | -- 3. Signal to the all other peer about this. | ||
533 | |||
534 | available :: Bitfield -> SwarmSession -> IO () | ||
535 | available bf se @ SwarmSession {..} = mark >> atomically broadcast | ||
536 | where | ||
537 | mark = do | ||
538 | let bytes = pieceLength se * BF.haveCount bf | ||
539 | atomically $ do | ||
540 | modifyTVar' clientBitfield (BF.union bf) | ||
541 | modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
542 | |||
543 | broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
544 | |||
545 | |||
546 | -- TODO compute size of messages: if it's faster to send Bitfield | ||
547 | -- instead many Have do that | ||
548 | -- | ||
549 | -- also if there is single Have message in queue then the | ||
550 | -- corresponding piece is likely still in memory or disc cache, | ||
551 | -- when we can send SuggestPiece | ||
552 | |||
553 | -- | Get pending messages queue appeared in result of asynchronously | ||
554 | -- changed client state. Resulting queue should be sent to a peer | ||
555 | -- immediately. | ||
556 | getPending :: PeerSession -> IO [Message] | ||
557 | getPending PeerSession {..} = atomically (readAvail pendingMessages) | ||
558 | |||
559 | readAvail :: TChan a -> STM [a] | ||
560 | readAvail chan = do | ||
561 | m <- tryReadTChan chan | ||
562 | case m of | ||
563 | Just a -> (:) <$> pure a <*> readAvail chan | ||
564 | Nothing -> return [] | ||
475 | 565 | ||
476 | {----------------------------------------------------------------------- | 566 | {----------------------------------------------------------------------- |
477 | Timeouts | 567 | Timeouts |
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index ea45b75d..bfe4182d 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs | |||
@@ -90,9 +90,9 @@ genericReq ses pr = TRequest { | |||
90 | , reqPeerID = tconnPeerID ses | 90 | , reqPeerID = tconnPeerID ses |
91 | , reqPort = tconnPort ses | 91 | , reqPort = tconnPort ses |
92 | 92 | ||
93 | , reqUploaded = prUploaded pr | 93 | , reqUploaded = _uploaded pr |
94 | , reqDownloaded = prDownloaded pr | 94 | , reqDownloaded = _downloaded pr |
95 | , reqLeft = prLeft pr | 95 | , reqLeft = _left pr |
96 | 96 | ||
97 | , reqIP = Nothing | 97 | , reqIP = Nothing |
98 | , reqNumWant = Nothing | 98 | , reqNumWant = Nothing |
diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs index f73f1a55..a5529fe6 100644 --- a/src/System/Torrent/Storage.hs +++ b/src/System/Torrent/Storage.hs | |||
@@ -29,6 +29,7 @@ module System.Torrent.Storage | |||
29 | ) where | 29 | ) where |
30 | 30 | ||
31 | import Control.Applicative | 31 | import Control.Applicative |
32 | import Control.Concurrent.STM | ||
32 | import Control.Exception | 33 | import Control.Exception |
33 | import Control.Monad | 34 | import Control.Monad |
34 | import Data.ByteString as B | 35 | import Data.ByteString as B |
@@ -37,6 +38,7 @@ import Data.List as L | |||
37 | import System.FilePath | 38 | import System.FilePath |
38 | import System.Directory | 39 | import System.Directory |
39 | 40 | ||
41 | import Data.Bitfield | ||
40 | import Data.Torrent | 42 | import Data.Torrent |
41 | import Network.BitTorrent.Exchange.Protocol | 43 | import Network.BitTorrent.Exchange.Protocol |
42 | import Network.BitTorrent.Internal | 44 | import Network.BitTorrent.Internal |
@@ -47,19 +49,28 @@ data Storage = Storage { | |||
47 | -- | | 49 | -- | |
48 | session :: !SwarmSession | 50 | session :: !SwarmSession |
49 | 51 | ||
50 | -- | Used to map linear block addresses to disjoint mallocated/mmaped adresses. | 52 | -- | |
53 | , blocks :: !(TVar Bitfield) | ||
54 | |||
55 | -- | Used to map linear block addresses to disjoint | ||
56 | -- mallocated/mmaped adresses. | ||
51 | , payload :: !Fixed | 57 | , payload :: !Fixed |
52 | } | 58 | } |
53 | 59 | ||
54 | pieceSize :: Storage -> Int | 60 | pieceSize :: Storage -> Int |
55 | pieceSize = ciPieceLength . tInfo . torrentMeta . session | 61 | pieceSize = ciPieceLength . tInfo . torrentMeta . session |
56 | 62 | ||
63 | {----------------------------------------------------------------------- | ||
64 | Construction | ||
65 | -----------------------------------------------------------------------} | ||
66 | |||
57 | -- TODO doc args | 67 | -- TODO doc args |
58 | bindTo :: SwarmSession -> FilePath -> IO Storage | 68 | bindTo :: SwarmSession -> FilePath -> IO Storage |
59 | bindTo se @ SwarmSession {..} contentPath = do | 69 | bindTo se @ SwarmSession {..} contentPath = do |
60 | let content_paths = contentLayout contentPath (tInfo torrentMeta) | 70 | let content_paths = contentLayout contentPath (tInfo torrentMeta) |
61 | mapM_ mkDir (L.map fst content_paths) | 71 | mapM_ mkDir (L.map fst content_paths) |
62 | Storage se <$> coalesceFiles content_paths | 72 | Storage se <$> newTVarIO (haveNone (ciPieceLength (tInfo torrentMeta))) |
73 | <*> coalesceFiles content_paths | ||
63 | where | 74 | where |
64 | mkDir path = do | 75 | mkDir path = do |
65 | let dirPath = fst (splitFileName path) | 76 | let dirPath = fst (splitFileName path) |
@@ -74,17 +85,44 @@ unbind st = error "unmapStorage" | |||
74 | withStorage :: SwarmSession -> FilePath -> (Storage -> IO a) -> IO a | 85 | withStorage :: SwarmSession -> FilePath -> (Storage -> IO a) -> IO a |
75 | withStorage se path = bracket (se `bindTo` path) unbind | 86 | withStorage se path = bracket (se `bindTo` path) unbind |
76 | 87 | ||
88 | {----------------------------------------------------------------------- | ||
89 | Modification | ||
90 | -----------------------------------------------------------------------} | ||
91 | |||
92 | -- TODO to avoid races we might need to try Control.Concurrent.yield | ||
93 | -- TODO lazy block payload | ||
77 | 94 | ||
95 | -- | Write a block to the storage. If block out of range then block is clipped. | ||
78 | putBlk :: Block -> Storage -> IO () | 96 | putBlk :: Block -> Storage -> IO () |
79 | putBlk blk @ Block {..} st @ Storage {..} = do | 97 | putBlk blk @ Block {..} st @ Storage {..} = do |
80 | writeBytes (blkInterval (pieceSize st) blk) (Lazy.fromChunks [blkData]) payload | 98 | -- let blkIx = undefined |
81 | 99 | -- bm <- readTVarIO blocks | |
82 | -- TODO | 100 | -- unless (member blkIx bm) $ do |
101 | writeBytes (blkInterval (pieceSize st) blk) | ||
102 | (Lazy.fromChunks [blkData]) | ||
103 | payload | ||
104 | -- when (undefined bm blkIx) $ do | ||
105 | -- if checkPiece ci piIx piece | ||
106 | -- then return True | ||
107 | -- else do | ||
108 | -- reset | ||
109 | -- return False | ||
110 | |||
111 | -- | Read a block by given block index. If lower or upper bound out of | ||
112 | -- range then index is clipped. | ||
83 | getBlk :: BlockIx -> Storage -> IO Block | 113 | getBlk :: BlockIx -> Storage -> IO Block |
84 | getBlk ix @ BlockIx {..} st @ Storage {..} = do | 114 | getBlk ix @ BlockIx {..} st @ Storage {..} = do |
85 | bs <- readBytes (ixInterval (pieceSize st) ix) payload | 115 | bs <- readBytes (ixInterval (pieceSize st) ix) payload |
86 | return $ Block ixPiece ixOffset (Lazy.toStrict bs) | 116 | return $ Block ixPiece ixOffset (Lazy.toStrict bs) |
87 | 117 | ||
118 | -- | Should be used to verify piece. | ||
119 | getPiece :: PieceIx -> Storage -> IO ByteString | ||
120 | getPiece ix st = blkData <$> getBlk (BlockIx ix 0 (pieceSize st)) st | ||
121 | |||
122 | {----------------------------------------------------------------------- | ||
123 | Internal | ||
124 | -----------------------------------------------------------------------} | ||
125 | |||
88 | ixInterval :: Int -> BlockIx -> FixedInterval | 126 | ixInterval :: Int -> BlockIx -> FixedInterval |
89 | ixInterval pieceSize BlockIx {..} = | 127 | ixInterval pieceSize BlockIx {..} = |
90 | interval (ixPiece * pieceSize + ixOffset) ixLength | 128 | interval (ixPiece * pieceSize + ixOffset) ixLength |