summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-06-30 05:18:24 +0400
committerSam T <pxqr.sta@gmail.com>2013-06-30 05:18:24 +0400
commitc15da2e2b376d81671f35e821e94db19e59d5ddd (patch)
tree7bcc2c929df2dd49f27ef3083eb830344b3d7685
parentf556bf196bf07308f024cc43c1a51dfd4c21188c (diff)
+ Add very basic storage operations.
Now we can download and make some progress, but very unstable.
-rw-r--r--examples/Main.hs2
-rw-r--r--src/Data/Bitfield.hs23
-rw-r--r--src/Data/Torrent.hs15
-rw-r--r--src/Network/BitTorrent.hs31
-rw-r--r--src/Network/BitTorrent/Exchange.hs12
-rw-r--r--src/Network/BitTorrent/Exchange/Protocol.hs2
-rw-r--r--src/Network/BitTorrent/Internal.hs9
-rw-r--r--src/System/Torrent/Storage.hs135
8 files changed, 172 insertions, 57 deletions
diff --git a/examples/Main.hs b/examples/Main.hs
index 9786dbdc..8d976aed 100644
--- a/examples/Main.hs
+++ b/examples/Main.hs
@@ -18,6 +18,8 @@ main = do
18 18
19 storage <- swarm `bindTo` "/tmp/" 19 storage <- swarm `bindTo` "/tmp/"
20 20
21 ppStorage storage >>= print
22
21 discover swarm $ do 23 discover swarm $ do
22 liftIO $ print "connected to peer" 24 liftIO $ print "connected to peer"
23 forever $ exchange storage 25 forever $ exchange storage
diff --git a/src/Data/Bitfield.hs b/src/Data/Bitfield.hs
index 46e0a71f..89461fd2 100644
--- a/src/Data/Bitfield.hs
+++ b/src/Data/Bitfield.hs
@@ -32,6 +32,7 @@ module Data.Bitfield
32 32
33 -- * Construction 33 -- * Construction
34 , haveAll, haveNone, have, singleton 34 , haveAll, haveNone, have, singleton
35 , interval
35 , adjustSize 36 , adjustSize
36 37
37 -- * Query 38 -- * Query
@@ -137,6 +138,10 @@ singleton ix pc = have ix (haveNone pc)
137adjustSize :: PieceCount -> Bitfield -> Bitfield 138adjustSize :: PieceCount -> Bitfield -> Bitfield
138adjustSize s Bitfield {..} = Bitfield s bfSet 139adjustSize s Bitfield {..} = Bitfield s bfSet
139 140
141-- | NOTE: for internal use only
142interval :: PieceCount -> PieceIx -> PieceIx -> Bitfield
143interval pc a b = Bitfield pc (S.interval a b)
144
140{----------------------------------------------------------------------- 145{-----------------------------------------------------------------------
141 Query 146 Query
142-----------------------------------------------------------------------} 147-----------------------------------------------------------------------}
@@ -174,16 +179,14 @@ notMember ix bf @ Bitfield {..}
174 | otherwise = True 179 | otherwise = True
175 180
176-- | Find first available piece index. 181-- | Find first available piece index.
177findMin :: Bitfield -> Maybe PieceIx 182findMin :: Bitfield -> PieceIx
178findMin Bitfield {..} 183findMin = S.findMin . bfSet
179 | S.null bfSet = Nothing 184{-# INLINE findMin #-}
180 | otherwise = Just (S.findMin bfSet)
181 185
182-- | Find last available piece index. 186-- | Find last available piece index.
183findMax :: Bitfield -> Maybe PieceIx 187findMax :: Bitfield -> PieceIx
184findMax Bitfield {..} 188findMax = S.findMax . bfSet
185 | S.null bfSet = Nothing 189{-# INLINE findMax #-}
186 | otherwise = Just (S.findMax bfSet)
187 190
188isSubsetOf :: Bitfield -> Bitfield -> Bool 191isSubsetOf :: Bitfield -> Bitfield -> Bool
189isSubsetOf a b = bfSet a `S.isSubsetOf` bfSet b 192isSubsetOf a b = bfSet a `S.isSubsetOf` bfSet b
@@ -333,11 +336,11 @@ strategyClass threshold = classify . completeness
333 336
334-- | Select the first available piece. 337-- | Select the first available piece.
335strictFirst :: Selector 338strictFirst :: Selector
336strictFirst h a _ = findMin (difference a h) 339strictFirst h a _ = Just $ findMin (difference a h)
337 340
338-- | Select the last available piece. 341-- | Select the last available piece.
339strictLast :: Selector 342strictLast :: Selector
340strictLast h a _ = findMax (difference a h) 343strictLast h a _ = Just $ findMax (difference a h)
341 344
342-- | 345-- |
343rarestFirst :: Selector 346rarestFirst :: Selector
diff --git a/src/Data/Torrent.hs b/src/Data/Torrent.hs
index 551a260c..bdd38630 100644
--- a/src/Data/Torrent.hs
+++ b/src/Data/Torrent.hs
@@ -34,6 +34,8 @@ module Data.Torrent
34 , contentLength, pieceCount, blockCount 34 , contentLength, pieceCount, blockCount
35 , isSingleFile, isMultiFile 35 , isSingleFile, isMultiFile
36 36
37 , checkPiece
38
37 -- * Info hash 39 -- * Info hash
38#if defined (TESTING) 40#if defined (TESTING)
39 , InfoHash(..) 41 , InfoHash(..)
@@ -77,6 +79,9 @@ import Network.URI
77import System.FilePath 79import System.FilePath
78import Numeric 80import Numeric
79 81
82import Data.ByteString.Internal
83import Debug.Trace
84
80 85
81type Time = Text 86type Time = Text
82 87
@@ -141,6 +146,8 @@ simpleTorrent announce info = torrent announce info
141 Nothing Nothing Nothing 146 Nothing Nothing Nothing
142 Nothing Nothing 147 Nothing Nothing
143 148
149-- TODO check if pieceLength is power of 2
150
144-- | Info part of the .torrent file contain info about each content file. 151-- | Info part of the .torrent file contain info about each content file.
145data ContentInfo = 152data ContentInfo =
146 SingleFile { 153 SingleFile {
@@ -361,14 +368,14 @@ slice from to = B.take to . B.drop from
361 368
362-- | Extract validation hash by specified piece index. 369-- | Extract validation hash by specified piece index.
363pieceHash :: ContentInfo -> Int -> ByteString 370pieceHash :: ContentInfo -> Int -> ByteString
364pieceHash ci ix = slice offset size (ciPieces ci) 371pieceHash ci ix = slice (hashsize * ix) hashsize (ciPieces ci)
365 where 372 where
366 offset = ciPieceLength ci * ix 373 hashsize = 20
367 size = ciPieceLength ci
368 374
369-- | Validate piece with metainfo hash. 375-- | Validate piece with metainfo hash.
370checkPiece :: ContentInfo -> Int -> ByteString -> Bool 376checkPiece :: ContentInfo -> Int -> ByteString -> Bool
371checkPiece ci ix piece 377checkPiece ci ix piece @ (PS _ off si)
378 | traceShow (ix, off, si) True
372 = B.length piece == ciPieceLength ci 379 = B.length piece == ciPieceLength ci
373 && hash piece == InfoHash (pieceHash ci ix) 380 && hash piece == InfoHash (pieceHash ci ix)
374 381
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs
index 86c7802b..30735023 100644
--- a/src/Network/BitTorrent.hs
+++ b/src/Network/BitTorrent.hs
@@ -35,6 +35,8 @@ module Network.BitTorrent
35 35
36 -- * Storage 36 -- * Storage
37 , Storage 37 , Storage
38 , ppStorage
39
38 , bindTo 40 , bindTo
39 , unbind 41 , unbind
40 42
@@ -80,7 +82,7 @@ import Control.Monad.Reader
80 82
81import Network 83import Network
82 84
83import Data.Bitfield 85import Data.Bitfield as BF
84import Data.Torrent 86import Data.Torrent
85import Network.BitTorrent.Internal 87import Network.BitTorrent.Internal
86import Network.BitTorrent.Exchange 88import Network.BitTorrent.Exchange
@@ -132,17 +134,24 @@ discover swarm action = do
132 134
133-- | Default P2P action. 135-- | Default P2P action.
134exchange :: Storage -> P2P () 136exchange :: Storage -> P2P ()
135exchange storage = handleEvent (\msg -> liftIO (print msg) >> handler msg) 137exchange storage = awaitEvent >>= handler
136 where 138 where
137 handler (Available bf) 139 handler (Available bf) = do
138 | Just m <- findMin bf = return (Want (BlockIx m 0 262144)) 140 liftIO (print (completeness bf))
139 | otherwise = error "impossible" 141 ixs <- selBlk (findMin bf) storage
140 -- TODO findMin :: Bitfield -> PieceIx 142 mapM_ (yieldEvent . Want) ixs -- TODO yield vectored
141 143
142 handler (Want bix) = do 144 handler (Want bix) = do
143 blk <- liftIO $ getBlk bix storage 145 blk <- liftIO $ getBlk bix storage
144 return (Fragment blk) 146 yieldEvent (Fragment blk)
145 147
146 handler (Fragment blk) = do 148 handler (Fragment blk @ Block {..}) = do
147 liftIO $ putBlk blk storage 149 liftIO $ print (ppBlock blk)
148 return (Available (singleton (blkPiece blk) (error "singleton") )) 150 done <- liftIO $ putBlk blk storage
151 when done $ do
152 yieldEvent $ Available $ singleton blkPiece (succ blkPiece)
153
154 offer <- peerOffer
155 if BF.null offer
156 then return ()
157 else handler (Available offer) \ No newline at end of file
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 505360a4..66112f14 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -53,6 +53,7 @@ module Network.BitTorrent.Exchange
53 , getHaveCount 53 , getHaveCount
54 , getWantCount 54 , getWantCount
55 , getPieceCount 55 , getPieceCount
56 , peerOffer
56 57
57 -- * Events 58 -- * Events
58 , Event(..) 59 , Event(..)
@@ -295,6 +296,10 @@ data Event
295 | Fragment Block 296 | Fragment Block
296 deriving Show 297 deriving Show
297 298
299-- INVARIANT:
300--
301-- * Available Bitfield is never empty
302--
298 303
299-- | You could think of 'awaitEvent' as wait until something interesting occur. 304-- | You could think of 'awaitEvent' as wait until something interesting occur.
300-- 305--
@@ -316,9 +321,7 @@ data Event
316-- forall (Fragment block). isPiece block == True 321-- forall (Fragment block). isPiece block == True
317-- 322--
318awaitEvent :: P2P Event 323awaitEvent :: P2P Event
319awaitEvent = do 324awaitEvent = awaitMessage >>= go
320
321 awaitMessage >>= go
322 where 325 where
323 go KeepAlive = awaitEvent 326 go KeepAlive = awaitEvent
324 go Choke = do 327 go Choke = do
@@ -341,8 +344,7 @@ awaitEvent = do
341 awaitEvent 344 awaitEvent
342 345
343 go (Have idx) = do 346 go (Have idx) = do
344 new <- singletonBF idx 347 bitfield %= have idx
345 bitfield %= BF.union new
346 _ <- revise 348 _ <- revise
347 349
348 offer <- peerOffer 350 offer <- peerOffer
diff --git a/src/Network/BitTorrent/Exchange/Protocol.hs b/src/Network/BitTorrent/Exchange/Protocol.hs
index 8d42e3a8..4cf4685d 100644
--- a/src/Network/BitTorrent/Exchange/Protocol.hs
+++ b/src/Network/BitTorrent/Exchange/Protocol.hs
@@ -237,7 +237,7 @@ data Block = Block {
237 , blkOffset :: {-# UNPACK #-} !Int 237 , blkOffset :: {-# UNPACK #-} !Int
238 238
239 -- | Payload. 239 -- | Payload.
240 , blkData :: !ByteString 240 , blkData :: !ByteString -- TODO make lazy bytestring
241 } deriving (Show, Eq) 241 } deriving (Show, Eq)
242 242
243-- | Format block in human readable form. Payload is ommitted. 243-- | Format block in human readable form. Payload is ommitted.
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
index 38388b9a..bf47b87b 100644
--- a/src/Network/BitTorrent/Internal.hs
+++ b/src/Network/BitTorrent/Internal.hs
@@ -48,7 +48,7 @@ module Network.BitTorrent.Internal
48 , leaveSwarm 48 , leaveSwarm
49 , waitVacancy 49 , waitVacancy
50 50
51 , available 51 , pieceLength
52 52
53 -- * Peer 53 -- * Peer
54 , PeerSession( PeerSession, connectedPeerAddr 54 , PeerSession( PeerSession, connectedPeerAddr
@@ -57,6 +57,9 @@ module Network.BitTorrent.Internal
57 ) 57 )
58 , SessionState 58 , SessionState
59 , withPeerSession 59 , withPeerSession
60
61 -- ** Broadcasting
62 , available
60 , getPending 63 , getPending
61 64
62 -- ** Exceptions 65 -- ** Exceptions
@@ -388,6 +391,7 @@ waitVacancy se =
388 391
389pieceLength :: SwarmSession -> Int 392pieceLength :: SwarmSession -> Int
390pieceLength = ciPieceLength . tInfo . torrentMeta 393pieceLength = ciPieceLength . tInfo . torrentMeta
394{-# INLINE pieceLength #-}
391 395
392{----------------------------------------------------------------------- 396{-----------------------------------------------------------------------
393 Peer session 397 Peer session
@@ -532,7 +536,8 @@ findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
532-- 3. Signal to the all other peer about this. 536-- 3. Signal to the all other peer about this.
533 537
534available :: Bitfield -> SwarmSession -> IO () 538available :: Bitfield -> SwarmSession -> IO ()
535available bf se @ SwarmSession {..} = mark >> atomically broadcast 539available bf se @ SwarmSession {..} = do
540 mark >> atomically broadcast
536 where 541 where
537 mark = do 542 mark = do
538 let bytes = pieceLength se * BF.haveCount bf 543 let bytes = pieceLength se * BF.haveCount bf
diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs
index a5529fe6..cb0494e8 100644
--- a/src/System/Torrent/Storage.hs
+++ b/src/System/Torrent/Storage.hs
@@ -20,29 +20,33 @@
20{-# LANGUAGE RecordWildCards #-} 20{-# LANGUAGE RecordWildCards #-}
21module System.Torrent.Storage 21module System.Torrent.Storage
22 ( Storage 22 ( Storage
23 , ppStorage
23 24
24 -- * Construction 25 -- * Construction
25 , bindTo, unbind, withStorage 26 , bindTo, unbind, withStorage
26 27
27 -- * Modification 28 -- * Modification
28 , getBlk, putBlk 29 , getBlk, putBlk, selBlk
29 ) where 30 ) where
30 31
31import Control.Applicative 32import Control.Applicative
32import Control.Concurrent.STM 33import Control.Concurrent.STM
33import Control.Exception 34import Control.Exception
34import Control.Monad 35import Control.Monad
36import Control.Monad.Trans
37
35import Data.ByteString as B 38import Data.ByteString as B
36import qualified Data.ByteString.Lazy as Lazy 39import qualified Data.ByteString.Lazy as Lazy
37import Data.List as L 40import Data.List as L
41import Text.PrettyPrint
38import System.FilePath 42import System.FilePath
39import System.Directory 43import System.Directory
40 44
41import Data.Bitfield 45import Data.Bitfield as BF
42import Data.Torrent 46import Data.Torrent
43import Network.BitTorrent.Exchange.Protocol 47import Network.BitTorrent.Exchange.Protocol
44import Network.BitTorrent.Internal 48import Network.BitTorrent.Internal
45import System.IO.MMap.Fixed 49import System.IO.MMap.Fixed as Fixed
46 50
47 51
48data Storage = Storage { 52data Storage = Storage {
@@ -51,14 +55,21 @@ data Storage = Storage {
51 55
52 -- | 56 -- |
53 , blocks :: !(TVar Bitfield) 57 , blocks :: !(TVar Bitfield)
58 -- TODO use bytestring for fast serialization
59 -- because we need to write this bitmap to disc periodically
60
61
62 , blockSize :: !Int
54 63
55 -- | Used to map linear block addresses to disjoint 64 -- | Used to map linear block addresses to disjoint
56 -- mallocated/mmaped adresses. 65 -- mallocated/mmaped adresses.
57 , payload :: !Fixed 66 , payload :: !Fixed
58 } 67 }
59 68
60pieceSize :: Storage -> Int 69ppStorage :: Storage -> IO Doc
61pieceSize = ciPieceLength . tInfo . torrentMeta . session 70ppStorage Storage {..} = pp <$> readTVarIO blocks
71 where
72 pp bf = int blockSize
62 73
63{----------------------------------------------------------------------- 74{-----------------------------------------------------------------------
64 Construction 75 Construction
@@ -67,9 +78,15 @@ pieceSize = ciPieceLength . tInfo . torrentMeta . session
67-- TODO doc args 78-- TODO doc args
68bindTo :: SwarmSession -> FilePath -> IO Storage 79bindTo :: SwarmSession -> FilePath -> IO Storage
69bindTo se @ SwarmSession {..} contentPath = do 80bindTo se @ SwarmSession {..} contentPath = do
70 let content_paths = contentLayout contentPath (tInfo torrentMeta) 81 let contentInfo = tInfo torrentMeta
82 let content_paths = contentLayout contentPath contentInfo
71 mapM_ mkDir (L.map fst content_paths) 83 mapM_ mkDir (L.map fst content_paths)
72 Storage se <$> newTVarIO (haveNone (ciPieceLength (tInfo torrentMeta))) 84
85 let pieceLen = pieceLength se
86 let blockSize = min defaultBlockSize pieceLen
87 print $ "content length " ++ show (contentLength contentInfo)
88 Storage se <$> newTVarIO (haveNone (blockCount blockSize contentInfo))
89 <*> pure blockSize
73 <*> coalesceFiles content_paths 90 <*> coalesceFiles content_paths
74 where 91 where
75 mkDir path = do 92 mkDir path = do
@@ -90,43 +107,113 @@ withStorage se path = bracket (se `bindTo` path) unbind
90-----------------------------------------------------------------------} 107-----------------------------------------------------------------------}
91 108
92-- TODO to avoid races we might need to try Control.Concurrent.yield 109-- TODO to avoid races we might need to try Control.Concurrent.yield
93-- TODO lazy block payload 110-- TODO make block_payload :: Lazy.ByteString
111
112selBlk :: MonadIO m => PieceIx -> Storage -> m [BlockIx]
113selBlk pix st @ Storage {..} = liftIO $ atomically $ do
114 mask <- pieceMask pix st
115 select mask <$> readTVar blocks
116 where
117 select mask = fmap mkBix . toList . difference mask
118 -- TODO clip upper bound of block index
119 mkBix ix = BlockIx pix (blockSize * (ix - offset)) blockSize
120
121 offset = coeff * pix
122 coeff = pieceLength session `div` blockSize
123
124--
125-- TODO make global lock map -- otherwise we might get broken pieces
126--
127-- imagine the following situation:
128--
129-- thread1: write
130-- thread1: mark
131--
132-- this let us avoid races as well
133--
94 134
95-- | Write a block to the storage. If block out of range then block is clipped. 135-- | Write a block to the storage. If block out of range then block is clipped.
96putBlk :: Block -> Storage -> IO () 136--
97putBlk blk @ Block {..} st @ Storage {..} = do 137--
138--
139putBlk :: MonadIO m => Block -> Storage -> m Bool
140putBlk blk @ Block {..} st @ Storage {..} = liftIO $ do
98-- let blkIx = undefined 141-- let blkIx = undefined
99-- bm <- readTVarIO blocks 142-- bm <- readTVarIO blocks
100-- unless (member blkIx bm) $ do 143-- unless (member blkIx bm) $ do
101 writeBytes (blkInterval (pieceSize st) blk) 144 writeBytes (blkInterval (pieceLength session) blk)
102 (Lazy.fromChunks [blkData]) 145 (Lazy.fromChunks [blkData])
103 payload 146 payload
104-- when (undefined bm blkIx) $ do 147
105-- if checkPiece ci piIx piece 148 markBlock blk st
106-- then return True 149 validatePiece blkPiece st
107-- else do 150
108-- reset 151markBlock :: Block -> Storage -> IO ()
109-- return False 152markBlock Block {..} Storage {..} = do
153 let piLen = pieceLength session
154 let glIx = (piLen `div` blockSize) * blkPiece + (blkOffset `div` blockSize)
155 atomically $ modifyTVar' blocks (have glIx)
110 156
111-- | Read a block by given block index. If lower or upper bound out of 157-- | Read a block by given block index. If lower or upper bound out of
112-- range then index is clipped. 158-- range then index is clipped.
113getBlk :: BlockIx -> Storage -> IO Block 159--
114getBlk ix @ BlockIx {..} st @ Storage {..} = do 160-- Do not block.
115 bs <- readBytes (ixInterval (pieceSize st) ix) payload 161--
162getBlk :: MonadIO m => BlockIx -> Storage -> m Block
163getBlk ix @ BlockIx {..} st @ Storage {..} = liftIO $ do
164 -- TODO check if __piece__ is available
165 bs <- readBytes (ixInterval (pieceLength session) ix) payload
116 return $ Block ixPiece ixOffset (Lazy.toStrict bs) 166 return $ Block ixPiece ixOffset (Lazy.toStrict bs)
117 167
118-- | Should be used to verify piece.
119getPiece :: PieceIx -> Storage -> IO ByteString 168getPiece :: PieceIx -> Storage -> IO ByteString
120getPiece ix st = blkData <$> getBlk (BlockIx ix 0 (pieceSize st)) st 169getPiece pix st @ Storage {..} = do
170 let pieceLen = pieceLength session
171 let bix = BlockIx pix 0 (pieceLength session)
172 bs <- readBytes (ixInterval pieceLen bix) payload
173 return (Lazy.toStrict bs)
174
175resetPiece :: PieceIx -> Storage -> IO ()
176resetPiece pix st @ Storage {..} = atomically $ do
177 mask <- pieceMask pix st
178 modifyTVar' blocks (`difference` mask)
179
180validatePiece :: PieceIx -> Storage -> IO Bool
181validatePiece pix st @ Storage {..} = do
182 downloaded <- atomically $ isDownloaded pix st
183 if not downloaded then return False
184 else do
185 print $ show pix ++ " downloaded"
186 piece <- getPiece pix st
187 if checkPiece (tInfo (torrentMeta session)) pix piece
188 then return True
189 else do
190 print $ "----------------------------- invalid " ++ show pix
191-- resetPiece pix st
192 return True
121 193
122{----------------------------------------------------------------------- 194{-----------------------------------------------------------------------
123 Internal 195 Internal
124-----------------------------------------------------------------------} 196-----------------------------------------------------------------------}
125 197
198isDownloaded :: PieceIx -> Storage -> STM Bool
199isDownloaded pix st @ Storage {..} = do
200 bf <- readTVar blocks
201 mask <- pieceMask pix st
202 return $ intersection mask bf == mask
203
204pieceMask :: PieceIx -> Storage -> STM Bitfield
205pieceMask pix Storage {..} = do
206 bf <- readTVar blocks
207 return $ BF.interval (totalCount bf) offset (offset + coeff - 1)
208 where
209 offset = coeff * pix
210 coeff = pieceLength session `div` blockSize
211
212
126ixInterval :: Int -> BlockIx -> FixedInterval 213ixInterval :: Int -> BlockIx -> FixedInterval
127ixInterval pieceSize BlockIx {..} = 214ixInterval pieceSize BlockIx {..} =
128 interval (ixPiece * pieceSize + ixOffset) ixLength 215 Fixed.interval (ixPiece * pieceSize + ixOffset) ixLength
129 216
130blkInterval :: Int -> Block -> FixedInterval 217blkInterval :: Int -> Block -> FixedInterval
131blkInterval pieceSize Block {..} = 218blkInterval pieceSize Block {..} =
132 interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file 219 Fixed.interval (blkPiece * pieceSize + blkOffset) (B.length blkData) \ No newline at end of file