diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2013-10-14 05:11:46 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2013-10-14 05:11:46 +0400 |
commit | b5f222ba7dfa1fa53b8b53f4e1b770193bb55fe4 (patch) | |
tree | f24b2f677d78a05139b9e190a60ea75bc64f49b4 /src/System/Torrent | |
parent | 9737a06bff6c6539a6afd67f7970a6923b401d86 (diff) |
Move some modules from torrent-content
Diffstat (limited to 'src/System/Torrent')
-rw-r--r-- | src/System/Torrent/Storage.hs | 336 |
1 files changed, 336 insertions, 0 deletions
diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs new file mode 100644 index 00000000..16f888bf --- /dev/null +++ b/src/System/Torrent/Storage.hs | |||
@@ -0,0 +1,336 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam T. 2013 | ||
3 | -- License : MIT | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : non-portable | ||
7 | -- | ||
8 | -- This module implements mapping from single continious block space | ||
9 | -- to file storage. Storage can be used in two modes: | ||
10 | -- | ||
11 | -- * As in memory storage - in this case we don't touch filesystem. | ||
12 | -- | ||
13 | -- * As ordinary mmaped file storage - when we need to store | ||
14 | -- data in the filesystem. | ||
15 | -- | ||
16 | {-# LANGUAGE DoAndIfThenElse #-} | ||
17 | {-# LANGUAGE NamedFieldPuns #-} | ||
18 | {-# LANGUAGE RecordWildCards #-} | ||
19 | module System.Torrent.Storage | ||
20 | ( Storage (metainfo) | ||
21 | , ppStorage | ||
22 | |||
23 | -- * Construction | ||
24 | , openStorage, closeStorage, withStorage | ||
25 | , getCompleteBitfield | ||
26 | |||
27 | -- * Modification | ||
28 | , getBlk, putBlk, selBlk | ||
29 | , getPiece, validatePiece | ||
30 | |||
31 | -- * TODO expose only File interface! | ||
32 | -- * File interface | ||
33 | , FD | ||
34 | , openFD, flushFD, closeFD | ||
35 | , readFD, writeFD | ||
36 | ) where | ||
37 | |||
38 | import Control.Applicative | ||
39 | import Control.Concurrent.STM | ||
40 | import Control.Exception | ||
41 | import Control.Monad | ||
42 | import Control.Monad.Trans | ||
43 | |||
44 | import Data.ByteString as B | ||
45 | import qualified Data.ByteString.Lazy as Lazy | ||
46 | import Text.PrettyPrint | ||
47 | import System.FilePath | ||
48 | import System.Directory | ||
49 | import Foreign.C.Error | ||
50 | |||
51 | import Data.Torrent.Bitfield as BF | ||
52 | import Data.Torrent.Block | ||
53 | import Data.Torrent.Metainfo | ||
54 | import System.IO.MMap.Fixed as Fixed | ||
55 | |||
56 | -- TODO merge piece validation and Sessions.available into one transaction. | ||
57 | data Storage = Storage { | ||
58 | -- | | ||
59 | metainfo :: !Torrent | ||
60 | |||
61 | -- | Bitmask of complete and verified _pieces_. | ||
62 | , complete :: !(TVar Bitfield) | ||
63 | |||
64 | -- | Bitmask of complete _blocks_. | ||
65 | , blocks :: !(TVar Bitfield) | ||
66 | -- TODO use bytestring for fast serialization | ||
67 | -- because we need to write this bitmap to disc periodically | ||
68 | |||
69 | , blockSize :: !Int | ||
70 | |||
71 | -- | Used to map linear block addresses to disjoint | ||
72 | -- mallocated/mmaped adresses. | ||
73 | , payload :: !Fixed | ||
74 | } | ||
75 | |||
76 | ppStorage :: Storage -> IO Doc | ||
77 | ppStorage Storage {..} = pp <$> readTVarIO blocks | ||
78 | where | ||
79 | pp bf = int blockSize | ||
80 | |||
81 | getCompleteBitfield :: Storage -> STM Bitfield | ||
82 | getCompleteBitfield Storage {..} = readTVar complete | ||
83 | |||
84 | {----------------------------------------------------------------------- | ||
85 | Construction | ||
86 | -----------------------------------------------------------------------} | ||
87 | |||
88 | -- TODO doc args | ||
89 | openStorage :: Torrent -> FilePath -> Bitfield -> IO Storage | ||
90 | openStorage t @ Torrent {..} contentPath bf = do | ||
91 | let content_paths = contentLayout contentPath tInfo | ||
92 | mapM_ (mkDir . fst) content_paths | ||
93 | |||
94 | let blockSize = defaultBlockSize `min` ciPieceLength tInfo | ||
95 | Storage t <$> newTVarIO bf | ||
96 | <*> newTVarIO (haveNone (blockCount blockSize tInfo)) | ||
97 | <*> pure blockSize | ||
98 | <*> coalesceFiles content_paths | ||
99 | where | ||
100 | mkDir path = do | ||
101 | let dirPath = fst (splitFileName path) | ||
102 | exist <- doesDirectoryExist dirPath | ||
103 | unless exist $ do | ||
104 | createDirectoryIfMissing True dirPath | ||
105 | |||
106 | -- TODO | ||
107 | closeStorage :: Storage -> IO () | ||
108 | closeStorage st = return () | ||
109 | |||
110 | |||
111 | withStorage :: Torrent -> FilePath -> Bitfield -> (Storage -> IO a) -> IO a | ||
112 | withStorage se path bf = bracket (openStorage se path bf) closeStorage | ||
113 | |||
114 | {----------------------------------------------------------------------- | ||
115 | Modification | ||
116 | -----------------------------------------------------------------------} | ||
117 | |||
118 | -- TODO to avoid races we might need to try Control.Concurrent.yield | ||
119 | -- TODO make block_payload :: Lazy.ByteString | ||
120 | |||
121 | selBlk :: MonadIO m => PieceIx -> Storage -> m [BlockIx] | ||
122 | selBlk pix st @ Storage {..} | ||
123 | = liftIO $ {-# SCC selBlk #-} atomically $ do | ||
124 | mask <- pieceMask pix st | ||
125 | select mask <$> readTVar blocks | ||
126 | where | ||
127 | select mask = fmap mkBix . toList . difference mask | ||
128 | -- TODO clip upper bound of block index | ||
129 | mkBix ix = BlockIx pix (blockSize * (ix - offset)) blockSize | ||
130 | |||
131 | offset = coeff * pix | ||
132 | coeff = ciPieceLength (tInfo metainfo) `div` blockSize | ||
133 | |||
134 | -- | ||
135 | -- TODO make global lock map -- otherwise we might get broken pieces | ||
136 | -- | ||
137 | -- imagine the following situation: | ||
138 | -- | ||
139 | -- thread1: write | ||
140 | -- thread1: mark | ||
141 | -- | ||
142 | -- this let us avoid races as well | ||
143 | -- | ||
144 | |||
145 | -- | Write a block to the storage. If block out of range then block is clipped. | ||
146 | -- | ||
147 | -- | ||
148 | -- | ||
149 | putBlk :: MonadIO m => Block -> Storage -> m Bool | ||
150 | putBlk blk @ Block {..} st @ Storage {..} | ||
151 | = liftIO $ {-# SCC putBlk #-} do | ||
152 | -- let blkIx = undefined | ||
153 | -- bm <- readTVarIO blocks | ||
154 | -- unless (member blkIx bm) $ do | ||
155 | writeBytes (blkInterval (ciPieceLength (tInfo metainfo)) blk) blkData payload | ||
156 | |||
157 | markBlock blk st | ||
158 | completePiece blkPiece st | ||
159 | |||
160 | markBlock :: Block -> Storage -> IO () | ||
161 | markBlock Block {..} Storage {..} = {-# SCC markBlock #-} do | ||
162 | let piLen = ciPieceLength (tInfo metainfo) | ||
163 | let glIx = (piLen `div` blockSize) * blkPiece + (blkOffset `div` blockSize) | ||
164 | atomically $ modifyTVar' blocks (have glIx) | ||
165 | |||
166 | -- | Read a block by given block index. If lower or upper bound out of | ||
167 | -- range then index is clipped. | ||
168 | -- | ||
169 | -- Do not block. | ||
170 | -- | ||
171 | getBlk :: MonadIO m => BlockIx -> Storage -> m Block | ||
172 | getBlk ix @ BlockIx {..} st @ Storage {..} | ||
173 | = liftIO $ {-# SCC getBlk #-} do | ||
174 | -- TODO check if __piece__ is available | ||
175 | let piLen = ciPieceLength (tInfo metainfo) | ||
176 | bs <- readBytes (ixInterval piLen ix) payload | ||
177 | return $ Block ixPiece ixOffset bs | ||
178 | |||
179 | getPiece :: PieceIx -> Storage -> IO Lazy.ByteString | ||
180 | getPiece pix st @ Storage {..} = {-# SCC getPiece #-} do | ||
181 | let piLen = ciPieceLength (tInfo metainfo) | ||
182 | let bix = BlockIx pix 0 piLen | ||
183 | let bs = viewBytes (ixInterval piLen bix) payload | ||
184 | return $ bs | ||
185 | |||
186 | resetPiece :: PieceIx -> Storage -> IO () | ||
187 | resetPiece pix st @ Storage {..} | ||
188 | = {-# SCC resetPiece #-} atomically $ do | ||
189 | mask <- pieceMask pix st | ||
190 | modifyTVar' blocks (`difference` mask) | ||
191 | |||
192 | validatePiece :: Storage -> PieceIx -> IO Bool | ||
193 | validatePiece storage pix = do | ||
194 | checkPiece (tInfo (metainfo storage)) pix <$> getPiece pix storage | ||
195 | |||
196 | completePiece :: PieceIx -> Storage -> IO Bool | ||
197 | completePiece pix st @ Storage {..} = {-# SCC completePiece #-} do | ||
198 | downloaded <- atomically $ isDownloaded pix st | ||
199 | if not downloaded then return False | ||
200 | else do | ||
201 | piece <- getPiece pix st | ||
202 | if checkPiece (tInfo metainfo) pix piece | ||
203 | then do | ||
204 | atomically $ modifyTVar' complete (BF.have pix) | ||
205 | return True | ||
206 | else do | ||
207 | print $ "----------------------------- invalid " ++ show pix | ||
208 | -- resetPiece pix st | ||
209 | return True | ||
210 | |||
211 | -- | Check each piece in the storage against content info hash. | ||
212 | -- | ||
213 | -- Note that this function will block until each the entire storage | ||
214 | -- checked. This may take a long time for a big torrents use fork | ||
215 | -- if needed. | ||
216 | -- | ||
217 | validateStorage :: Storage -> IO () | ||
218 | validateStorage st = undefined -- (`validatePiece` st) [0..pieceCount st] | ||
219 | |||
220 | {----------------------------------------------------------------------- | ||
221 | POSIX-like file interface | ||
222 | ------------------------------------------------------------------------ | ||
223 | This is useful for virtual filesystem writers and just for per file | ||
224 | interface. | ||
225 | -----------------------------------------------------------------------} | ||
226 | -- TODO reference counting: storage might be closed before all FDs | ||
227 | -- gets closed! | ||
228 | -- or we can forbid to close storage and use finalizers only? | ||
229 | |||
230 | type Offset = Int | ||
231 | type Size = Int | ||
232 | |||
233 | data FD = FD { | ||
234 | fdData :: ByteString | ||
235 | , fdNoBlock :: Bool | ||
236 | } | ||
237 | |||
238 | |||
239 | -- TODO return "is dir" error | ||
240 | -- | This call correspond to open(2) with the following parameters: | ||
241 | -- | ||
242 | -- * OpenMode = ReadOnly; | ||
243 | -- | ||
244 | -- * OpenFileFlags = O_NONBLOCK. (not true yet) | ||
245 | -- | ||
246 | openFD :: FilePath -> Bool -> Storage -> IO (Either Errno FD) | ||
247 | openFD path nonblock Storage {..} | ||
248 | | Just offset <- fileOffset path (tInfo metainfo) | ||
249 | , Just bs <- lookupRegion (fromIntegral offset) payload | ||
250 | = return $ Right $ FD bs nonblock | ||
251 | | otherwise = return $ Left $ eNOENT | ||
252 | |||
253 | -- | Cancel all enqueued read operations and report any delayed | ||
254 | -- errors. | ||
255 | flushFD :: FD -> IO Errno | ||
256 | flushFD _ = return eOK | ||
257 | |||
258 | -- | This call correspond to close(2). | ||
259 | closeFD :: FD -> IO () | ||
260 | closeFD _ = return () | ||
261 | |||
262 | -- TODO | ||
263 | maskRegion :: FD -> Offset -> Size -> Maybe Size | ||
264 | maskRegion FD {..} offset siz = return siz | ||
265 | |||
266 | -- TODO | ||
267 | isComplete :: FD -> Offset -> Size -> IO Size | ||
268 | isComplete _ _ siz = return siz | ||
269 | |||
270 | -- TODO | ||
271 | enqueueRead :: FD -> Offset -> Size -> IO () | ||
272 | enqueueRead _ _ _ = return () | ||
273 | |||
274 | -- TODO | ||
275 | readAhead :: FD -> Offset -> Size -> IO () | ||
276 | readAhead _ _ _ = return () | ||
277 | |||
278 | -- TODO | ||
279 | waitRegion :: FD -> Offset -> Size -> IO ByteString | ||
280 | waitRegion _ _ _ = return B.empty | ||
281 | |||
282 | -- TODO implement blocking and non blocking modes? | ||
283 | -- TODO check if region completely downloaded | ||
284 | -- TODO if not we could return EAGAIN | ||
285 | -- TODO enqueue read to piece manager | ||
286 | -- | This call correspond to pread(2). | ||
287 | readFD :: FD -> Offset -> Size -> IO (Either Errno ByteString) | ||
288 | readFD fd @ FD {..} offset reqSize = | ||
289 | case maskRegion fd offset reqSize of | ||
290 | Nothing -> return $ Right B.empty | ||
291 | Just expSize -> do | ||
292 | availSize <- isComplete fd offset expSize | ||
293 | if availSize == expSize then haveAllReg expSize else haveSomeReg expSize | ||
294 | where | ||
295 | haveAllReg expSize = do | ||
296 | readAhead fd offset expSize | ||
297 | return $ Right $ slice offset expSize fdData | ||
298 | |||
299 | haveSomeReg expSize | ||
300 | | fdNoBlock = return $ Left $ eAGAIN | ||
301 | | otherwise = do | ||
302 | bs <- waitRegion fd offset expSize | ||
303 | readAhead fd offset expSize | ||
304 | return $ Right bs | ||
305 | |||
306 | -- TODO implement COW; needed for applications which want to change files. | ||
307 | writeFD :: FD -> ByteString -> Offset -> IO () | ||
308 | writeFD FD {..} bs offset = return () | ||
309 | |||
310 | {----------------------------------------------------------------------- | ||
311 | Internal | ||
312 | -----------------------------------------------------------------------} | ||
313 | |||
314 | isDownloaded :: PieceIx -> Storage -> STM Bool | ||
315 | isDownloaded pix st @ Storage {..} = do | ||
316 | bf <- readTVar blocks | ||
317 | mask <- pieceMask pix st | ||
318 | return $ intersection mask bf == mask | ||
319 | |||
320 | pieceMask :: PieceIx -> Storage -> STM Bitfield | ||
321 | pieceMask pix Storage {..} = do | ||
322 | bf <- readTVar blocks | ||
323 | return $ BF.interval (totalCount bf) offset (offset + coeff - 1) | ||
324 | where | ||
325 | offset = coeff * pix | ||
326 | coeff = ciPieceLength (tInfo metainfo) `div` blockSize | ||
327 | |||
328 | |||
329 | ixInterval :: Int -> BlockIx -> FixedInterval | ||
330 | ixInterval pieceSize BlockIx {..} = | ||
331 | Fixed.interval (ixPiece * pieceSize + ixOffset) ixLength | ||
332 | |||
333 | blkInterval :: Int -> Block -> FixedInterval | ||
334 | blkInterval pieceSize Block {..} = | ||
335 | Fixed.interval (blkPiece * pieceSize + blkOffset) | ||
336 | (fromIntegral (Lazy.length blkData)) \ No newline at end of file | ||