From d6a0442a56d7b977d5f1d1d162517c9086c413eb Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 3 Dec 2013 16:15:32 +0400 Subject: New storage --- src/System/Torrent/FileMap.hs | 151 ++++++++++++++++++ src/System/Torrent/Storage.hs | 345 +++++------------------------------------- 2 files changed, 189 insertions(+), 307 deletions(-) create mode 100644 src/System/Torrent/FileMap.hs (limited to 'src/System/Torrent') diff --git a/src/System/Torrent/FileMap.hs b/src/System/Torrent/FileMap.hs new file mode 100644 index 00000000..80907a30 --- /dev/null +++ b/src/System/Torrent/FileMap.hs @@ -0,0 +1,151 @@ +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ViewPatterns #-} +{-# OPTIONS -fno-warn-orphans #-} +module System.Torrent.FileMap + ( FileMap + + -- * Construction + , Mode (..) + , def + , mmapFiles + , unmapFiles + + -- * Query + , System.Torrent.FileMap.size + + -- * Modification + , readBytes + , writeBytes + , unsafeReadBytes + + -- * Unsafe conversions + , fromLazyByteString + , toLazyByteString + ) where + +import Control.Applicative +import Control.Monad as L +import Data.ByteString as BS +import Data.ByteString.Internal as BS +import Data.ByteString.Lazy as BL +import Data.ByteString.Lazy.Internal as BL +import Data.Default +import Data.Vector as V -- TODO use unboxed vector +import Foreign +import System.IO.MMap + +import Data.Torrent.Layout + + +data FileEntry = FileEntry + { filePosition :: {-# UNPACK #-} !FileOffset + , fileBytes :: {-# UNPACK #-} !BS.ByteString + } deriving (Show, Eq) + +type FileMap = Vector FileEntry + +instance Default Mode where + def = ReadWriteEx + +mmapFiles :: Mode -> FileLayout FileSize -> IO FileMap +mmapFiles mode layout = V.fromList <$> L.mapM mkEntry (accumPositions layout) + where + mkEntry (path, (pos, expectedSize)) = do + let esize = fromIntegral expectedSize -- FIXME does this safe? + (fptr, moff, msize) <- mmapFileForeignPtr path mode $ Just (0, esize) + if msize /= esize + then error "mmapFiles" -- TODO unmap mapped files on exception + else return $ FileEntry pos (PS fptr moff msize) + +unmapFiles :: FileMap -> IO () +unmapFiles = V.mapM_ unmapEntry + where + unmapEntry (FileEntry _ (PS fptr _ _)) = finalizeForeignPtr fptr + +fromLazyByteString :: BL.ByteString -> FileMap +fromLazyByteString lbs = V.unfoldr f (0, lbs) + where + f (_, Empty ) = Nothing + f (pos, Chunk x xs) = Just (FileEntry pos x, ((pos + chunkSize), xs)) + where chunkSize = fromIntegral $ BS.length x + +-- | /O(n)/. +toLazyByteString :: FileMap -> BL.ByteString +toLazyByteString = V.foldr f Empty + where + f FileEntry {..} bs = Chunk fileBytes bs + +-- | /O(1)/. +size :: FileMap -> FileOffset +size m + | V.null m = 0 + | FileEntry {..} <- V.unsafeLast m + = filePosition + fromIntegral (BS.length fileBytes) + +bsearch :: FileOffset -> FileMap -> Maybe Int +bsearch x m + | V.null m = Nothing + | otherwise = branch (V.length m `div` 2) + where + branch c @ ((m !) -> FileEntry {..}) + | x < filePosition = bsearch x (V.take c m) + | x >= filePosition + fileSize = do + ix <- bsearch x (V.drop (succ c) m) + return $ succ c + ix + | otherwise = Just c + where + fileSize = fromIntegral (BS.length fileBytes) + +-- | /O(log n)/. +drop :: FileOffset -> FileMap -> (FileSize, FileMap) +drop off m + | Just ix <- bsearch off m + , FileEntry {..} <- m ! ix = (off - filePosition, V.drop ix m) + | otherwise = (0 , V.empty) + +-- | /O(log n)/. 
+take :: FileSize -> FileMap -> (FileMap, FileSize) +take len m + | len >= s = (m , 0) + | Just ix <- bsearch (pred len) m = let m' = V.take (succ ix) m + in (m', System.Torrent.FileMap.size m' - len) + | otherwise = (V.empty , 0) + where + s = System.Torrent.FileMap.size m + +-- | /O(log n + m)/. Do not use this function with 'unmapFiles'. +unsafeReadBytes :: FileOffset -> FileSize -> FileMap -> BL.ByteString +unsafeReadBytes off s m + | (l , m') <- System.Torrent.FileMap.drop off m + , (m'', _ ) <- System.Torrent.FileMap.take (off + s) m' + = BL.take (fromIntegral s) $ BL.drop (fromIntegral l) $ toLazyByteString m'' + +readBytes :: FileOffset -> FileSize -> FileMap -> IO BL.ByteString +readBytes off s m = do + let bs_copy = BL.copy $ unsafeReadBytes off s m + forceLBS bs_copy + return bs_copy + where + forceLBS Empty = return () + forceLBS (Chunk _ x) = forceLBS x + +bscpy :: BL.ByteString -> BL.ByteString -> IO () +bscpy (PS _ _ 0 `Chunk` dest_rest) src = bscpy dest_rest src +bscpy dest (PS _ _ 0 `Chunk` src_rest) = bscpy dest src_rest +bscpy (PS dest_fptr dest_off dest_size `Chunk` dest_rest) + (PS src_fptr src_off src_size `Chunk` src_rest) + = do let csize = min dest_size src_size + withForeignPtr dest_fptr $ \dest_ptr -> + withForeignPtr src_fptr $ \src_ptr -> + memcpy (dest_ptr `advancePtr` dest_off) + (src_ptr `advancePtr` src_off) + (fromIntegral csize) -- TODO memmove? + bscpy (PS dest_fptr (dest_off + csize) (dest_size - csize) `Chunk` dest_rest) + (PS src_fptr (src_off + csize) (src_size - csize) `Chunk` src_rest) +bscpy _ _ = return () + +writeBytes :: FileOffset -> BL.ByteString -> FileMap -> IO () +writeBytes off lbs m = bscpy dest src + where + src = BL.take (fromIntegral (BL.length dest)) lbs + dest = unsafeReadBytes off (fromIntegral (BL.length lbs)) m \ No newline at end of file diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs index 2225b0a3..bb6c5d2e 100644 --- a/src/System/Torrent/Storage.hs +++ b/src/System/Torrent/Storage.hs @@ -1,11 +1,11 @@ -- | --- Copyright : (c) Sam T. 2013 --- License : MIT +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental --- Portability : non-portable +-- Portability : portable -- --- This module implements mapping from single continious block space +-- This module implements mapping from single continious piece space -- to file storage. Storage can be used in two modes: -- -- * As in memory storage - in this case we don't touch filesystem. @@ -13,324 +13,55 @@ -- * As ordinary mmaped file storage - when we need to store -- data in the filesystem. -- -{-# LANGUAGE DoAndIfThenElse #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE RecordWildCards #-} module System.Torrent.Storage - ( Storage (metainfo) - , ppStorage + ( Storage -- * Construction - , openStorage, closeStorage, withStorage - , getCompleteBitfield + , Mode (..) + , def + , open + , close -- * Modification - , getBlk, putBlk, selBlk - , getPiece, validatePiece - - -- * TODO expose only File interface! 
- -- * File interface - , FD - , openFD, flushFD, closeFD - , readFD, writeFD + , writePiece + , readPiece + , unsafeReadPiece ) where import Control.Applicative -import Control.Concurrent.STM -import Control.Exception -import Control.Monad -import Control.Monad.Trans - -import Data.ByteString as B -import qualified Data.ByteString.Lazy as Lazy -import Text.PrettyPrint -import System.FilePath -import System.Directory -import Foreign.C.Error - -import Data.Torrent.Bitfield as BF -import Data.Torrent.Block -import Data.Torrent -import System.IO.MMap.Fixed as Fixed - --- TODO merge piece validation and Sessions.available into one transaction. -data Storage = Storage { - -- | - metainfo :: !Torrent +import Data.ByteString.Lazy as BL - -- | Bitmask of complete and verified _pieces_. - , complete :: !(TVar Bitfield) +import Data.Torrent.Layout +import Data.Torrent.Piece +import System.Torrent.FileMap - -- | Bitmask of complete _blocks_. - , blocks :: !(TVar Bitfield) - -- TODO use bytestring for fast serialization - -- because we need to write this bitmap to disc periodically - , blockSize :: !Int - - -- | Used to map linear block addresses to disjoint - -- mallocated/mmaped adresses. - , payload :: !Fixed +-- TODO validation +data Storage = Storage + { pieceLen :: {-# UNPACK #-} !PieceSize + , fileMap :: {-# UNPACK #-} !FileMap } -ppStorage :: Storage -> IO Doc -ppStorage Storage {..} = pp <$> readTVarIO blocks - where - pp bf = int blockSize - -getCompleteBitfield :: Storage -> STM Bitfield -getCompleteBitfield Storage {..} = readTVar complete - -{----------------------------------------------------------------------- - Construction ------------------------------------------------------------------------} +-- ResourceT ? +open :: Mode -> PieceSize -> FileLayout FileSize -> IO Storage +open mode s l = Storage s <$> mmapFiles mode l --- TODO doc args -openStorage :: Torrent -> FilePath -> Bitfield -> IO Storage -openStorage t @ Torrent {..} contentPath bf = do - let content_paths = contentLayout contentPath tInfo - mapM_ (mkDir . 
fst) content_paths +close :: Storage -> IO () +close Storage {..} = unmapFiles fileMap - let blockSize = defaultBlockSize `min` ciPieceLength tInfo - Storage t <$> newTVarIO bf - <*> newTVarIO (haveNone (blockCount blockSize tInfo)) - <*> pure blockSize - <*> coalesceFiles content_paths - where - mkDir path = do - let dirPath = fst (splitFileName path) - exist <- doesDirectoryExist dirPath - unless exist $ do - createDirectoryIfMissing True dirPath - --- TODO -closeStorage :: Storage -> IO () -closeStorage st = return () - - -withStorage :: Torrent -> FilePath -> Bitfield -> (Storage -> IO a) -> IO a -withStorage se path bf = bracket (openStorage se path bf) closeStorage +writePiece :: Piece BL.ByteString -> Storage -> IO () +writePiece Piece {..} Storage {..} = do + writeBytes (fromIntegral (pieceIndex * pieceLen)) pieceData fileMap -{----------------------------------------------------------------------- - Modification ------------------------------------------------------------------------} +readPiece :: PieceIx -> Storage -> IO (Piece BL.ByteString) +readPiece pix Storage {..} = do + bs <- readBytes (fromIntegral (pix * pieceLen)) + (fromIntegral pieceLen) fileMap + return $ Piece pix bs --- TODO to avoid races we might need to try Control.Concurrent.yield --- TODO make block_payload :: Lazy.ByteString - -selBlk :: MonadIO m => PieceIx -> Storage -> m [BlockIx] -selBlk pix st @ Storage {..} - = liftIO $ {-# SCC selBlk #-} atomically $ do - mask <- pieceMask pix st - select mask <$> readTVar blocks +unsafeReadPiece :: PieceIx -> Storage -> IO (Piece BL.ByteString) +unsafeReadPiece pix Storage {..} = return $ Piece pix lbs where - select mask = fmap mkBix . toList . difference mask - -- TODO clip upper bound of block index - mkBix ix = BlockIx pix (blockSize * (ix - offset)) blockSize - - offset = coeff * pix - coeff = ciPieceLength (tInfo metainfo) `div` blockSize - --- --- TODO make global lock map -- otherwise we might get broken pieces --- --- imagine the following situation: --- --- thread1: write --- thread1: mark --- --- this let us avoid races as well --- - --- | Write a block to the storage. If block out of range then block is clipped. --- --- --- -putBlk :: MonadIO m => Block -> Storage -> m Bool -putBlk blk @ Block {..} st @ Storage {..} - = liftIO $ {-# SCC putBlk #-} do --- let blkIx = undefined --- bm <- readTVarIO blocks --- unless (member blkIx bm) $ do - writeBytes (blkInterval (ciPieceLength (tInfo metainfo)) blk) blkData payload - - markBlock blk st - completePiece blkPiece st - -markBlock :: Block -> Storage -> IO () -markBlock Block {..} Storage {..} = {-# SCC markBlock #-} do - let piLen = ciPieceLength (tInfo metainfo) - let glIx = (piLen `div` blockSize) * blkPiece + (blkOffset `div` blockSize) - atomically $ modifyTVar' blocks (have glIx) - --- | Read a block by given block index. If lower or upper bound out of --- range then index is clipped. --- --- Do not block. 
--- -getBlk :: MonadIO m => BlockIx -> Storage -> m Block -getBlk ix @ BlockIx {..} st @ Storage {..} - = liftIO $ {-# SCC getBlk #-} do - -- TODO check if __piece__ is available - let piLen = ciPieceLength (tInfo metainfo) - bs <- readBytes (ixInterval piLen ix) payload - return $ Block ixPiece ixOffset bs - -getPiece :: PieceIx -> Storage -> IO Lazy.ByteString -getPiece pix st @ Storage {..} = {-# SCC getPiece #-} do - let piLen = ciPieceLength (tInfo metainfo) - let bix = BlockIx pix 0 piLen - let bs = viewBytes (ixInterval piLen bix) payload - return $ bs - -resetPiece :: PieceIx -> Storage -> IO () -resetPiece pix st @ Storage {..} - = {-# SCC resetPiece #-} atomically $ do - mask <- pieceMask pix st - modifyTVar' blocks (`difference` mask) - -validatePiece :: Storage -> PieceIx -> IO Bool -validatePiece storage pix = do - checkPiece (tInfo (metainfo storage)) pix <$> getPiece pix storage - -completePiece :: PieceIx -> Storage -> IO Bool -completePiece pix st @ Storage {..} = {-# SCC completePiece #-} do - downloaded <- atomically $ isDownloaded pix st - if not downloaded then return False - else do - piece <- getPiece pix st - if checkPiece (tInfo metainfo) pix piece - then do - atomically $ modifyTVar' complete (BF.have pix) - return True - else do - print $ "----------------------------- invalid " ++ show pix --- resetPiece pix st - return True - --- | Check each piece in the storage against content info hash. --- --- Note that this function will block until each the entire storage --- checked. This may take a long time for a big torrents ­ use fork --- if needed. --- -validateStorage :: Storage -> IO () -validateStorage st = undefined -- (`validatePiece` st) [0..pieceCount st] - -{----------------------------------------------------------------------- - POSIX-like file interface ------------------------------------------------------------------------- -This is useful for virtual filesystem writers and just for per file -interface. ------------------------------------------------------------------------} --- TODO reference counting: storage might be closed before all FDs --- gets closed! --- or we can forbid to close storage and use finalizers only? - -type Offset = Int -type Size = Int - -data FD = FD { - fdData :: ByteString - , fdNoBlock :: Bool - } - - --- TODO return "is dir" error --- | This call correspond to open(2) with the following parameters: --- --- * OpenMode = ReadOnly; --- --- * OpenFileFlags = O_NONBLOCK. (not true yet) --- -openFD :: FilePath -> Bool -> Storage -> IO (Either Errno FD) -openFD path nonblock Storage {..} - | Just offset <- fileOffset path (tInfo metainfo) - , Just bs <- lookupRegion (fromIntegral offset) payload - = return $ Right $ FD bs nonblock - | otherwise = return $ Left $ eNOENT - --- | Cancel all enqueued read operations and report any delayed --- errors. -flushFD :: FD -> IO Errno -flushFD _ = return eOK - --- | This call correspond to close(2). -closeFD :: FD -> IO () -closeFD _ = return () - --- TODO -maskRegion :: FD -> Offset -> Size -> Maybe Size -maskRegion FD {..} offset siz = return siz - --- TODO -isComplete :: FD -> Offset -> Size -> IO Size -isComplete _ _ siz = return siz - --- TODO -enqueueRead :: FD -> Offset -> Size -> IO () -enqueueRead _ _ _ = return () - --- TODO -readAhead :: FD -> Offset -> Size -> IO () -readAhead _ _ _ = return () - --- TODO -waitRegion :: FD -> Offset -> Size -> IO ByteString -waitRegion _ _ _ = return B.empty - --- TODO implement blocking and non blocking modes? 
--- TODO check if region completely downloaded --- TODO if not we could return EAGAIN --- TODO enqueue read to piece manager --- | This call correspond to pread(2). -readFD :: FD -> Offset -> Size -> IO (Either Errno ByteString) -readFD fd @ FD {..} offset reqSize = - case maskRegion fd offset reqSize of - Nothing -> return $ Right B.empty - Just expSize -> do - availSize <- isComplete fd offset expSize - if availSize == expSize then haveAllReg expSize else haveSomeReg expSize - where - haveAllReg expSize = do - readAhead fd offset expSize - return $ Right $ slice offset expSize fdData - - haveSomeReg expSize - | fdNoBlock = return $ Left $ eAGAIN - | otherwise = do - bs <- waitRegion fd offset expSize - readAhead fd offset expSize - return $ Right bs - --- TODO implement COW; needed for applications which want to change files. -writeFD :: FD -> ByteString -> Offset -> IO () -writeFD FD {..} bs offset = return () - -{----------------------------------------------------------------------- - Internal ------------------------------------------------------------------------} - -isDownloaded :: PieceIx -> Storage -> STM Bool -isDownloaded pix st @ Storage {..} = do - bf <- readTVar blocks - mask <- pieceMask pix st - return $ intersection mask bf == mask - -pieceMask :: PieceIx -> Storage -> STM Bitfield -pieceMask pix Storage {..} = do - bf <- readTVar blocks - return $ BF.interval (totalCount bf) offset (offset + coeff - 1) - where - offset = coeff * pix - coeff = ciPieceLength (tInfo metainfo) `div` blockSize - - -ixInterval :: Int -> BlockIx -> FixedInterval -ixInterval pieceSize BlockIx {..} = - Fixed.interval (ixPiece * pieceSize + ixOffset) ixLength - -blkInterval :: Int -> Block -> FixedInterval -blkInterval pieceSize Block {..} = - Fixed.interval (blkPiece * pieceSize + blkOffset) - (fromIntegral (Lazy.length blkData)) \ No newline at end of file + lbs = unsafeReadBytes (fromIntegral (pix * pieceLen)) + (fromIntegral pieceLen) fileMap -- cgit v1.2.3
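The FileMap half of the patch exposes all mapped files as a single flat byte-addressable space: mmapFiles maps every entry of the layout, size reports the total extent, and readBytes/writeBytes translate flat offsets back into per-file chunks through the internal bsearch/drop/take helpers. The following sketch exercises that surface; the two-file layout, file names and sizes are invented for illustration, and FileLayout is assumed to be the (path, size) association list that accumPositions consumes.

-- | Hand-driven tour of the new FileMap API.  File names, sizes and the
-- shape of the layout are assumptions made for this sketch only.
module Main (main) where

import qualified Data.ByteString.Lazy.Char8 as BLC
import System.Torrent.FileMap

main :: IO ()
main = do
  -- Hypothetical layout; ReadWriteEx creates/extends both files, but the
  -- containing directory must already exist (mmapFiles does not mkdir).
  let layout = [ ("a.part", 16384)
               , ("b.part",  4096) ]
  m <- mmapFiles ReadWriteEx layout

  print (size m)                      -- 20480: one flat offset space

  -- Writes are addressed in the flat space; this one straddles the
  -- a.part/b.part boundary and is split across both mappings by bscpy.
  writeBytes 16380 (BLC.pack "boundary") m

  copied <- readBytes 16380 8 m       -- copying read
  BLC.putStrLn copied                 -- prints "boundary"

  -- Zero-copy read: the result aliases the mapping, so it must not be
  -- used after unmapFiles.
  BLC.putStrLn (unsafeReadBytes 16380 8 m)

  unmapFiles m

readBytes forces a copy of the requested range, so its result stays valid after unmapFiles; unsafeReadBytes hands back chunks that alias the mapping directly, which is why its Haddock comment warns against using it together with unmapFiles.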
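The rewritten System.Torrent.Storage becomes a thin piece-oriented layer over FileMap: open mmaps the layout with a fixed piece length, writePiece and readPiece compute the flat offset as pieceIndex * pieceLen, and close unmaps everything. A minimal sketch of that API follows, assuming Data.Torrent.Piece exports the Piece constructor and its pieceIndex/pieceData fields the way the new code uses them internally; the layout and piece size are illustrative only.

-- | Minimal sketch of the reworked Storage API; layout and piece size
-- are made up for illustration.
module Main (main) where

import qualified Data.ByteString.Lazy.Char8 as BLC
import Data.Torrent.Piece
import System.Torrent.Storage

main :: IO ()
main = do
  let layout   = [("payload.bin", 65536)]  -- hypothetical single-file layout
      pieceLen = 16384                     -- illustrative piece size
  s <- open ReadWriteEx pieceLen layout    -- ReadWriteEx is re-exported via Mode (..)

  -- Pieces are addressed purely by index; the byte offset is
  -- pieceIndex * pieceLen.
  writePiece (Piece 1 (BLC.pack "piece #1 payload")) s

  p <- readPiece 1 s                       -- copying read of the whole piece
  print (pieceIndex p)                     -- 1
  BLC.putStrLn (BLC.take 16 (pieceData p)) -- "piece #1 payload"

  close s

Note that the old module's per-piece hash checking (validatePiece/checkPiece) has no counterpart here; the "-- TODO validation" marker on the Storage record is the only trace, so integrity verification now has to happen a layer above.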
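The old withStorage/closeStorage bracket also disappears, leaving bare open and close. Callers that still want exception-safe scoping can rebuild it in one line on top of the new API; the sketch below is such a wrapper, not something the patch provides, and the module name is hypothetical (PieceSize, FileLayout and FileSize are assumed to live in the modules the diff itself imports).

-- | Hypothetical helper module: scoped storage lifetime on top of the
-- new open/close pair.
module System.Torrent.Storage.Bracket (withStorage) where

import Control.Exception (bracket)

import Data.Torrent.Layout
import Data.Torrent.Piece
import System.Torrent.Storage

-- | Open a storage, run the action, and guarantee the files are
-- unmapped afterwards even if the action throws.
withStorage :: Mode -> PieceSize -> FileLayout FileSize
            -> (Storage -> IO a) -> IO a
withStorage mode ps layout = bracket (open mode ps layout) close

Used as, for example, withStorage def pieceLen layout (readPiece 0), it guarantees that unmapFiles runs even when the enclosed action fails.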