summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Download.hs
blob: 981db2fbff72431c92f0a8cd4240ababb1147e94 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
-- |
--   Copyright   :  (c) Sam Truzjan 2013
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--
--
{-# LANGUAGE FlexibleContexts       #-}
{-# LANGUAGE FlexibleInstances      #-}
{-# LANGUAGE MultiParamTypeClasses  #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE TemplateHaskell        #-}
module Network.BitTorrent.Exchange.Download
       ( -- * Downloading
         Download (..)
       , Updates
       , runDownloadUpdates

         -- ** Metadata
         -- $metadata-download
       , MetadataDownload
       , metadataDownload

         -- ** Content
         -- $content-download
       , ContentDownload
       , contentDownload
       ) where

import Control.Applicative
import Control.Concurrent
import Control.Lens
import Control.Monad.State
import Data.BEncode as BE
import Data.ByteString as BS
import Data.ByteString.Lazy as BL
import Data.Default
import Data.List as L
import Data.Maybe
import Data.Map as M
import Data.Tuple

import Data.Torrent as Torrent
import Network.Address
import Network.BitTorrent.Exchange.Bitfield as BF
import Network.BitTorrent.Exchange.Block    as Block
import Network.BitTorrent.Exchange.Message  as Msg
import System.Torrent.Storage (Storage, writePiece)


{-----------------------------------------------------------------------
--  Class
-----------------------------------------------------------------------}

type Updates s a = StateT s IO a

runDownloadUpdates :: MVar s -> Updates s a -> IO a
runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m)

class Download s chunk | s -> chunk where
  scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx]

  -- |
  scheduleBlock  :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx)
  scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf

  -- | Get number of sent requests to this peer.
  getRequestQueueLength :: PeerAddr IP -> Updates s Int

  -- | Remove all pending block requests to the remote peer. May be used
  -- when:
  --
  --     * a peer closes connection;
  --
  --     * remote peer choked this peer;
  --
  --     * timeout expired.
  --
  resetPending :: PeerAddr IP -> Updates s ()

  -- | MAY write to storage, if a new piece have been completed.
  --
  --  You should check if a returned by peer block is actually have
  -- been requested and in-flight. This is needed to avoid "I send
  -- random corrupted block" attacks.
  pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool)

{-----------------------------------------------------------------------
--  Metadata download
-----------------------------------------------------------------------}
-- $metadata-download
-- TODO

data MetadataDownload = MetadataDownload
  { _pendingPieces :: [(PeerAddr IP, PieceIx)]
  , _bucket        :: Bucket
  , _topic         :: InfoHash
  }

makeLenses ''MetadataDownload

-- | Create a new scheduler for infodict of the given size.
metadataDownload :: Int -> InfoHash -> MetadataDownload
metadataDownload ps = MetadataDownload [] (Block.empty ps)

instance Default MetadataDownload where
  def = error "instance Default MetadataDownload"

--cancelPending :: PieceIx -> Updates ()
cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd)

instance Download MetadataDownload (Piece BS.ByteString) where
  scheduleBlock addr bf = do
    bkt  <- use bucket
    case spans metadataPieceSize bkt of
      []              -> return Nothing
      ((off, _ ) : _) -> do
        let pix = off `div` metadataPieceSize
        pendingPieces %= ((addr, pix) :)
        return (Just (BlockIx pix 0 metadataPieceSize))

  resetPending addr = pendingPieces %= L.filter ((addr ==) . fst)

  pushBlock addr Torrent.Piece {..} = do
    p    <- use pendingPieces
    when ((addr, pieceIndex) `L.notElem` p) $
      error "not requested"
    cancelPending pieceIndex

    bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData
    b <- use bucket
    case toPiece b of
      Nothing     -> return Nothing
      Just chunks -> do
          t <- use topic
          case parseInfoDict (BL.toStrict chunks) t of
            Right x -> do
                pendingPieces .= []
                return undefined -- (Just x)
            Left  e -> do
                pendingPieces .= []
                bucket .= Block.empty (Block.size b)
                return undefined -- Nothing
   where
      -- todo use incremental parsing to avoid BS.concat call
      parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict
      parseInfoDict chunk topic =
        case BE.decode chunk of
          Right (infodict @ InfoDict {..})
            | topic == idInfoHash -> return infodict
            |      otherwise      -> Left "broken infodict"
          Left err -> Left $ "unable to parse infodict " ++ err

