diff options
Diffstat (limited to 'src/System/Torrent')
-rw-r--r-- | src/System/Torrent/Storage.hs | 135 |
1 files changed, 111 insertions, 24 deletions
diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs index a5529fe6..cb0494e8 100644 --- a/src/System/Torrent/Storage.hs +++ b/src/System/Torrent/Storage.hs | |||
@@ -20,29 +20,33 @@ | |||
20 | {-# LANGUAGE RecordWildCards #-} | 20 | {-# LANGUAGE RecordWildCards #-} |
21 | module System.Torrent.Storage | 21 | module System.Torrent.Storage |
22 | ( Storage | 22 | ( Storage |
23 | , ppStorage | ||
23 | 24 | ||
24 | -- * Construction | 25 | -- * Construction |
25 | , bindTo, unbind, withStorage | 26 | , bindTo, unbind, withStorage |
26 | 27 | ||
27 | -- * Modification | 28 | -- * Modification |
28 | , getBlk, putBlk | 29 | , getBlk, putBlk, selBlk |
29 | ) where | 30 | ) where |
30 | 31 | ||
31 | import Control.Applicative | 32 | import Control.Applicative |
32 | import Control.Concurrent.STM | 33 | import Control.Concurrent.STM |
33 | import Control.Exception | 34 | import Control.Exception |
34 | import Control.Monad | 35 | import Control.Monad |
36 | import Control.Monad.Trans | ||
37 | |||
35 | import Data.ByteString as B | 38 | import Data.ByteString as B |
36 | import qualified Data.ByteString.Lazy as Lazy | 39 | import qualified Data.ByteString.Lazy as Lazy |
37 | import Data.List as L | 40 | import Data.List as L |
41 | import Text.PrettyPrint | ||
38 | import System.FilePath | 42 | import System.FilePath |
39 | import System.Directory | 43 | import System.Directory |
40 | 44 | ||
41 | import Data.Bitfield | 45 | import Data.Bitfield as BF |
42 | import Data.Torrent | 46 | import Data.Torrent |
43 | import Network.BitTorrent.Exchange.Protocol | 47 | import Network.BitTorrent.Exchange.Protocol |
44 | import Network.BitTorrent.Internal | 48 | import Network.BitTorrent.Internal |
45 | import System.IO.MMap.Fixed | 49 | import System.IO.MMap.Fixed as Fixed |
46 | 50 | ||
47 | 51 | ||
48 | data Storage = Storage { | 52 | data Storage = Storage { |
@@ -51,14 +55,21 @@ data Storage = Storage { | |||
51 | 55 | ||
52 | -- | | 56 | -- | |
53 | , blocks :: !(TVar Bitfield) | 57 | , blocks :: !(TVar Bitfield) |
58 | -- TODO use bytestring for fast serialization | ||
59 | -- because we need to write this bitmap to disc periodically | ||
60 | |||
61 | |||
62 | , blockSize :: !Int | ||
54 | 63 | ||
55 | -- | Used to map linear block addresses to disjoint | 64 | -- | Used to map linear block addresses to disjoint |
56 | -- mallocated/mmaped adresses. | 65 | -- mallocated/mmaped adresses. |
57 | , payload :: !Fixed | 66 | , payload :: !Fixed |
58 | } | 67 | } |
59 | 68 | ||
60 | pieceSize :: Storage -> Int | 69 | ppStorage :: Storage -> IO Doc |
61 | pieceSize = ciPieceLength . tInfo . torrentMeta . session | 70 | ppStorage Storage {..} = pp <$> readTVarIO blocks |
71 | where | ||
72 | pp bf = int blockSize | ||
62 | 73 | ||
63 | {----------------------------------------------------------------------- | 74 | {----------------------------------------------------------------------- |
64 | Construction | 75 | Construction |
@@ -67,9 +78,15 @@ pieceSize = ciPieceLength . tInfo . torrentMeta . session | |||
67 | -- TODO doc args | 78 | -- TODO doc args |
68 | bindTo :: SwarmSession -> FilePath -> IO Storage | 79 | bindTo :: SwarmSession -> FilePath -> IO Storage |
69 | bindTo se @ SwarmSession {..} contentPath = do | 80 | bindTo se @ SwarmSession {..} contentPath = do |
70 | let content_paths = contentLayout contentPath (tInfo torrentMeta) | 81 | let contentInfo = tInfo torrentMeta |
82 | let content_paths = contentLayout contentPath contentInfo | ||
71 | mapM_ mkDir (L.map fst content_paths) | 83 | mapM_ mkDir (L.map fst content_paths) |
72 | Storage se <$> newTVarIO (haveNone (ciPieceLength (tInfo torrentMeta))) | 84 | |
85 | let pieceLen = pieceLength se | ||
86 | let blockSize = min defaultBlockSize pieceLen | ||
87 | print $ "content length " ++ show (contentLength contentInfo) | ||
88 | Storage se <$> newTVarIO (haveNone (blockCount blockSize contentInfo)) | ||
89 | <*> pure blockSize | ||
73 | <*> coalesceFiles content_paths | 90 | <*> coalesceFiles content_paths |
74 | where | 91 | where |
75 | mkDir path = do | 92 | mkDir path = do |
@@ -90,43 +107,113 @@ withStorage se path = bracket (se `bindTo` path) unbind | |||
90 | -----------------------------------------------------------------------} | 107 | -----------------------------------------------------------------------} |
91 | 108 | ||
92 | -- TODO to avoid races we might need to try Control.Concurrent.yield | 109 | -- TODO to avoid races we might need to try Control.Concurrent.yield |
93 | -- TODO lazy block payload | 110 | -- TODO make block_payload :: Lazy.ByteString |
111 | |||
112 | selBlk :: MonadIO m => PieceIx -> Storage -> m [BlockIx] | ||
113 | selBlk pix st @ Storage {..} = liftIO $ atomically $ do | ||
114 | mask <- pieceMask pix st | ||
115 | select mask <$> readTVar blocks | ||
116 | where | ||
117 | select mask = fmap mkBix . toList . difference mask | ||
118 | -- TODO clip upper bound of block index | ||
119 | mkBix ix = BlockIx pix (blockSize * (ix - offset)) blockSize | ||
120 | |||
121 | offset = coeff * pix | ||
122 | coeff = pieceLength session `div` blockSize | ||
123 | |||
124 | -- | ||
125 | -- TODO make global lock map -- otherwise we might get broken pieces | ||
126 | -- | ||
127 | -- imagine the following situation: | ||
128 | -- | ||
129 | -- thread1: write | ||
130 | -- thread1: mark | ||
131 | -- | ||
132 | -- this let us avoid races as well | ||
133 | -- | ||
94 | 134 | ||
95 | -- | Write a block to the storage. If block out of range then block is clipped. | 135 | -- | Write a block to the storage. If block out of range then block is clipped. |
96 | putBlk :: Block -> Storage -> IO () | 136 | -- |
97 | putBlk blk @ Block {..} st @ Storage {..} = do | 137 | -- |
138 | -- | ||
139 | putBlk :: MonadIO m => Block -> Storage -> m Bool | ||
140 | putBlk blk @ Block {..} st @ Storage {..} = liftIO $ do | ||
98 | -- let blkIx = undefined | 141 | -- let blkIx = undefined |
99 | -- bm <- readTVarIO blocks | 142 | -- bm <- readTVarIO blocks |
100 | -- unless (member blkIx bm) $ do | 143 | -- unless (member blkIx bm) $ do |
101 | writeBytes (blkInterval (pieceSize st) blk) | 144 | writeBytes (blkInterval (pieceLength session) blk) |
102 | (Lazy.fromChunks [blkData]) | 145 | (Lazy.fromChunks [blkData]) |
103 | payload | 146 | payload |
104 | -- when (undefined bm blkIx) $ do | 147 | |
105 | -- if checkPiece ci piIx piece | 148 | markBlock blk st |
106 | -- then return True | 149 | validatePiece blkPiece st |
107 | -- else do | 150 | |
108 | -- reset | 151 | markBlock :: Block -> Storage -> IO () |
109 | -- return False | 152 | markBlock Block {..} Storage {..} = do |
153 | let piLen = pieceLength session | ||
154 | let glIx = (piLen `div` blockSize) * blkPiece + (blkOffset `div` blockSize) | ||
155 | atomically $ modifyTVar' blocks (have glIx) | ||
110 | 156 | ||
111 | -- | Read a block by given block index. If lower or upper bound out of | 157 | -- | Read a block by given block index. If lower or upper bound out of |
112 | -- range then index is clipped. | 158 | -- range then index is clipped. |
113 | getBlk :: BlockIx -> Storage -> IO Block | 159 | -- |
114 | getBlk ix @ BlockIx {..} st @ Storage {..} = do | 160 | -- Do not block. |
115 | bs <- readBytes (ixInterval (pieceSize st) ix) payload | 161 | -- |
162 | getBlk :: MonadIO m => BlockIx -> Storage -> m Block | ||
163 | getBlk ix @ BlockIx {..} st @ Storage {..} = liftIO $ do | ||
164 | -- TODO check if __piece__ is available | ||
165 | bs <- readBytes (ixInterval (pieceLength session) ix) payload | ||
116 | return $ Block ixPiece ixOffset (Lazy.toStrict bs) | 166 | return $ Block ixPiece ixOffset (Lazy.toStrict bs) |
117 | 167 | ||
118 | -- | Should be used to verify piece. | ||
119 | getPiece :: PieceIx -> Storage -> IO ByteString | 168 | getPiece :: PieceIx -> Storage -> IO ByteString |
120 | getPiece ix st = blkData <$> getBlk (BlockIx ix 0 (pieceSize st)) st | 169 | getPiece pix st @ Storage {..} = do |
170 | let pieceLen = pieceLength session | ||
171 | let bix = BlockIx pix 0 (pieceLength session) | ||
172 | bs <- readBytes (ixInterval pieceLen bix) payload | ||
173 | return (Lazy.toStrict bs) | ||
174 | |||
175 | resetPiece :: PieceIx -> Storage -> IO () | ||
176 | resetPiece pix st @ Storage {..} = atomically $ do | ||
177 | mask <- pieceMask pix st | ||
178 | modifyTVar' blocks (`difference` mask) | ||
179 | |||
180 | validatePiece :: PieceIx -> Storage -> IO Bool | ||
181 | validatePiece pix st @ Storage {..} = do | ||
182 | downloaded <- atomically $ isDownloaded pix st | ||
183 | if not downloaded then return False | ||
184 | else do | ||
185 | print $ show pix ++ " downloaded" | ||
186 | piece <- getPiece pix st | ||
187 | if checkPiece (tInfo (torrentMeta session)) pix piece | ||
188 | then return True | ||
189 | else do | ||
190 | print $ "----------------------------- invalid " ++ show pix | ||
191 | -- resetPiece pix st | ||
192 | return True | ||
121 | 193 | ||
122 | {----------------------------------------------------------------------- | 194 | {----------------------------------------------------------------------- |
123 | Internal | 195 | Internal |
124 | -----------------------------------------------------------------------} | 196 | -----------------------------------------------------------------------} |
125 | 197 | ||
198 | isDownloaded :: PieceIx -> Storage -> STM Bool | ||
199 | isDownloaded pix st @ Storage {..} = do | ||
200 | bf <- readTVar blocks | ||
201 | mask <- pieceMask pix st | ||
202 | return $ intersection mask bf == mask | ||
203 | |||
204 | pieceMask :: PieceIx -> Storage -> STM Bitfield | ||
205 | pieceMask pix Storage {..} = do | ||
206 | bf <- readTVar blocks | ||
207 | return $ BF.interval (totalCount bf) offset (offset + coeff - 1) | ||
208 | where | ||
209 | offset = coeff * pix | ||
210 | coeff = pieceLength session `div` blockSize | ||
211 | |||
212 | |||
126 | ixInterval :: Int -> BlockIx -> FixedInterval | 213 | ixInterval :: Int -> BlockIx -> FixedInterval |
127 | ixInterval pieceSize BlockIx {..} = | 214 | ixInterval pieceSize BlockIx {..} = |
128 | interval (ixPiece * pieceSize + ixOffset) ixLength | 215 | Fixed.interval (ixPiece * pieceSize + ixOffset) ixLength |
129 | 216 | ||
130 | blkInterval :: Int -> Block -> FixedInterval | 217 | blkInterval :: Int -> Block -> FixedInterval |
131 | blkInterval pieceSize Block {..} = | 218 | blkInterval pieceSize Block {..} = |
132 | interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file | 219 | Fixed.interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file |