From b5f222ba7dfa1fa53b8b53f4e1b770193bb55fe4 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Mon, 14 Oct 2013 05:11:46 +0400 Subject: Move some modules from torrent-content --- src/System/IO/MMap/Fixed.hs | 212 ++++++++++++++++++++++++++ src/System/Torrent/Storage.hs | 336 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 548 insertions(+) create mode 100644 src/System/IO/MMap/Fixed.hs create mode 100644 src/System/Torrent/Storage.hs (limited to 'src/System') diff --git a/src/System/IO/MMap/Fixed.hs b/src/System/IO/MMap/Fixed.hs new file mode 100644 index 00000000..1e83c350 --- /dev/null +++ b/src/System/IO/MMap/Fixed.hs @@ -0,0 +1,212 @@ +-- TODO pprint +-- TODO see if this IntervalMap is overkill: Interval dataty have 4 constrs +-- TODO clarify lifetime in docs +-- TODO use madvise +-- TODO unmap selected interval +-- TODO tests +-- TODO benchmarks +-- TODO unmap overlapped regions +-- [A] TODO lazy mapping for 32 bit arch; +-- we need tricky algorithm and a lot of perf tests +-- TODO use memmove in write bytes +-- TODO write elem, write byte, read byte +-- | +-- Copyright : (c) Sam T. 2013 +-- License : MIT +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- This library provides mechanism to mmap files to fixed address +-- with fine-grained control. Hovewer, instead of using MAP_FIXED we +-- create our own address space upon virtual address space. If you +-- would like you could call this space as "fixed address space". +-- +-- This solves a few problems: +-- +-- * Page already in use. If you mmap one file at 0..x addresses and +-- want to map second file to x..y addresses using MAP_FIXED you +-- can get in troubles: page might be mapped already. Raw call to +-- mmap will silently unmap x..y addresses and then mmap our second +-- file. So here we have extra unmap we would like to avoid. +-- +-- * Page boundaries. If you mmap one file at x..x+1 you could +-- not map next file to say addresses x+1..x+2. +-- +-- Internally we make ordinary call to mmap to map a file and then +-- using /interval map/ we map fixed address space to virtual +-- address space. It takes TODO time in TODO cases. +-- +-- Basically this library could be used when we need coalesce +-- several files in arbitrary way. We could map at any position as +-- long as offset + size fit in 'Int'. +-- +-- For other details see: +-- +-- > http://hackage.haskell.org/package/mmap +-- > man mmap +-- +{-# LANGUAGE RecordWildCards #-} +module System.IO.MMap.Fixed + ( -- * Intervals + FixedOffset, FileOffset, FixedInterval, FileInterval + , interval, fileInterval + + -- * Construction + , Fixed, Bytes + , System.IO.MMap.Fixed.empty, insertTo + , coalesceFiles + + -- ** Specialized 'insertTo' + , mmapTo, mallocTo + , lookupRegion + + -- * Query + , upperAddr + + -- * Access + , viewBytes, readBytes, writeBytes + , readElem, writeElem + ) where + +import Data.ByteString.Lazy as Lazy +import Data.ByteString.Lazy.Internal as Lazy +import Data.ByteString.Internal as B +import Data.List as L +import Data.Int +import Data.IntervalMap.Strict as M +import Data.IntervalMap.Interval +import System.IO.MMap +import Foreign + + +type FixedOffset = Int +type FileOffset = Int64 +type Size = Int + + +type FileInterval = (FileOffset, Size) +type FixedInterval = Interval FixedOffset + + +interval :: FixedOffset -> Size -> FixedInterval +interval off s = IntervalCO off (off + fromIntegral (max 0 s)) +{-# INLINE interval #-} + +fileInterval :: FileOffset -> Size -> FileInterval +fileInterval off s = (off, s) +{-# INLINE fileInterval #-} + +intervalSize :: FixedInterval -> Size +intervalSize i = upperBound i - lowerBound i +{-# INLINE intervalSize #-} + + +type Bytes = (ForeignPtr Word8, Size) + +type FixedMap = IntervalMap FixedOffset Bytes + +newtype Fixed = Fixed { imap :: FixedMap } + +instance Show Fixed where + show = show . M.toList . imap + + +mapIM :: (FixedMap -> FixedMap) -> Fixed -> Fixed +mapIM f s = s { imap = f (imap s) } + +empty :: Fixed +empty = Fixed M.empty + +coalesceFiles :: [(FilePath, Int)] -> IO Fixed +coalesceFiles = go 0 System.IO.MMap.Fixed.empty + where + go _ s [] = return s + go offset s ((path, bsize) : xs) = do + s' <- mmapTo path (0, bsize) offset s + go (offset + bsize) s' xs + +upperAddr :: Fixed -> FixedOffset +upperAddr = upperBound . fst . findLast . imap + +insertTo :: FixedInterval -> Bytes -> Fixed -> Fixed +insertTo fi mm = mapIM (M.insert fi mm) +{-# INLINE insertTo #-} + +mmapTo :: FilePath -> FileInterval -> FixedOffset -> Fixed -> IO Fixed +mmapTo path mrange to s = do + (fptr, offset, fsize) <- mmapFileForeignPtr path ReadWriteEx (Just mrange) + + let fixed = interval to fsize + let mmaped = (fptr, offset) + + return $ insertTo fixed mmaped s + +mallocTo :: FixedInterval -> Fixed -> IO Fixed +mallocTo fi s = do + let bsize = intervalSize fi + fptr <- mallocForeignPtrBytes bsize + return (insertTo fi (fptr, 0) s) + +lookupRegion :: FixedOffset -> Fixed -> Maybe B.ByteString +lookupRegion offset Fixed {..} = + case intersecting imap $ IntervalCO offset (succ offset) of + [(i, (fptr, off))] -> let s = upperBound i - lowerBound i + in Just $ fromForeignPtr fptr off (max 0 s) + _ -> Nothing + +-- | Note: this is unsafe operation. +viewBytes :: FixedInterval -> Fixed -> Lazy.ByteString +viewBytes fi s = fromChunks $ L.map mk $ (imap s `intersecting` fi) + where + mk (i, (fptr, offset)) = + let dropB = max 0 (lowerBound fi - lowerBound i) + dropT = max 0 (upperBound i - upperBound fi) + bsize = intervalSize i - (dropT + dropB) + in fromForeignPtr fptr (offset + dropB) bsize + + +readBytes :: FixedInterval -> Fixed -> IO Lazy.ByteString +readBytes fi s = let c = Lazy.copy (viewBytes fi s) in mkCopy c >> return c +{-# INLINE readBytes #-} + +writeBytes :: FixedInterval -> Lazy.ByteString -> Fixed -> IO () +writeBytes fi bs s = bscpy (viewBytes fi s) bs +{-# INLINE writeBytes #-} + +-- | Note: this operation takes O(log(files count)) time, if possible +-- use readBytes. +readElem :: Storable a => Fixed -> FixedOffset -> IO a +readElem s offset = go undefined + where + go :: Storable a => a -> IO a + go dont_touch = do + let bsize = sizeOf dont_touch + let PS fptr off _ = Lazy.toStrict (viewBytes (interval offset bsize) s) + withForeignPtr fptr $ \ ptr -> peekByteOff ptr off + +writeElem :: Storable a => Fixed -> FixedOffset -> a -> IO () +writeElem s offset x = do + let bsize = sizeOf x + let PS fptr off _ = Lazy.toStrict (viewBytes (interval offset bsize) s) + withForeignPtr fptr $ \ptr -> pokeByteOff ptr off x + + +mkCopy :: Lazy.ByteString -> IO () +mkCopy Empty = return () +mkCopy (Chunk _ x) = mkCopy x + +bscpy :: Lazy.ByteString -> Lazy.ByteString -> IO () +bscpy (PS _ _ 0 `Chunk` dest_rest) src = bscpy dest_rest src +bscpy dest (PS _ _ 0 `Chunk` src_rest) = bscpy dest src_rest +bscpy (PS dest_fptr dest_off dest_size `Chunk` dest_rest) + (PS src_fptr src_off src_size `Chunk` src_rest) + = do let csize = min dest_size src_size + withForeignPtr dest_fptr $ \dest_ptr -> + withForeignPtr src_fptr $ \src_ptr -> + memcpy (dest_ptr `advancePtr` dest_off) + (src_ptr `advancePtr` src_off) + (fromIntegral csize) -- TODO memmove? + bscpy (PS dest_fptr (dest_off + csize) (dest_size - csize) `Chunk` dest_rest) + (PS src_fptr (src_off + csize) (src_size - csize) `Chunk` src_rest) +bscpy _ _ = return () \ No newline at end of file diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs new file mode 100644 index 00000000..16f888bf --- /dev/null +++ b/src/System/Torrent/Storage.hs @@ -0,0 +1,336 @@ +-- | +-- Copyright : (c) Sam T. 2013 +-- License : MIT +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : non-portable +-- +-- This module implements mapping from single continious block space +-- to file storage. Storage can be used in two modes: +-- +-- * As in memory storage - in this case we don't touch filesystem. +-- +-- * As ordinary mmaped file storage - when we need to store +-- data in the filesystem. +-- +{-# LANGUAGE DoAndIfThenElse #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RecordWildCards #-} +module System.Torrent.Storage + ( Storage (metainfo) + , ppStorage + + -- * Construction + , openStorage, closeStorage, withStorage + , getCompleteBitfield + + -- * Modification + , getBlk, putBlk, selBlk + , getPiece, validatePiece + + -- * TODO expose only File interface! + -- * File interface + , FD + , openFD, flushFD, closeFD + , readFD, writeFD + ) where + +import Control.Applicative +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Control.Monad.Trans + +import Data.ByteString as B +import qualified Data.ByteString.Lazy as Lazy +import Text.PrettyPrint +import System.FilePath +import System.Directory +import Foreign.C.Error + +import Data.Torrent.Bitfield as BF +import Data.Torrent.Block +import Data.Torrent.Metainfo +import System.IO.MMap.Fixed as Fixed + +-- TODO merge piece validation and Sessions.available into one transaction. +data Storage = Storage { + -- | + metainfo :: !Torrent + + -- | Bitmask of complete and verified _pieces_. + , complete :: !(TVar Bitfield) + + -- | Bitmask of complete _blocks_. + , blocks :: !(TVar Bitfield) + -- TODO use bytestring for fast serialization + -- because we need to write this bitmap to disc periodically + + , blockSize :: !Int + + -- | Used to map linear block addresses to disjoint + -- mallocated/mmaped adresses. + , payload :: !Fixed + } + +ppStorage :: Storage -> IO Doc +ppStorage Storage {..} = pp <$> readTVarIO blocks + where + pp bf = int blockSize + +getCompleteBitfield :: Storage -> STM Bitfield +getCompleteBitfield Storage {..} = readTVar complete + +{----------------------------------------------------------------------- + Construction +-----------------------------------------------------------------------} + +-- TODO doc args +openStorage :: Torrent -> FilePath -> Bitfield -> IO Storage +openStorage t @ Torrent {..} contentPath bf = do + let content_paths = contentLayout contentPath tInfo + mapM_ (mkDir . fst) content_paths + + let blockSize = defaultBlockSize `min` ciPieceLength tInfo + Storage t <$> newTVarIO bf + <*> newTVarIO (haveNone (blockCount blockSize tInfo)) + <*> pure blockSize + <*> coalesceFiles content_paths + where + mkDir path = do + let dirPath = fst (splitFileName path) + exist <- doesDirectoryExist dirPath + unless exist $ do + createDirectoryIfMissing True dirPath + +-- TODO +closeStorage :: Storage -> IO () +closeStorage st = return () + + +withStorage :: Torrent -> FilePath -> Bitfield -> (Storage -> IO a) -> IO a +withStorage se path bf = bracket (openStorage se path bf) closeStorage + +{----------------------------------------------------------------------- + Modification +-----------------------------------------------------------------------} + +-- TODO to avoid races we might need to try Control.Concurrent.yield +-- TODO make block_payload :: Lazy.ByteString + +selBlk :: MonadIO m => PieceIx -> Storage -> m [BlockIx] +selBlk pix st @ Storage {..} + = liftIO $ {-# SCC selBlk #-} atomically $ do + mask <- pieceMask pix st + select mask <$> readTVar blocks + where + select mask = fmap mkBix . toList . difference mask + -- TODO clip upper bound of block index + mkBix ix = BlockIx pix (blockSize * (ix - offset)) blockSize + + offset = coeff * pix + coeff = ciPieceLength (tInfo metainfo) `div` blockSize + +-- +-- TODO make global lock map -- otherwise we might get broken pieces +-- +-- imagine the following situation: +-- +-- thread1: write +-- thread1: mark +-- +-- this let us avoid races as well +-- + +-- | Write a block to the storage. If block out of range then block is clipped. +-- +-- +-- +putBlk :: MonadIO m => Block -> Storage -> m Bool +putBlk blk @ Block {..} st @ Storage {..} + = liftIO $ {-# SCC putBlk #-} do +-- let blkIx = undefined +-- bm <- readTVarIO blocks +-- unless (member blkIx bm) $ do + writeBytes (blkInterval (ciPieceLength (tInfo metainfo)) blk) blkData payload + + markBlock blk st + completePiece blkPiece st + +markBlock :: Block -> Storage -> IO () +markBlock Block {..} Storage {..} = {-# SCC markBlock #-} do + let piLen = ciPieceLength (tInfo metainfo) + let glIx = (piLen `div` blockSize) * blkPiece + (blkOffset `div` blockSize) + atomically $ modifyTVar' blocks (have glIx) + +-- | Read a block by given block index. If lower or upper bound out of +-- range then index is clipped. +-- +-- Do not block. +-- +getBlk :: MonadIO m => BlockIx -> Storage -> m Block +getBlk ix @ BlockIx {..} st @ Storage {..} + = liftIO $ {-# SCC getBlk #-} do + -- TODO check if __piece__ is available + let piLen = ciPieceLength (tInfo metainfo) + bs <- readBytes (ixInterval piLen ix) payload + return $ Block ixPiece ixOffset bs + +getPiece :: PieceIx -> Storage -> IO Lazy.ByteString +getPiece pix st @ Storage {..} = {-# SCC getPiece #-} do + let piLen = ciPieceLength (tInfo metainfo) + let bix = BlockIx pix 0 piLen + let bs = viewBytes (ixInterval piLen bix) payload + return $ bs + +resetPiece :: PieceIx -> Storage -> IO () +resetPiece pix st @ Storage {..} + = {-# SCC resetPiece #-} atomically $ do + mask <- pieceMask pix st + modifyTVar' blocks (`difference` mask) + +validatePiece :: Storage -> PieceIx -> IO Bool +validatePiece storage pix = do + checkPiece (tInfo (metainfo storage)) pix <$> getPiece pix storage + +completePiece :: PieceIx -> Storage -> IO Bool +completePiece pix st @ Storage {..} = {-# SCC completePiece #-} do + downloaded <- atomically $ isDownloaded pix st + if not downloaded then return False + else do + piece <- getPiece pix st + if checkPiece (tInfo metainfo) pix piece + then do + atomically $ modifyTVar' complete (BF.have pix) + return True + else do + print $ "----------------------------- invalid " ++ show pix +-- resetPiece pix st + return True + +-- | Check each piece in the storage against content info hash. +-- +-- Note that this function will block until each the entire storage +-- checked. This may take a long time for a big torrents ­ use fork +-- if needed. +-- +validateStorage :: Storage -> IO () +validateStorage st = undefined -- (`validatePiece` st) [0..pieceCount st] + +{----------------------------------------------------------------------- + POSIX-like file interface +------------------------------------------------------------------------ +This is useful for virtual filesystem writers and just for per file +interface. +-----------------------------------------------------------------------} +-- TODO reference counting: storage might be closed before all FDs +-- gets closed! +-- or we can forbid to close storage and use finalizers only? + +type Offset = Int +type Size = Int + +data FD = FD { + fdData :: ByteString + , fdNoBlock :: Bool + } + + +-- TODO return "is dir" error +-- | This call correspond to open(2) with the following parameters: +-- +-- * OpenMode = ReadOnly; +-- +-- * OpenFileFlags = O_NONBLOCK. (not true yet) +-- +openFD :: FilePath -> Bool -> Storage -> IO (Either Errno FD) +openFD path nonblock Storage {..} + | Just offset <- fileOffset path (tInfo metainfo) + , Just bs <- lookupRegion (fromIntegral offset) payload + = return $ Right $ FD bs nonblock + | otherwise = return $ Left $ eNOENT + +-- | Cancel all enqueued read operations and report any delayed +-- errors. +flushFD :: FD -> IO Errno +flushFD _ = return eOK + +-- | This call correspond to close(2). +closeFD :: FD -> IO () +closeFD _ = return () + +-- TODO +maskRegion :: FD -> Offset -> Size -> Maybe Size +maskRegion FD {..} offset siz = return siz + +-- TODO +isComplete :: FD -> Offset -> Size -> IO Size +isComplete _ _ siz = return siz + +-- TODO +enqueueRead :: FD -> Offset -> Size -> IO () +enqueueRead _ _ _ = return () + +-- TODO +readAhead :: FD -> Offset -> Size -> IO () +readAhead _ _ _ = return () + +-- TODO +waitRegion :: FD -> Offset -> Size -> IO ByteString +waitRegion _ _ _ = return B.empty + +-- TODO implement blocking and non blocking modes? +-- TODO check if region completely downloaded +-- TODO if not we could return EAGAIN +-- TODO enqueue read to piece manager +-- | This call correspond to pread(2). +readFD :: FD -> Offset -> Size -> IO (Either Errno ByteString) +readFD fd @ FD {..} offset reqSize = + case maskRegion fd offset reqSize of + Nothing -> return $ Right B.empty + Just expSize -> do + availSize <- isComplete fd offset expSize + if availSize == expSize then haveAllReg expSize else haveSomeReg expSize + where + haveAllReg expSize = do + readAhead fd offset expSize + return $ Right $ slice offset expSize fdData + + haveSomeReg expSize + | fdNoBlock = return $ Left $ eAGAIN + | otherwise = do + bs <- waitRegion fd offset expSize + readAhead fd offset expSize + return $ Right bs + +-- TODO implement COW; needed for applications which want to change files. +writeFD :: FD -> ByteString -> Offset -> IO () +writeFD FD {..} bs offset = return () + +{----------------------------------------------------------------------- + Internal +-----------------------------------------------------------------------} + +isDownloaded :: PieceIx -> Storage -> STM Bool +isDownloaded pix st @ Storage {..} = do + bf <- readTVar blocks + mask <- pieceMask pix st + return $ intersection mask bf == mask + +pieceMask :: PieceIx -> Storage -> STM Bitfield +pieceMask pix Storage {..} = do + bf <- readTVar blocks + return $ BF.interval (totalCount bf) offset (offset + coeff - 1) + where + offset = coeff * pix + coeff = ciPieceLength (tInfo metainfo) `div` blockSize + + +ixInterval :: Int -> BlockIx -> FixedInterval +ixInterval pieceSize BlockIx {..} = + Fixed.interval (ixPiece * pieceSize + ixOffset) ixLength + +blkInterval :: Int -> Block -> FixedInterval +blkInterval pieceSize Block {..} = + Fixed.interval (blkPiece * pieceSize + blkOffset) + (fromIntegral (Lazy.length blkData)) \ No newline at end of file -- cgit v1.2.3