{-----------------------------------------------------------------------
--  Content download
-----------------------------------------------------------------------}
-- $content-download
--
--  A block can have one of the following status:
--
--     1) /not allowed/: Piece is not in download set.
--
--     2) /waiting/: (allowed?) Block have been allowed to download,
--     but /this/ peer did not send any 'Request' message for this
--     block. To allow some piece use
--     'Network.BitTorrent.Exchange.Selector' and then 'allowedSet'
--     and 'allowPiece'.
--
--     3) /inflight/: (pending?) Block have been requested but
--     /remote/ peer did not send any 'Piece' message for this block.
--     Related functions 'markInflight'
--
--     4) /pending/: (stalled?) Block have have been downloaded
--     Related functions 'insertBlock'.
--
--   Piece status:
--
--     1) /assembled/: (downloaded?) All blocks in piece have been
--     downloaded but the piece did not verified yet.
--
--       * Valid: go to completed;
--
--       * Invalid: go to waiting.
--
--     2) /corrupted/:
--
--     3) /downloaded/: (verified?) A piece have been successfully
--     verified via the hash. Usually the piece should be stored to
--     the 'System.Torrent.Storage' and /this/ peer should send 'Have'
--     messages to the /remote/ peers.
--

data PieceEntry = PieceEntry
  { pending :: [(PeerAddr IP, BlockIx)]
  , stalled :: Bucket
  }

pieceEntry :: PieceSize -> PieceEntry
pieceEntry s = PieceEntry [] (Block.empty s)

isEmpty :: PieceEntry -> Bool
isEmpty PieceEntry {..} = L.null pending && Block.null stalled

_holes :: PieceIx -> PieceEntry -> [BlockIx]
_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
  where
    mkBlockIx (off, sz) = BlockIx pix off sz

data ContentDownload = ContentDownload
  { inprogress     :: !(Map PieceIx PieceEntry)
  , bitfield       :: !Bitfield
  , pieceSize      :: !PieceSize
  , contentStorage ::  Storage
  }

contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload
contentDownload = ContentDownload M.empty

--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates ()
modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s
    { inprogress = alter (g pieceSize) pix inprogress }
  where
    g s = h . f . fromMaybe (pieceEntry s)
    h e
      | isEmpty e = Nothing
      | otherwise = Just e

instance Download ContentDownload (Block BL.ByteString) where
  scheduleBlocks n addr maskBF = do
    ContentDownload {..} <- get
    let wantPieces = maskBF `BF.difference` bitfield
    let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $
                     M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces)
                      inprogress

    bixs <- if L.null wantBlocks
      then do
        mpix <- choosePiece wantPieces
        case mpix of -- TODO return 'n' blocks
          Nothing  -> return []
          Just pix -> return [leadingBlock pix defaultTransferSize]
      else chooseBlocks wantBlocks n

    forM_ bixs $ \ bix -> do
      modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
        { pending = (addr, bix) : pending }

    return bixs
    where
      -- TODO choose block nearest to pending or stalled sets to reduce disk
      -- seeks on remote machines
      --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx]
      chooseBlocks xs n = return (L.take n xs)

      -- TODO use selection strategies from Exchange.Selector
      --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx)
      choosePiece bf
        | BF.null bf = return $ Nothing
        | otherwise  = return $ Just $ BF.findMin bf

  getRequestQueueLength addr = do
    m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress)
    return $ L.sum $ L.map L.length $ M.elems m

  resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
    where
      reset = fmap $ \ e -> e
            { pending = L.filter (not . (==) addr . fst) (pending e) }

  pushBlock addr blk @ Block {..} = do
    mpe <- gets (M.lookup blkPiece . inprogress)
    case mpe of
      Nothing -> return Nothing
      Just (pe @ PieceEntry {..})
        | blockIx blk `L.notElem` fmap snd pending -> return Nothing
        |             otherwise                    -> do
         let bkt' = Block.insertLazy blkOffset blkData stalled
         case toPiece bkt' of
           Nothing        -> do
             modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
               { pending = L.filter ((==) (blockIx blk) . snd) pending
               , stalled = bkt'
               }
             return (Just False)

           Just pieceData -> do
             -- TODO verify
             storage <- gets contentStorage
             liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage
             modify $ \ s @ ContentDownload {..} -> s
               { inprogress =  M.delete blkPiece inprogress
               , bitfield   = BF.insert blkPiece bitfield
               }
             return (Just True)