From c15da2e2b376d81671f35e821e94db19e59d5ddd Mon Sep 17 00:00:00 2001 From: Sam T Date: Sun, 30 Jun 2013 05:18:24 +0400 Subject: + Add very basic storage operations. Now we can download and make some progress, but very unstable. --- examples/Main.hs | 2 + src/Data/Bitfield.hs | 23 ++--- src/Data/Torrent.hs | 15 +++- src/Network/BitTorrent.hs | 31 ++++--- src/Network/BitTorrent/Exchange.hs | 12 +-- src/Network/BitTorrent/Exchange/Protocol.hs | 2 +- src/Network/BitTorrent/Internal.hs | 9 +- src/System/Torrent/Storage.hs | 135 +++++++++++++++++++++++----- 8 files changed, 172 insertions(+), 57 deletions(-) diff --git a/examples/Main.hs b/examples/Main.hs index 9786dbdc..8d976aed 100644 --- a/examples/Main.hs +++ b/examples/Main.hs @@ -18,6 +18,8 @@ main = do storage <- swarm `bindTo` "/tmp/" + ppStorage storage >>= print + discover swarm $ do liftIO $ print "connected to peer" forever $ exchange storage diff --git a/src/Data/Bitfield.hs b/src/Data/Bitfield.hs index 46e0a71f..89461fd2 100644 --- a/src/Data/Bitfield.hs +++ b/src/Data/Bitfield.hs @@ -32,6 +32,7 @@ module Data.Bitfield -- * Construction , haveAll, haveNone, have, singleton + , interval , adjustSize -- * Query @@ -137,6 +138,10 @@ singleton ix pc = have ix (haveNone pc) adjustSize :: PieceCount -> Bitfield -> Bitfield adjustSize s Bitfield {..} = Bitfield s bfSet +-- | NOTE: for internal use only +interval :: PieceCount -> PieceIx -> PieceIx -> Bitfield +interval pc a b = Bitfield pc (S.interval a b) + {----------------------------------------------------------------------- Query -----------------------------------------------------------------------} @@ -174,16 +179,14 @@ notMember ix bf @ Bitfield {..} | otherwise = True -- | Find first available piece index. -findMin :: Bitfield -> Maybe PieceIx -findMin Bitfield {..} - | S.null bfSet = Nothing - | otherwise = Just (S.findMin bfSet) +findMin :: Bitfield -> PieceIx +findMin = S.findMin . bfSet +{-# INLINE findMin #-} -- | Find last available piece index. -findMax :: Bitfield -> Maybe PieceIx -findMax Bitfield {..} - | S.null bfSet = Nothing - | otherwise = Just (S.findMax bfSet) +findMax :: Bitfield -> PieceIx +findMax = S.findMax . bfSet +{-# INLINE findMax #-} isSubsetOf :: Bitfield -> Bitfield -> Bool isSubsetOf a b = bfSet a `S.isSubsetOf` bfSet b @@ -333,11 +336,11 @@ strategyClass threshold = classify . completeness -- | Select the first available piece. strictFirst :: Selector -strictFirst h a _ = findMin (difference a h) +strictFirst h a _ = Just $ findMin (difference a h) -- | Select the last available piece. strictLast :: Selector -strictLast h a _ = findMax (difference a h) +strictLast h a _ = Just $ findMax (difference a h) -- | rarestFirst :: Selector diff --git a/src/Data/Torrent.hs b/src/Data/Torrent.hs index 551a260c..bdd38630 100644 --- a/src/Data/Torrent.hs +++ b/src/Data/Torrent.hs @@ -34,6 +34,8 @@ module Data.Torrent , contentLength, pieceCount, blockCount , isSingleFile, isMultiFile + , checkPiece + -- * Info hash #if defined (TESTING) , InfoHash(..) @@ -77,6 +79,9 @@ import Network.URI import System.FilePath import Numeric +import Data.ByteString.Internal +import Debug.Trace + type Time = Text @@ -141,6 +146,8 @@ simpleTorrent announce info = torrent announce info Nothing Nothing Nothing Nothing Nothing +-- TODO check if pieceLength is power of 2 + -- | Info part of the .torrent file contain info about each content file. data ContentInfo = SingleFile { @@ -361,14 +368,14 @@ slice from to = B.take to . B.drop from -- | Extract validation hash by specified piece index. pieceHash :: ContentInfo -> Int -> ByteString -pieceHash ci ix = slice offset size (ciPieces ci) +pieceHash ci ix = slice (hashsize * ix) hashsize (ciPieces ci) where - offset = ciPieceLength ci * ix - size = ciPieceLength ci + hashsize = 20 -- | Validate piece with metainfo hash. checkPiece :: ContentInfo -> Int -> ByteString -> Bool -checkPiece ci ix piece +checkPiece ci ix piece @ (PS _ off si) + | traceShow (ix, off, si) True = B.length piece == ciPieceLength ci && hash piece == InfoHash (pieceHash ci ix) diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index 86c7802b..30735023 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs @@ -35,6 +35,8 @@ module Network.BitTorrent -- * Storage , Storage + , ppStorage + , bindTo , unbind @@ -80,7 +82,7 @@ import Control.Monad.Reader import Network -import Data.Bitfield +import Data.Bitfield as BF import Data.Torrent import Network.BitTorrent.Internal import Network.BitTorrent.Exchange @@ -132,17 +134,24 @@ discover swarm action = do -- | Default P2P action. exchange :: Storage -> P2P () -exchange storage = handleEvent (\msg -> liftIO (print msg) >> handler msg) +exchange storage = awaitEvent >>= handler where - handler (Available bf) - | Just m <- findMin bf = return (Want (BlockIx m 0 262144)) - | otherwise = error "impossible" - -- TODO findMin :: Bitfield -> PieceIx + handler (Available bf) = do + liftIO (print (completeness bf)) + ixs <- selBlk (findMin bf) storage + mapM_ (yieldEvent . Want) ixs -- TODO yield vectored handler (Want bix) = do blk <- liftIO $ getBlk bix storage - return (Fragment blk) - - handler (Fragment blk) = do - liftIO $ putBlk blk storage - return (Available (singleton (blkPiece blk) (error "singleton") )) + yieldEvent (Fragment blk) + + handler (Fragment blk @ Block {..}) = do + liftIO $ print (ppBlock blk) + done <- liftIO $ putBlk blk storage + when done $ do + yieldEvent $ Available $ singleton blkPiece (succ blkPiece) + + offer <- peerOffer + if BF.null offer + then return () + else handler (Available offer) \ No newline at end of file diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 505360a4..66112f14 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs @@ -53,6 +53,7 @@ module Network.BitTorrent.Exchange , getHaveCount , getWantCount , getPieceCount + , peerOffer -- * Events , Event(..) @@ -295,6 +296,10 @@ data Event | Fragment Block deriving Show +-- INVARIANT: +-- +-- * Available Bitfield is never empty +-- -- | You could think of 'awaitEvent' as wait until something interesting occur. -- @@ -316,9 +321,7 @@ data Event -- forall (Fragment block). isPiece block == True -- awaitEvent :: P2P Event -awaitEvent = do - - awaitMessage >>= go +awaitEvent = awaitMessage >>= go where go KeepAlive = awaitEvent go Choke = do @@ -341,8 +344,7 @@ awaitEvent = do awaitEvent go (Have idx) = do - new <- singletonBF idx - bitfield %= BF.union new + bitfield %= have idx _ <- revise offer <- peerOffer diff --git a/src/Network/BitTorrent/Exchange/Protocol.hs b/src/Network/BitTorrent/Exchange/Protocol.hs index 8d42e3a8..4cf4685d 100644 --- a/src/Network/BitTorrent/Exchange/Protocol.hs +++ b/src/Network/BitTorrent/Exchange/Protocol.hs @@ -237,7 +237,7 @@ data Block = Block { , blkOffset :: {-# UNPACK #-} !Int -- | Payload. - , blkData :: !ByteString + , blkData :: !ByteString -- TODO make lazy bytestring } deriving (Show, Eq) -- | Format block in human readable form. Payload is ommitted. diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index 38388b9a..bf47b87b 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs @@ -48,7 +48,7 @@ module Network.BitTorrent.Internal , leaveSwarm , waitVacancy - , available + , pieceLength -- * Peer , PeerSession( PeerSession, connectedPeerAddr @@ -57,6 +57,9 @@ module Network.BitTorrent.Internal ) , SessionState , withPeerSession + + -- ** Broadcasting + , available , getPending -- ** Exceptions @@ -388,6 +391,7 @@ waitVacancy se = pieceLength :: SwarmSession -> Int pieceLength = ciPieceLength . tInfo . torrentMeta +{-# INLINE pieceLength #-} {----------------------------------------------------------------------- Peer session @@ -532,7 +536,8 @@ findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession -- 3. Signal to the all other peer about this. available :: Bitfield -> SwarmSession -> IO () -available bf se @ SwarmSession {..} = mark >> atomically broadcast +available bf se @ SwarmSession {..} = do + mark >> atomically broadcast where mark = do let bytes = pieceLength se * BF.haveCount bf diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs index a5529fe6..cb0494e8 100644 --- a/src/System/Torrent/Storage.hs +++ b/src/System/Torrent/Storage.hs @@ -20,29 +20,33 @@ {-# LANGUAGE RecordWildCards #-} module System.Torrent.Storage ( Storage + , ppStorage -- * Construction , bindTo, unbind, withStorage -- * Modification - , getBlk, putBlk + , getBlk, putBlk, selBlk ) where import Control.Applicative import Control.Concurrent.STM import Control.Exception import Control.Monad +import Control.Monad.Trans + import Data.ByteString as B import qualified Data.ByteString.Lazy as Lazy import Data.List as L +import Text.PrettyPrint import System.FilePath import System.Directory -import Data.Bitfield +import Data.Bitfield as BF import Data.Torrent import Network.BitTorrent.Exchange.Protocol import Network.BitTorrent.Internal -import System.IO.MMap.Fixed +import System.IO.MMap.Fixed as Fixed data Storage = Storage { @@ -51,14 +55,21 @@ data Storage = Storage { -- | , blocks :: !(TVar Bitfield) + -- TODO use bytestring for fast serialization + -- because we need to write this bitmap to disc periodically + + + , blockSize :: !Int -- | Used to map linear block addresses to disjoint -- mallocated/mmaped adresses. , payload :: !Fixed } -pieceSize :: Storage -> Int -pieceSize = ciPieceLength . tInfo . torrentMeta . session +ppStorage :: Storage -> IO Doc +ppStorage Storage {..} = pp <$> readTVarIO blocks + where + pp bf = int blockSize {----------------------------------------------------------------------- Construction @@ -67,9 +78,15 @@ pieceSize = ciPieceLength . tInfo . torrentMeta . session -- TODO doc args bindTo :: SwarmSession -> FilePath -> IO Storage bindTo se @ SwarmSession {..} contentPath = do - let content_paths = contentLayout contentPath (tInfo torrentMeta) + let contentInfo = tInfo torrentMeta + let content_paths = contentLayout contentPath contentInfo mapM_ mkDir (L.map fst content_paths) - Storage se <$> newTVarIO (haveNone (ciPieceLength (tInfo torrentMeta))) + + let pieceLen = pieceLength se + let blockSize = min defaultBlockSize pieceLen + print $ "content length " ++ show (contentLength contentInfo) + Storage se <$> newTVarIO (haveNone (blockCount blockSize contentInfo)) + <*> pure blockSize <*> coalesceFiles content_paths where mkDir path = do @@ -90,43 +107,113 @@ withStorage se path = bracket (se `bindTo` path) unbind -----------------------------------------------------------------------} -- TODO to avoid races we might need to try Control.Concurrent.yield --- TODO lazy block payload +-- TODO make block_payload :: Lazy.ByteString + +selBlk :: MonadIO m => PieceIx -> Storage -> m [BlockIx] +selBlk pix st @ Storage {..} = liftIO $ atomically $ do + mask <- pieceMask pix st + select mask <$> readTVar blocks + where + select mask = fmap mkBix . toList . difference mask + -- TODO clip upper bound of block index + mkBix ix = BlockIx pix (blockSize * (ix - offset)) blockSize + + offset = coeff * pix + coeff = pieceLength session `div` blockSize + +-- +-- TODO make global lock map -- otherwise we might get broken pieces +-- +-- imagine the following situation: +-- +-- thread1: write +-- thread1: mark +-- +-- this let us avoid races as well +-- -- | Write a block to the storage. If block out of range then block is clipped. -putBlk :: Block -> Storage -> IO () -putBlk blk @ Block {..} st @ Storage {..} = do +-- +-- +-- +putBlk :: MonadIO m => Block -> Storage -> m Bool +putBlk blk @ Block {..} st @ Storage {..} = liftIO $ do -- let blkIx = undefined -- bm <- readTVarIO blocks -- unless (member blkIx bm) $ do - writeBytes (blkInterval (pieceSize st) blk) + writeBytes (blkInterval (pieceLength session) blk) (Lazy.fromChunks [blkData]) payload --- when (undefined bm blkIx) $ do --- if checkPiece ci piIx piece --- then return True --- else do --- reset --- return False + + markBlock blk st + validatePiece blkPiece st + +markBlock :: Block -> Storage -> IO () +markBlock Block {..} Storage {..} = do + let piLen = pieceLength session + let glIx = (piLen `div` blockSize) * blkPiece + (blkOffset `div` blockSize) + atomically $ modifyTVar' blocks (have glIx) -- | Read a block by given block index. If lower or upper bound out of -- range then index is clipped. -getBlk :: BlockIx -> Storage -> IO Block -getBlk ix @ BlockIx {..} st @ Storage {..} = do - bs <- readBytes (ixInterval (pieceSize st) ix) payload +-- +-- Do not block. +-- +getBlk :: MonadIO m => BlockIx -> Storage -> m Block +getBlk ix @ BlockIx {..} st @ Storage {..} = liftIO $ do + -- TODO check if __piece__ is available + bs <- readBytes (ixInterval (pieceLength session) ix) payload return $ Block ixPiece ixOffset (Lazy.toStrict bs) --- | Should be used to verify piece. getPiece :: PieceIx -> Storage -> IO ByteString -getPiece ix st = blkData <$> getBlk (BlockIx ix 0 (pieceSize st)) st +getPiece pix st @ Storage {..} = do + let pieceLen = pieceLength session + let bix = BlockIx pix 0 (pieceLength session) + bs <- readBytes (ixInterval pieceLen bix) payload + return (Lazy.toStrict bs) + +resetPiece :: PieceIx -> Storage -> IO () +resetPiece pix st @ Storage {..} = atomically $ do + mask <- pieceMask pix st + modifyTVar' blocks (`difference` mask) + +validatePiece :: PieceIx -> Storage -> IO Bool +validatePiece pix st @ Storage {..} = do + downloaded <- atomically $ isDownloaded pix st + if not downloaded then return False + else do + print $ show pix ++ " downloaded" + piece <- getPiece pix st + if checkPiece (tInfo (torrentMeta session)) pix piece + then return True + else do + print $ "----------------------------- invalid " ++ show pix +-- resetPiece pix st + return True {----------------------------------------------------------------------- Internal -----------------------------------------------------------------------} +isDownloaded :: PieceIx -> Storage -> STM Bool +isDownloaded pix st @ Storage {..} = do + bf <- readTVar blocks + mask <- pieceMask pix st + return $ intersection mask bf == mask + +pieceMask :: PieceIx -> Storage -> STM Bitfield +pieceMask pix Storage {..} = do + bf <- readTVar blocks + return $ BF.interval (totalCount bf) offset (offset + coeff - 1) + where + offset = coeff * pix + coeff = pieceLength session `div` blockSize + + ixInterval :: Int -> BlockIx -> FixedInterval ixInterval pieceSize BlockIx {..} = - interval (ixPiece * pieceSize + ixOffset) ixLength + Fixed.interval (ixPiece * pieceSize + ixOffset) ixLength blkInterval :: Int -> Block -> FixedInterval blkInterval pieceSize Block {..} = - interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file + Fixed.interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file -- cgit v1.2.3