From c15da2e2b376d81671f35e821e94db19e59d5ddd Mon Sep 17 00:00:00 2001 From: Sam T Date: Sun, 30 Jun 2013 05:18:24 +0400 Subject: + Add very basic storage operations. Now we can download and make some progress, but very unstable. --- src/System/Torrent/Storage.hs | 135 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 111 insertions(+), 24 deletions(-) (limited to 'src/System/Torrent') diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs index a5529fe6..cb0494e8 100644 --- a/src/System/Torrent/Storage.hs +++ b/src/System/Torrent/Storage.hs @@ -20,29 +20,33 @@ {-# LANGUAGE RecordWildCards #-} module System.Torrent.Storage ( Storage + , ppStorage -- * Construction , bindTo, unbind, withStorage -- * Modification - , getBlk, putBlk + , getBlk, putBlk, selBlk ) where import Control.Applicative import Control.Concurrent.STM import Control.Exception import Control.Monad +import Control.Monad.Trans + import Data.ByteString as B import qualified Data.ByteString.Lazy as Lazy import Data.List as L +import Text.PrettyPrint import System.FilePath import System.Directory -import Data.Bitfield +import Data.Bitfield as BF import Data.Torrent import Network.BitTorrent.Exchange.Protocol import Network.BitTorrent.Internal -import System.IO.MMap.Fixed +import System.IO.MMap.Fixed as Fixed data Storage = Storage { @@ -51,14 +55,21 @@ data Storage = Storage { -- | , blocks :: !(TVar Bitfield) + -- TODO use bytestring for fast serialization + -- because we need to write this bitmap to disc periodically + + + , blockSize :: !Int -- | Used to map linear block addresses to disjoint -- mallocated/mmaped adresses. , payload :: !Fixed } -pieceSize :: Storage -> Int -pieceSize = ciPieceLength . tInfo . torrentMeta . session +ppStorage :: Storage -> IO Doc +ppStorage Storage {..} = pp <$> readTVarIO blocks + where + pp bf = int blockSize {----------------------------------------------------------------------- Construction @@ -67,9 +78,15 @@ pieceSize = ciPieceLength . tInfo . torrentMeta . session -- TODO doc args bindTo :: SwarmSession -> FilePath -> IO Storage bindTo se @ SwarmSession {..} contentPath = do - let content_paths = contentLayout contentPath (tInfo torrentMeta) + let contentInfo = tInfo torrentMeta + let content_paths = contentLayout contentPath contentInfo mapM_ mkDir (L.map fst content_paths) - Storage se <$> newTVarIO (haveNone (ciPieceLength (tInfo torrentMeta))) + + let pieceLen = pieceLength se + let blockSize = min defaultBlockSize pieceLen + print $ "content length " ++ show (contentLength contentInfo) + Storage se <$> newTVarIO (haveNone (blockCount blockSize contentInfo)) + <*> pure blockSize <*> coalesceFiles content_paths where mkDir path = do @@ -90,43 +107,113 @@ withStorage se path = bracket (se `bindTo` path) unbind -----------------------------------------------------------------------} -- TODO to avoid races we might need to try Control.Concurrent.yield --- TODO lazy block payload +-- TODO make block_payload :: Lazy.ByteString + +selBlk :: MonadIO m => PieceIx -> Storage -> m [BlockIx] +selBlk pix st @ Storage {..} = liftIO $ atomically $ do + mask <- pieceMask pix st + select mask <$> readTVar blocks + where + select mask = fmap mkBix . toList . difference mask + -- TODO clip upper bound of block index + mkBix ix = BlockIx pix (blockSize * (ix - offset)) blockSize + + offset = coeff * pix + coeff = pieceLength session `div` blockSize + +-- +-- TODO make global lock map -- otherwise we might get broken pieces +-- +-- imagine the following situation: +-- +-- thread1: write +-- thread1: mark +-- +-- this let us avoid races as well +-- -- | Write a block to the storage. If block out of range then block is clipped. -putBlk :: Block -> Storage -> IO () -putBlk blk @ Block {..} st @ Storage {..} = do +-- +-- +-- +putBlk :: MonadIO m => Block -> Storage -> m Bool +putBlk blk @ Block {..} st @ Storage {..} = liftIO $ do -- let blkIx = undefined -- bm <- readTVarIO blocks -- unless (member blkIx bm) $ do - writeBytes (blkInterval (pieceSize st) blk) + writeBytes (blkInterval (pieceLength session) blk) (Lazy.fromChunks [blkData]) payload --- when (undefined bm blkIx) $ do --- if checkPiece ci piIx piece --- then return True --- else do --- reset --- return False + + markBlock blk st + validatePiece blkPiece st + +markBlock :: Block -> Storage -> IO () +markBlock Block {..} Storage {..} = do + let piLen = pieceLength session + let glIx = (piLen `div` blockSize) * blkPiece + (blkOffset `div` blockSize) + atomically $ modifyTVar' blocks (have glIx) -- | Read a block by given block index. If lower or upper bound out of -- range then index is clipped. -getBlk :: BlockIx -> Storage -> IO Block -getBlk ix @ BlockIx {..} st @ Storage {..} = do - bs <- readBytes (ixInterval (pieceSize st) ix) payload +-- +-- Do not block. +-- +getBlk :: MonadIO m => BlockIx -> Storage -> m Block +getBlk ix @ BlockIx {..} st @ Storage {..} = liftIO $ do + -- TODO check if __piece__ is available + bs <- readBytes (ixInterval (pieceLength session) ix) payload return $ Block ixPiece ixOffset (Lazy.toStrict bs) --- | Should be used to verify piece. getPiece :: PieceIx -> Storage -> IO ByteString -getPiece ix st = blkData <$> getBlk (BlockIx ix 0 (pieceSize st)) st +getPiece pix st @ Storage {..} = do + let pieceLen = pieceLength session + let bix = BlockIx pix 0 (pieceLength session) + bs <- readBytes (ixInterval pieceLen bix) payload + return (Lazy.toStrict bs) + +resetPiece :: PieceIx -> Storage -> IO () +resetPiece pix st @ Storage {..} = atomically $ do + mask <- pieceMask pix st + modifyTVar' blocks (`difference` mask) + +validatePiece :: PieceIx -> Storage -> IO Bool +validatePiece pix st @ Storage {..} = do + downloaded <- atomically $ isDownloaded pix st + if not downloaded then return False + else do + print $ show pix ++ " downloaded" + piece <- getPiece pix st + if checkPiece (tInfo (torrentMeta session)) pix piece + then return True + else do + print $ "----------------------------- invalid " ++ show pix +-- resetPiece pix st + return True {----------------------------------------------------------------------- Internal -----------------------------------------------------------------------} +isDownloaded :: PieceIx -> Storage -> STM Bool +isDownloaded pix st @ Storage {..} = do + bf <- readTVar blocks + mask <- pieceMask pix st + return $ intersection mask bf == mask + +pieceMask :: PieceIx -> Storage -> STM Bitfield +pieceMask pix Storage {..} = do + bf <- readTVar blocks + return $ BF.interval (totalCount bf) offset (offset + coeff - 1) + where + offset = coeff * pix + coeff = pieceLength session `div` blockSize + + ixInterval :: Int -> BlockIx -> FixedInterval ixInterval pieceSize BlockIx {..} = - interval (ixPiece * pieceSize + ixOffset) ixLength + Fixed.interval (ixPiece * pieceSize + ixOffset) ixLength blkInterval :: Int -> Block -> FixedInterval blkInterval pieceSize Block {..} = - interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file + Fixed.interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file -- cgit v1.2.3