summaryrefslogtreecommitdiff
path: root/bittorrent/src/Network/BitTorrent/Exchange
diff options
context:
space:
mode:
Diffstat (limited to 'bittorrent/src/Network/BitTorrent/Exchange')
-rw-r--r--bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs405
-rw-r--r--bittorrent/src/Network/BitTorrent/Exchange/Block.hs369
-rw-r--r--bittorrent/src/Network/BitTorrent/Exchange/Connection.hs1012
-rw-r--r--bittorrent/src/Network/BitTorrent/Exchange/Download.hs296
-rw-r--r--bittorrent/src/Network/BitTorrent/Exchange/Manager.hs62
-rw-r--r--bittorrent/src/Network/BitTorrent/Exchange/Message.hs1237
-rw-r--r--bittorrent/src/Network/BitTorrent/Exchange/Session.hs586
7 files changed, 0 insertions, 3967 deletions
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs b/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs
deleted file mode 100644
index 1be9f970..00000000
--- a/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs
+++ /dev/null
@@ -1,405 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This modules provides all necessary machinery to work with
9-- bitfields. Bitfields are used to keep track indices of complete
10-- pieces either this peer have or remote peer have.
11--
12-- There are also commonly used piece selection algorithms
13-- which used to find out which one next piece to download.
14-- Selectors considered to be used in the following order:
15--
16-- * 'randomFirst' - at the start of download.
17--
18-- * 'rarestFirst' - performed to avoid situation when
19-- rarest piece is unaccessible.
20--
21-- * 'endGame' - performed after a peer has requested all
22-- the subpieces of the content.
23--
24-- Note that BitTorrent protocol recommend (TODO link?) the
25-- 'strictFirst' priority policy for /subpiece/ or /blocks/
26-- selection.
27--
28{-# LANGUAGE CPP #-}
29{-# LANGUAGE BangPatterns #-}
30{-# LANGUAGE RecordWildCards #-}
31module Network.BitTorrent.Exchange.Bitfield
32 ( -- * Bitfield
33 PieceIx
34 , PieceCount
35 , Bitfield
36
37 -- * Construction
38 , haveAll
39 , haveNone
40 , have
41 , singleton
42 , interval
43 , adjustSize
44
45 -- * Query
46 -- ** Cardinality
47 , Network.BitTorrent.Exchange.Bitfield.null
48 , Network.BitTorrent.Exchange.Bitfield.full
49 , haveCount
50 , totalCount
51 , completeness
52
53 -- ** Membership
54 , member
55 , notMember
56 , findMin
57 , findMax
58 , isSubsetOf
59
60 -- ** Availability
61 , complement
62 , Frequency
63 , frequencies
64 , rarest
65
66 -- * Combine
67 , insert
68 , union
69 , intersection
70 , difference
71
72 -- * Conversion
73 , toList
74 , fromList
75
76 -- * Serialization
77 , fromBitmap
78 , toBitmap
79
80 -- * Piece selection
81 , Selector
82 , selector
83 , strategyClass
84
85 , strictFirst
86 , strictLast
87 , rarestFirst
88 , randomFirst
89 , endGame
90 ) where
91
92import Control.Monad
93import Control.Monad.ST
94import Data.ByteString (ByteString)
95import qualified Data.ByteString as B
96import qualified Data.ByteString.Lazy as Lazy
97import Data.Vector.Unboxed (Vector)
98import qualified Data.Vector.Unboxed as V
99import qualified Data.Vector.Unboxed.Mutable as VM
100import Data.IntervalSet (IntSet)
101import qualified Data.IntervalSet as S
102import qualified Data.IntervalSet.ByteString as S
103import Data.List (foldl')
104import Data.Monoid
105import Data.Ratio
106
107import Data.Torrent
108
109-- TODO cache some operations
110
111-- | Bitfields are represented just as integer sets but with a restriction:
112-- each integer in the set should be within the given interval. The greatest
113-- lower bound of the interval must be zero, so intervals may be specified by
114-- providing a maximum set size. For example, a bitfield of size 10 might
115-- contain only indices in interval [0..9].
116--
117-- By convention, we use the following aliases for Int:
118--
119-- [ PieceIx ] an Int member of the Bitfield.
120--
121-- [ PieceCount ] maximum set size for a Bitfield.
122data Bitfield = Bitfield {
123 bfSize :: !PieceCount
124 , bfSet :: !IntSet
125 } deriving (Show, Read, Eq)
126
127-- Invariants: all elements of bfSet lie in [0..bfSize - 1];
128
129instance Monoid Bitfield where
130 {-# SPECIALIZE instance Monoid Bitfield #-}
131 mempty = haveNone 0
132 mappend = union
133 mconcat = unions
134
135{-----------------------------------------------------------------------
136 Construction
137-----------------------------------------------------------------------}
138
139-- | The empty bitfield of the given size.
140haveNone :: PieceCount -> Bitfield
141haveNone s = Bitfield s S.empty
142
143-- | The full bitfield containing all piece indices for the given size.
144haveAll :: PieceCount -> Bitfield
145haveAll s = Bitfield s (S.interval 0 (s - 1))
146
147-- | Insert the index in the set ignoring out of range indices.
148have :: PieceIx -> Bitfield -> Bitfield
149have ix Bitfield {..}
150 | 0 <= ix && ix < bfSize = Bitfield bfSize (S.insert ix bfSet)
151 | otherwise = Bitfield bfSize bfSet
152
153singleton :: PieceIx -> PieceCount -> Bitfield
154singleton ix pc = have ix (haveNone pc)
155
156-- | Assign new size to bitfield. FIXME Normally, size should be only
157-- decreased, otherwise exception raised.
158adjustSize :: PieceCount -> Bitfield -> Bitfield
159adjustSize s Bitfield {..} = Bitfield s bfSet
160
161-- | NOTE: for internal use only
162interval :: PieceCount -> PieceIx -> PieceIx -> Bitfield
163interval pc a b = Bitfield pc (S.interval a b)
164
165{-----------------------------------------------------------------------
166 Query
167-----------------------------------------------------------------------}
168
169-- | Test if bitifield have no one index: peer do not have anything.
170null :: Bitfield -> Bool
171null Bitfield {..} = S.null bfSet
172
173-- | Test if bitfield have all pieces.
174full :: Bitfield -> Bool
175full Bitfield {..} = S.size bfSet == bfSize
176
177-- | Count of peer have pieces.
178haveCount :: Bitfield -> PieceCount
179haveCount = S.size . bfSet
180
181-- | Total count of pieces and its indices.
182totalCount :: Bitfield -> PieceCount
183totalCount = bfSize
184
185-- | Ratio of /have/ piece count to the /total/ piece count.
186--
187-- > forall bf. 0 <= completeness bf <= 1
188--
189completeness :: Bitfield -> Ratio PieceCount
190completeness b = haveCount b % totalCount b
191
192inRange :: PieceIx -> Bitfield -> Bool
193inRange ix Bitfield {..} = 0 <= ix && ix < bfSize
194
195member :: PieceIx -> Bitfield -> Bool
196member ix bf @ Bitfield {..}
197 | ix `inRange` bf = ix `S.member` bfSet
198 | otherwise = False
199
200notMember :: PieceIx -> Bitfield -> Bool
201notMember ix bf @ Bitfield {..}
202 | ix `inRange` bf = ix `S.notMember` bfSet
203 | otherwise = True
204
205-- | Find first available piece index.
206findMin :: Bitfield -> PieceIx
207findMin = S.findMin . bfSet
208{-# INLINE findMin #-}
209
210-- | Find last available piece index.
211findMax :: Bitfield -> PieceIx
212findMax = S.findMax . bfSet
213{-# INLINE findMax #-}
214
215-- | Check if all pieces from first bitfield present if the second bitfield
216isSubsetOf :: Bitfield -> Bitfield -> Bool
217isSubsetOf a b = bfSet a `S.isSubsetOf` bfSet b
218{-# INLINE isSubsetOf #-}
219
220-- | Resulting bitfield includes only missing pieces.
221complement :: Bitfield -> Bitfield
222complement Bitfield {..} = Bitfield
223 { bfSet = uni `S.difference` bfSet
224 , bfSize = bfSize
225 }
226 where
227 Bitfield _ uni = haveAll bfSize
228{-# INLINE complement #-}
229
230{-----------------------------------------------------------------------
231-- Availability
232-----------------------------------------------------------------------}
233
234-- | Frequencies are needed in piece selection startegies which use
235-- availability quantity to find out the optimal next piece index to
236-- download.
237type Frequency = Int
238
239-- TODO rename to availability
240-- | How many times each piece index occur in the given bitfield set.
241frequencies :: [Bitfield] -> Vector Frequency
242frequencies [] = V.fromList []
243frequencies xs = runST $ do
244 v <- VM.new size
245 VM.set v 0
246 forM_ xs $ \ Bitfield {..} -> do
247 forM_ (S.toList bfSet) $ \ x -> do
248 fr <- VM.read v x
249 VM.write v x (succ fr)
250 V.unsafeFreeze v
251 where
252 size = maximum (map bfSize xs)
253
254-- TODO it seems like this operation is veeery slow
255
256-- | Find least available piece index. If no piece available return
257-- 'Nothing'.
258rarest :: [Bitfield] -> Maybe PieceIx
259rarest xs
260 | V.null freqMap = Nothing
261 | otherwise
262 = Just $ fst $ V.ifoldr' minIx (0, freqMap V.! 0) freqMap
263 where
264 freqMap = frequencies xs
265 {-# NOINLINE freqMap #-}
266
267 minIx :: PieceIx -> Frequency
268 -> (PieceIx, Frequency)
269 -> (PieceIx, Frequency)
270 minIx ix fr acc@(_, fra)
271 | fr < fra && fr > 0 = (ix, fr)
272 | otherwise = acc
273
274
275{-----------------------------------------------------------------------
276 Combine
277-----------------------------------------------------------------------}
278
279insert :: PieceIx -> Bitfield -> Bitfield
280insert pix bf @ Bitfield {..}
281 | 0 <= pix && pix < bfSize = Bitfield
282 { bfSet = S.insert pix bfSet
283 , bfSize = bfSize
284 }
285 | otherwise = bf
286
287-- | Find indices at least one peer have.
288union :: Bitfield -> Bitfield -> Bitfield
289union a b = {-# SCC union #-} Bitfield {
290 bfSize = bfSize a `max` bfSize b
291 , bfSet = bfSet a `S.union` bfSet b
292 }
293
294-- | Find indices both peers have.
295intersection :: Bitfield -> Bitfield -> Bitfield
296intersection a b = {-# SCC intersection #-} Bitfield {
297 bfSize = bfSize a `min` bfSize b
298 , bfSet = bfSet a `S.intersection` bfSet b
299 }
300
301-- | Find indices which have first peer but do not have the second peer.
302difference :: Bitfield -> Bitfield -> Bitfield
303difference a b = {-# SCC difference #-} Bitfield {
304 bfSize = bfSize a -- FIXME is it reasonable?
305 , bfSet = bfSet a `S.difference` bfSet b
306 }
307
308-- | Find indices the any of the peers have.
309unions :: [Bitfield] -> Bitfield
310unions = {-# SCC unions #-} foldl' union (haveNone 0)
311
312{-----------------------------------------------------------------------
313 Serialization
314-----------------------------------------------------------------------}
315
316-- | List all /have/ indexes.
317toList :: Bitfield -> [PieceIx]
318toList Bitfield {..} = S.toList bfSet
319
320-- | Make bitfield from list of /have/ indexes.
321fromList :: PieceCount -> [PieceIx] -> Bitfield
322fromList s ixs = Bitfield {
323 bfSize = s
324 , bfSet = S.splitGT (-1) $ S.splitLT s $ S.fromList ixs
325 }
326
327-- | Unpack 'Bitfield' from tightly packed bit array. Note resulting
328-- size might be more than real bitfield size, use 'adjustSize'.
329fromBitmap :: ByteString -> Bitfield
330fromBitmap bs = {-# SCC fromBitmap #-} Bitfield {
331 bfSize = B.length bs * 8
332 , bfSet = S.fromByteString bs
333 }
334{-# INLINE fromBitmap #-}
335
336-- | Pack a 'Bitfield' to tightly packed bit array.
337toBitmap :: Bitfield -> Lazy.ByteString
338toBitmap Bitfield {..} = {-# SCC toBitmap #-} Lazy.fromChunks [intsetBM, alignment]
339 where
340 byteSize = bfSize `div` 8 + if bfSize `mod` 8 == 0 then 0 else 1
341 alignment = B.replicate (byteSize - B.length intsetBM) 0
342 intsetBM = S.toByteString bfSet
343
344{-----------------------------------------------------------------------
345-- Piece selection
346-----------------------------------------------------------------------}
347
348type Selector = Bitfield -- ^ Indices of client /have/ pieces.
349 -> Bitfield -- ^ Indices of peer /have/ pieces.
350 -> [Bitfield] -- ^ Indices of other peers /have/ pieces.
351 -> Maybe PieceIx -- ^ Zero-based index of piece to request
352 -- to, if any.
353
354selector :: Selector -- ^ Selector to use at the start.
355 -> Ratio PieceCount
356 -> Selector -- ^ Selector to use after the client have
357 -- the C pieces.
358 -> Selector -- ^ Selector that changes behaviour based
359 -- on completeness.
360selector start pt ready h a xs =
361 case strategyClass pt h of
362 SCBeginning -> start h a xs
363 SCReady -> ready h a xs
364 SCEnd -> endGame h a xs
365
366data StartegyClass
367 = SCBeginning
368 | SCReady
369 | SCEnd
370 deriving (Show, Eq, Ord, Enum, Bounded)
371
372
373strategyClass :: Ratio PieceCount -> Bitfield -> StartegyClass
374strategyClass threshold = classify . completeness
375 where
376 classify c
377 | c < threshold = SCBeginning
378 | c + 1 % numerator c < 1 = SCReady
379 -- FIXME numerator have is not total count
380 | otherwise = SCEnd
381
382
383-- | Select the first available piece.
384strictFirst :: Selector
385strictFirst h a _ = Just $ findMin (difference a h)
386
387-- | Select the last available piece.
388strictLast :: Selector
389strictLast h a _ = Just $ findMax (difference a h)
390
391-- |
392rarestFirst :: Selector
393rarestFirst h a xs = rarest (map (intersection want) xs)
394 where
395 want = difference h a
396
397-- | In average random first is faster than rarest first strategy but
398-- only if all pieces are available.
399randomFirst :: Selector
400randomFirst = do
401-- randomIO
402 error "TODO: randomFirst"
403
404endGame :: Selector
405endGame = strictLast
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Block.hs b/bittorrent/src/Network/BitTorrent/Exchange/Block.hs
deleted file mode 100644
index bc9a3d24..00000000
--- a/bittorrent/src/Network/BitTorrent/Exchange/Block.hs
+++ /dev/null
@@ -1,369 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Blocks are used to transfer pieces.
9--
10{-# LANGUAGE BangPatterns #-}
11{-# LANGUAGE FlexibleInstances #-}
12{-# LANGUAGE TemplateHaskell #-}
13{-# LANGUAGE DeriveFunctor #-}
14{-# LANGUAGE DeriveDataTypeable #-}
15{-# LANGUAGE GeneralizedNewtypeDeriving #-}
16module Network.BitTorrent.Exchange.Block
17 ( -- * Block attributes
18 BlockOffset
19 , BlockCount
20 , BlockSize
21 , defaultTransferSize
22
23 -- * Block index
24 , BlockIx(..)
25 , blockIxRange
26
27 -- * Block data
28 , Block(..)
29 , blockIx
30 , blockSize
31 , blockRange
32 , isPiece
33 , leadingBlock
34
35 -- * Block bucket
36 , Bucket
37
38 -- ** Query
39 , Network.BitTorrent.Exchange.Block.null
40 , Network.BitTorrent.Exchange.Block.full
41 , Network.BitTorrent.Exchange.Block.size
42 , Network.BitTorrent.Exchange.Block.spans
43
44 -- ** Construction
45 , Network.BitTorrent.Exchange.Block.empty
46 , Network.BitTorrent.Exchange.Block.insert
47 , Network.BitTorrent.Exchange.Block.insertLazy
48 , Network.BitTorrent.Exchange.Block.merge
49 , Network.BitTorrent.Exchange.Block.fromList
50
51 -- ** Rendering
52 , Network.BitTorrent.Exchange.Block.toPiece
53
54 -- ** Debug
55 , Network.BitTorrent.Exchange.Block.valid
56 ) where
57
58import Prelude hiding (span)
59import Control.Applicative
60import Data.ByteString as BS hiding (span)
61import Data.ByteString.Lazy as BL hiding (span)
62import Data.ByteString.Lazy.Builder as BS
63import Data.Default
64import Data.Monoid
65import Data.List as L hiding (span)
66import Data.Serialize as S
67import Data.Typeable
68import Numeric
69import Text.PrettyPrint as PP hiding ((<>))
70import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
71
72import Data.Torrent
73
74{-----------------------------------------------------------------------
75-- Block attributes
76-----------------------------------------------------------------------}
77
78-- | Offset of a block in a piece in bytes. Should be multiple of
79-- the choosen block size.
80type BlockOffset = Int
81
82-- | Size of a block in bytes. Should be power of 2.
83--
84-- Normally block size is equal to 'defaultTransferSize'.
85--
86type BlockSize = Int
87
88-- | Number of block in a piece of a torrent. Used to distinguish
89-- block count from piece count.
90type BlockCount = Int
91
92-- | Widely used semi-official block size. Some clients can ignore if
93-- block size of BlockIx in Request message is not equal to this
94-- value.
95--
96defaultTransferSize :: BlockSize
97defaultTransferSize = 16 * 1024
98
99{-----------------------------------------------------------------------
100 Block Index
101-----------------------------------------------------------------------}
102
103-- | BlockIx correspond.
104data BlockIx = BlockIx {
105 -- | Zero-based piece index.
106 ixPiece :: {-# UNPACK #-} !PieceIx
107
108 -- | Zero-based byte offset within the piece.
109 , ixOffset :: {-# UNPACK #-} !BlockOffset
110
111 -- | Block size starting from offset.
112 , ixLength :: {-# UNPACK #-} !BlockSize
113 } deriving (Show, Eq, Typeable)
114
115-- | First block in torrent. Useful for debugging.
116instance Default BlockIx where
117 def = BlockIx 0 0 defaultTransferSize
118
119getInt :: S.Get Int
120getInt = fromIntegral <$> S.getWord32be
121{-# INLINE getInt #-}
122
123putInt :: S.Putter Int
124putInt = S.putWord32be . fromIntegral
125{-# INLINE putInt #-}
126
127instance Serialize BlockIx where
128 {-# SPECIALIZE instance Serialize BlockIx #-}
129 get = BlockIx <$> getInt
130 <*> getInt
131 <*> getInt
132 {-# INLINE get #-}
133
134 put BlockIx {..} = do
135 putInt ixPiece
136 putInt ixOffset
137 putInt ixLength
138 {-# INLINE put #-}
139
140instance Pretty BlockIx where
141 pPrint BlockIx {..} =
142 ("piece = " <> int ixPiece <> ",") <+>
143 ("offset = " <> int ixOffset <> ",") <+>
144 ("length = " <> int ixLength)
145
146-- | Get location of payload bytes in the torrent content.
147blockIxRange :: (Num a, Integral a) => PieceSize -> BlockIx -> (a, a)
148blockIxRange piSize BlockIx {..} = (offset, offset + len)
149 where
150 offset = fromIntegral piSize * fromIntegral ixPiece
151 + fromIntegral ixOffset
152 len = fromIntegral ixLength
153{-# INLINE blockIxRange #-}
154
155{-----------------------------------------------------------------------
156 Block
157-----------------------------------------------------------------------}
158
159data Block payload = Block {
160 -- | Zero-based piece index.
161 blkPiece :: {-# UNPACK #-} !PieceIx
162
163 -- | Zero-based byte offset within the piece.
164 , blkOffset :: {-# UNPACK #-} !BlockOffset
165
166 -- | Payload bytes.
167 , blkData :: !payload
168 } deriving (Show, Eq, Functor, Typeable)
169
170-- | Payload is ommitted.
171instance Pretty (Block BL.ByteString) where
172 pPrint = pPrint . blockIx
173 {-# INLINE pPrint #-}
174
175-- | Get size of block /payload/ in bytes.
176blockSize :: Block BL.ByteString -> BlockSize
177blockSize = fromIntegral . BL.length . blkData
178{-# INLINE blockSize #-}
179
180-- | Get block index of a block.
181blockIx :: Block BL.ByteString -> BlockIx
182blockIx = BlockIx <$> blkPiece <*> blkOffset <*> blockSize
183
184-- | Get location of payload bytes in the torrent content.
185blockRange :: (Num a, Integral a)
186 => PieceSize -> Block BL.ByteString -> (a, a)
187blockRange piSize = blockIxRange piSize . blockIx
188{-# INLINE blockRange #-}
189
190-- | Test if a block can be safely turned into a piece.
191isPiece :: PieceSize -> Block BL.ByteString -> Bool
192isPiece pieceLen blk @ (Block i offset _) =
193 offset == 0 && blockSize blk == pieceLen && i >= 0
194{-# INLINE isPiece #-}
195
196-- | First block in the piece.
197leadingBlock :: PieceIx -> BlockSize -> BlockIx
198leadingBlock pix blockSize = BlockIx
199 { ixPiece = pix
200 , ixOffset = 0
201 , ixLength = blockSize
202 }
203{-# INLINE leadingBlock #-}
204
205{-----------------------------------------------------------------------
206-- Bucket
207-----------------------------------------------------------------------}
208
209type Pos = Int
210type ChunkSize = Int
211
212-- | A sparse set of blocks used to represent an /in progress/ piece.
213data Bucket
214 = Nil
215 | Span {-# UNPACK #-} !ChunkSize !Bucket
216 | Fill {-# UNPACK #-} !ChunkSize !Builder !Bucket
217
218instance Show Bucket where
219 showsPrec i Nil = showString ""
220 showsPrec i (Span s xs) = showString "Span " <> showInt s
221 <> showString " " <> showsPrec i xs
222 showsPrec i (Fill s _ xs) = showString "Fill " <> showInt s
223 <> showString " " <> showsPrec i xs
224
225-- | INVARIANT: 'Nil' should appear only after 'Span' of 'Fill'.
226nilInvFailed :: a
227nilInvFailed = error "Nil: bucket invariant failed"
228
229valid :: Bucket -> Bool
230valid = check Nothing
231 where
232 check Nothing Nil = False -- see 'nilInvFailed'
233 check (Just _) _ = True
234 check prevIsSpan (Span sz xs) =
235 prevIsSpan /= Just True && -- Span n (NotSpan .. ) invariant
236 sz > 0 && -- Span is always non-empty
237 check (Just True) xs
238 check prevIsSpan (Fill sz b xs) =
239 prevIsSpan /= Just True && -- Fill n (NotFill .. ) invariant
240 sz > 0 && -- Fill is always non-empty
241 check (Just False) xs
242
243instance Pretty Bucket where
244 pPrint Nil = nilInvFailed
245 pPrint bkt = go bkt
246 where
247 go Nil = PP.empty
248 go (Span sz xs) = "Span" <+> PP.int sz <+> go xs
249 go (Fill sz b xs) = "Fill" <+> PP.int sz <+> go xs
250
251-- | Smart constructor: use it when some block is /deleted/ from
252-- bucket.
253span :: ChunkSize -> Bucket -> Bucket
254span sz (Span sz' xs) = Span (sz + sz') xs
255span sz xxs = Span sz xxs
256{-# INLINE span #-}
257
258-- | Smart constructor: use it when some block is /inserted/ to
259-- bucket.
260fill :: ChunkSize -> Builder -> Bucket -> Bucket
261fill sz b (Fill sz' b' xs) = Fill (sz + sz') (b <> b') xs
262fill sz b xxs = Fill sz b xxs
263{-# INLINE fill #-}
264
265{-----------------------------------------------------------------------
266-- Bucket queries
267-----------------------------------------------------------------------}
268
269-- | /O(1)/. Test if this bucket is empty.
270null :: Bucket -> Bool
271null Nil = nilInvFailed
272null (Span _ Nil) = True
273null _ = False
274{-# INLINE null #-}
275
276-- | /O(1)/. Test if this bucket is complete.
277full :: Bucket -> Bool
278full Nil = nilInvFailed
279full (Fill _ _ Nil) = True
280full _ = False
281{-# INLINE full #-}
282
283-- | /O(n)/. Total size of the incompleted piece.
284size :: Bucket -> PieceSize
285size Nil = nilInvFailed
286size bkt = go bkt
287 where
288 go Nil = 0
289 go (Span sz xs) = sz + go xs
290 go (Fill sz _ xs) = sz + go xs
291
292-- | /O(n)/. List incomplete blocks to download. If some block have
293-- size more than the specified 'BlockSize' then block is split into
294-- smaller blocks to satisfy given 'BlockSize'. Small (for
295-- e.g. trailing) blocks is not ignored, but returned in-order.
296spans :: BlockSize -> Bucket -> [(BlockOffset, BlockSize)]
297spans expectedSize = go 0
298 where
299 go _ Nil = []
300 go off (Span sz xs) = listChunks off sz ++ go (off + sz) xs
301 go off (Fill sz _ xs) = go (off + sz) xs
302
303 listChunks off restSize
304 | restSize <= 0 = []
305 | otherwise = (off, blkSize)
306 : listChunks (off + blkSize) (restSize - blkSize)
307 where
308 blkSize = min expectedSize restSize
309
310{-----------------------------------------------------------------------
311-- Bucket contstruction
312-----------------------------------------------------------------------}
313
314-- | /O(1)/. A new empty bucket capable to alloof specified size.
315empty :: PieceSize -> Bucket
316empty sz
317 | sz < 0 = error "empty: Bucket size must be a non-negative value"
318 | otherwise = Span sz Nil
319{-# INLINE empty #-}
320
321insertSpan :: Pos -> BS.ByteString -> ChunkSize -> Bucket -> Bucket
322insertSpan !pos !bs !span_sz !xs =
323 let pref_len = pos
324 fill_len = span_sz - pos `min` BS.length bs
325 suff_len = (span_sz - pos) - fill_len
326 in mkSpan pref_len $
327 fill fill_len (byteString (BS.take fill_len bs)) $
328 mkSpan suff_len $
329 xs
330 where
331 mkSpan 0 xs = xs
332 mkSpan sz xs = Span sz xs
333
334-- | /O(n)/. Insert a strict bytestring at specified position.
335--
336-- Best case: if blocks are inserted in sequential order, then this
337-- operation should take /O(1)/.
338--
339insert :: Pos -> BS.ByteString -> Bucket -> Bucket
340insert _ _ Nil = nilInvFailed
341insert dstPos bs bucket = go 0 bucket
342 where
343 intersects curPos sz = dstPos >= curPos && dstPos <= curPos + sz
344
345 go _ Nil = Nil
346 go curPos (Span sz xs)
347 | intersects curPos sz = insertSpan (dstPos - curPos) bs sz xs
348 | otherwise = span sz (go (curPos + sz) xs)
349 go curPos bkt @ (Fill sz br xs)
350 | intersects curPos sz = bkt
351 | otherwise = fill sz br (go (curPos + sz) xs)
352
353fromList :: PieceSize -> [(Pos, BS.ByteString)] -> Bucket
354fromList s = L.foldr (uncurry Network.BitTorrent.Exchange.Block.insert)
355 (Network.BitTorrent.Exchange.Block.empty s)
356
357-- TODO zero-copy
358insertLazy :: Pos -> BL.ByteString -> Bucket -> Bucket
359insertLazy pos bl = Network.BitTorrent.Exchange.Block.insert pos (BL.toStrict bl)
360
361-- | /O(n)/.
362merge :: Bucket -> Bucket -> Bucket
363merge = error "Bucket.merge: not implemented"
364
365-- | /O(1)/.
366toPiece :: Bucket -> Maybe BL.ByteString
367toPiece Nil = nilInvFailed
368toPiece (Fill _ b Nil) = Just (toLazyByteString b)
369toPiece _ = Nothing
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs b/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs
deleted file mode 100644
index 6804d0a2..00000000
--- a/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs
+++ /dev/null
@@ -1,1012 +0,0 @@
1-- |
2-- Module : Network.BitTorrent.Exchange.Wire
3-- Copyright : (c) Sam Truzjan 2013
4-- (c) Daniel Gröber 2013
5-- License : BSD3
6-- Maintainer : pxqr.sta@gmail.com
7-- Stability : experimental
8-- Portability : portable
9--
10-- Each peer wire connection is identified by triple @(topic,
11-- remote_addr, this_addr)@. This means that connections are the
12-- same if and only if their 'ConnectionId' are the same. Of course,
13-- you /must/ avoid duplicated connections.
14--
15-- This module control /integrity/ of data send and received.
16--
17{-# LANGUAGE DeriveDataTypeable #-}
18{-# LANGUAGE TemplateHaskell #-}
19{-# LANGUAGE MultiParamTypeClasses #-}
20{-# LANGUAGE GeneralizedNewtypeDeriving #-}
21module Network.BitTorrent.Exchange.Connection
22 ( -- * Wire
23 Connected
24 , Wire
25 , ChannelSide (..)
26
27 -- * Connection
28 , Connection
29 , connInitiatedBy
30
31 -- ** Identity
32 , connRemoteAddr
33 , connTopic
34 , connRemotePeerId
35 , connThisPeerId
36
37 -- ** Capabilities
38 , connProtocol
39 , connCaps
40 , connExtCaps
41 , connRemoteEhs
42
43 -- ** State
44 , connStatus
45 , connBitfield
46
47 -- ** Env
48 , connOptions
49 , connSession
50 , connStats
51
52 -- ** Status
53 , PeerStatus (..)
54 , ConnectionStatus (..)
55 , updateStatus
56 , statusUpdates
57 , clientStatus
58 , remoteStatus
59 , canUpload
60 , canDownload
61 , defaultUnchokeSlots
62 , defaultRechokeInterval
63
64
65 -- * Setup
66 , ConnectionPrefs (..)
67 , SessionLink (..)
68 , ConnectionConfig (..)
69
70 -- ** Initiate
71 , connectWire
72
73 -- ** Accept
74 , PendingConnection
75 , newPendingConnection
76 , pendingPeer
77 , pendingCaps
78 , pendingTopic
79 , closePending
80 , acceptWire
81
82 -- ** Post setup actions
83 , resizeBitfield
84
85 -- * Messaging
86 , recvMessage
87 , sendMessage
88 , filterQueue
89 , getMaxQueueLength
90
91 -- * Exceptions
92 , ProtocolError (..)
93 , WireFailure (..)
94 , peerPenalty
95 , isWireFailure
96 , disconnectPeer
97
98 -- * Stats
99 , ByteStats (..)
100 , FlowStats (..)
101 , ConnectionStats (..)
102
103 -- * Flood detection
104 , FloodDetector (..)
105
106 -- * Options
107 , Options (..)
108 ) where
109
110import Control.Applicative
111import Control.Concurrent hiding (yield)
112import Control.Exception
113import Control.Monad.Reader
114import Control.Monad.State
115import Control.Monad.Trans.Resource
116import Control.Lens
117import Data.ByteString as BS
118import Data.ByteString.Lazy as BSL
119import Data.Conduit as C
120import Data.Conduit.Cereal
121import Data.Conduit.List
122import Data.Conduit.Network
123import Data.Default
124import Data.IORef
125import Data.List as L
126import Data.Maybe as M
127import Data.Monoid
128import Data.Serialize as S
129import Data.Typeable
130import Network
131import Network.Socket hiding (Connected)
132import Network.Socket.ByteString as BS
133import Text.PrettyPrint as PP hiding ((<>))
134import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
135import Text.Show.Functions ()
136import System.Log.FastLogger (ToLogStr(..))
137import System.Timeout
138
139import Data.Torrent
140import Network.Address
141import Network.BitTorrent.Exchange.Bitfield as BF
142import Network.BitTorrent.Exchange.Message as Msg
143
144-- TODO handle port message?
145-- TODO handle limits?
146-- TODO filter not requested PIECE messages
147-- TODO metadata piece request flood protection
148-- TODO piece request flood protection
149-- TODO protect against flood attacks
150{-----------------------------------------------------------------------
151-- Exceptions
152-----------------------------------------------------------------------}
153
154-- | Used to specify initiator of 'ProtocolError'.
155data ChannelSide
156 = ThisPeer
157 | RemotePeer
158 deriving (Show, Eq, Enum, Bounded)
159
160instance Default ChannelSide where
161 def = ThisPeer
162
163instance Pretty ChannelSide where
164 pPrint = PP.text . show
165
166-- | A protocol errors occur when a peer violates protocol
167-- specification.
168data ProtocolError
169 -- | Protocol string should be 'BitTorrent Protocol' but remote
170 -- peer have sent a different string.
171 = InvalidProtocol ProtocolName
172
173 -- | Sent and received protocol strings do not match. Can occur
174 -- in 'connectWire' only.
175 | UnexpectedProtocol ProtocolName
176
177 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not
178 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in
179 -- 'connectWire' or 'acceptWire' only.
180 | UnexpectedTopic InfoHash
181
182 -- | Some trackers or DHT can return 'PeerId' of a peer. If a
183 -- remote peer handshaked with different 'hsPeerId' then this
184 -- exception is raised. Can occur in 'connectWire' only.
185 | UnexpectedPeerId PeerId
186
187 -- | Accepted peer have sent unknown torrent infohash in
188 -- 'hsInfoHash' field. This situation usually happen when /this/
189 -- peer have deleted the requested torrent. The error can occur in
190 -- 'acceptWire' function only.
191 | UnknownTopic InfoHash
192
193 -- | A remote peer have 'ExtExtended' enabled but did not send an
194 -- 'ExtendedHandshake' back.
195 | HandshakeRefused
196
197 -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST
198 -- be send either once or zero times, but either this peer or
199 -- remote peer send a bitfield message the second time.
200 | BitfieldAlreadySent ChannelSide
201
202 -- | Capabilities violation. For example this exception can occur
203 -- when a peer have sent 'Port' message but 'ExtDHT' is not
204 -- allowed in 'connCaps'.
205 | DisallowedMessage
206 { -- | Who sent invalid message.
207 violentSender :: ChannelSide
208
209 -- | If the 'violentSender' reconnect with this extension
210 -- enabled then he can try to send this message.
211 , extensionRequired :: Extension
212 }
213 deriving Show
214
215instance Pretty ProtocolError where
216 pPrint = PP.text . show
217
218errorPenalty :: ProtocolError -> Int
219errorPenalty (InvalidProtocol _) = 1
220errorPenalty (UnexpectedProtocol _) = 1
221errorPenalty (UnexpectedTopic _) = 1
222errorPenalty (UnexpectedPeerId _) = 1
223errorPenalty (UnknownTopic _) = 0
224errorPenalty (HandshakeRefused ) = 1
225errorPenalty (BitfieldAlreadySent _) = 1
226errorPenalty (DisallowedMessage _ _) = 1
227
228-- | Exceptions used to interrupt the current P2P session.
229data WireFailure
230 = ConnectionRefused IOError
231
232 -- | Force termination of wire connection.
233 --
234 -- Normally you should throw only this exception from event loop
235 -- using 'disconnectPeer', other exceptions are thrown
236 -- automatically by functions from this module.
237 --
238 | DisconnectPeer
239
240 -- | A peer not responding and did not send a 'KeepAlive' message
241 -- for a specified period of time.
242 | PeerDisconnected
243
244 -- | A remote peer have sent some unknown message we unable to
245 -- parse.
246 | DecodingError GetException
247
248 -- | See 'ProtocolError' for more details.
249 | ProtocolError ProtocolError
250
251 -- | A possible malicious peer have sent too many control messages
252 -- without making any progress.
253 | FloodDetected ConnectionStats
254 deriving (Show, Typeable)
255
256instance Exception WireFailure
257
258instance Pretty WireFailure where
259 pPrint = PP.text . show
260
261-- TODO
262-- data Penalty = Ban | Penalty Int
263
264peerPenalty :: WireFailure -> Int
265peerPenalty DisconnectPeer = 0
266peerPenalty PeerDisconnected = 0
267peerPenalty (DecodingError _) = 1
268peerPenalty (ProtocolError e) = errorPenalty e
269peerPenalty (FloodDetected _) = 1
270
271-- | Do nothing with exception, used with 'handle' or 'try'.
272isWireFailure :: Monad m => WireFailure -> m ()
273isWireFailure _ = return ()
274
275protocolError :: MonadThrow m => ProtocolError -> m a
276protocolError = monadThrow . ProtocolError
277
278{-----------------------------------------------------------------------
279-- Stats
280-----------------------------------------------------------------------}
281
282-- | Message stats in one direction.
283data FlowStats = FlowStats
284 { -- | Number of the messages sent or received.
285 messageCount :: {-# UNPACK #-} !Int
286 -- | Sum of byte sequences of all messages.
287 , messageBytes :: {-# UNPACK #-} !ByteStats
288 } deriving Show
289
290instance Pretty FlowStats where
291 pPrint FlowStats {..} =
292 PP.int messageCount <+> "messages" $+$
293 pPrint messageBytes
294
295-- | Zeroed stats.
296instance Default FlowStats where
297 def = FlowStats 0 def
298
299-- | Monoid under addition.
300instance Monoid FlowStats where
301 mempty = def
302 mappend a b = FlowStats
303 { messageBytes = messageBytes a <> messageBytes b
304 , messageCount = messageCount a + messageCount b
305 }
306
307-- | Find average length of byte sequences per message.
308avgByteStats :: FlowStats -> ByteStats
309avgByteStats (FlowStats n ByteStats {..}) = ByteStats
310 { overhead = overhead `quot` n
311 , control = control `quot` n
312 , payload = payload `quot` n
313 }
314
315-- | Message stats in both directions. This data can be retrieved
316-- using 'getStats' function.
317--
318-- Note that this stats is completely different from
319-- 'Data.Torrent.Progress.Progress': payload bytes not necessary
320-- equal to downloaded\/uploaded bytes since a peer can send a
321-- broken block.
322--
323data ConnectionStats = ConnectionStats
324 { -- | Received messages stats.
325 incomingFlow :: !FlowStats
326 -- | Sent messages stats.
327 , outcomingFlow :: !FlowStats
328 } deriving Show
329
330instance Pretty ConnectionStats where
331 pPrint ConnectionStats {..} = vcat
332 [ "Recv:" <+> pPrint incomingFlow
333 , "Sent:" <+> pPrint outcomingFlow
334 , "Both:" <+> pPrint (incomingFlow <> outcomingFlow)
335 ]
336
337-- | Zeroed stats.
338instance Default ConnectionStats where
339 def = ConnectionStats def def
340
341-- | Monoid under addition.
342instance Monoid ConnectionStats where
343 mempty = def
344 mappend a b = ConnectionStats
345 { incomingFlow = incomingFlow a <> incomingFlow b
346 , outcomingFlow = outcomingFlow a <> outcomingFlow b
347 }
348
349-- | Aggregate one more message stats in the /specified/ direction.
350addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats
351addStats ThisPeer x s = s { outcomingFlow = (FlowStats 1 x) <> (outcomingFlow s) }
352addStats RemotePeer x s = s { incomingFlow = (FlowStats 1 x) <> (incomingFlow s) }
353
354-- | Sum of overhead and control bytes in both directions.
355wastedBytes :: ConnectionStats -> Int
356wastedBytes ConnectionStats {..} = overhead + control
357 where
358 FlowStats _ ByteStats {..} = incomingFlow <> outcomingFlow
359
360-- | Sum of payload bytes in both directions.
361payloadBytes :: ConnectionStats -> Int
362payloadBytes ConnectionStats {..} =
363 payload (messageBytes (incomingFlow <> outcomingFlow))
364
365-- | Sum of any bytes in both directions.
366transmittedBytes :: ConnectionStats -> Int
367transmittedBytes ConnectionStats {..} =
368 byteLength (messageBytes (incomingFlow <> outcomingFlow))
369
370{-----------------------------------------------------------------------
371-- Flood protection
372-----------------------------------------------------------------------}
373
374defaultFloodFactor :: Int
375defaultFloodFactor = 1
376
377-- | This is a very permissive value, connection setup usually takes
378-- around 10-100KB, including both directions.
379defaultFloodThreshold :: Int
380defaultFloodThreshold = 2 * 1024 * 1024
381
382-- | A flood detection function.
383type Detector stats = Int -- ^ Factor;
384 -> Int -- ^ Threshold;
385 -> stats -- ^ Stats to analyse;
386 -> Bool -- ^ Is this a flooded connection?
387
388defaultDetector :: Detector ConnectionStats
389defaultDetector factor threshold s =
390 transmittedBytes s > threshold &&
391 factor * wastedBytes s > payloadBytes s
392
393-- | Flood detection is used to protect /this/ peer against a /remote/
394-- malicious peer sending meaningless control messages.
395data FloodDetector = FloodDetector
396 { -- | Max ratio of payload bytes to control bytes.
397 floodFactor :: {-# UNPACK #-} !Int
398
399 -- | Max count of bytes connection /setup/ can take including
400 -- 'Handshake', 'ExtendedHandshake', 'Bitfield', 'Have' and 'Port'
401 -- messages. This value is used to avoid false positives at the
402 -- connection initialization.
403 , floodThreshold :: {-# UNPACK #-} !Int
404
405 -- | Flood predicate on the /current/ 'ConnectionStats'.
406 , floodPredicate :: Detector ConnectionStats
407 } deriving Show
408
409instance Eq FloodDetector where
410 a == b = floodFactor a == floodFactor b
411 && floodThreshold a == floodThreshold b
412
413-- | Flood detector with very permissive options.
414instance Default FloodDetector where
415 def = FloodDetector
416 { floodFactor = defaultFloodFactor
417 , floodThreshold = defaultFloodThreshold
418 , floodPredicate = defaultDetector
419 }
420
421-- | This peer might drop connection if the detector gives positive answer.
422runDetector :: FloodDetector -> ConnectionStats -> Bool
423runDetector FloodDetector {..} = floodPredicate floodFactor floodThreshold
424
425{-----------------------------------------------------------------------
426-- Options
427-----------------------------------------------------------------------}
428
429-- | Various connection settings and limits.
430data Options = Options
431 { -- | How often /this/ peer should send 'KeepAlive' messages.
432 keepaliveInterval :: {-# UNPACK #-} !Int
433
434 -- | /This/ peer will drop connection if a /remote/ peer did not
435 -- send any message for this period of time.
436 , keepaliveTimeout :: {-# UNPACK #-} !Int
437
438 , requestQueueLength :: {-# UNPACK #-} !Int
439
440 -- | Used to protect against flood attacks.
441 , floodDetector :: FloodDetector
442
443 -- | Used to protect against flood attacks in /metadata
444 -- exchange/. Normally, a requesting peer should request each
445 -- 'InfoDict' piece only one time, but a malicious peer can
446 -- saturate wire with 'MetadataRequest' messages thus flooding
447 -- responding peer.
448 --
449 -- This value set upper bound for number of 'MetadataRequests'
450 -- for each piece.
451 --
452 , metadataFactor :: {-# UNPACK #-} !Int
453
454 -- | Used to protect against out-of-memory attacks: malicious peer
455 -- can claim that 'totalSize' is, say, 100TB and send some random
456 -- data instead of infodict pieces. Since requesting peer unable
457 -- to check not completed infodict via the infohash, the
458 -- accumulated pieces will allocate the all available memory.
459 --
460 -- This limit set upper bound for 'InfoDict' size. See
461 -- 'ExtendedMetadata' for more info.
462 --
463 , maxInfoDictSize :: {-# UNPACK #-} !Int
464 } deriving (Show, Eq)
465
466-- | Permissive default parameters, most likely you don't need to
467-- change them.
468instance Default Options where
469 def = Options
470 { keepaliveInterval = defaultKeepAliveInterval
471 , keepaliveTimeout = defaultKeepAliveTimeout
472 , requestQueueLength = defaultRequestQueueLength
473 , floodDetector = def
474 , metadataFactor = defaultMetadataFactor
475 , maxInfoDictSize = defaultMaxInfoDictSize
476 }
477
478{-----------------------------------------------------------------------
479-- Peer status
480-----------------------------------------------------------------------}
481
482-- | Connections contain two bits of state on either end: choked or
483-- not, and interested or not.
484data PeerStatus = PeerStatus
485 { -- | Choking is a notification that no data will be sent until
486 -- unchoking happens.
487 _choking :: !Bool
488
489 -- |
490 , _interested :: !Bool
491 } deriving (Show, Eq, Ord)
492
493$(makeLenses ''PeerStatus)
494
495instance Pretty PeerStatus where
496 pPrint PeerStatus {..} =
497 pPrint (Choking _choking) <+> "and" <+> pPrint (Interested _interested)
498
499-- | Connections start out choked and not interested.
500instance Default PeerStatus where
501 def = PeerStatus True False
502
503instance Monoid PeerStatus where
504 mempty = def
505 mappend a b = PeerStatus
506 { _choking = _choking a && _choking b
507 , _interested = _interested a || _interested b
508 }
509
510-- | Can be used to update remote peer status using incoming 'Status'
511-- message.
512updateStatus :: StatusUpdate -> PeerStatus -> PeerStatus
513updateStatus (Choking b) = choking .~ b
514updateStatus (Interested b) = interested .~ b
515
516-- | Can be used to generate outcoming messages.
517statusUpdates :: PeerStatus -> PeerStatus -> [StatusUpdate]
518statusUpdates a b = M.catMaybes $
519 [ if _choking a == _choking b then Nothing
520 else Just $ Choking $ _choking b
521 , if _interested a == _interested b then Nothing
522 else Just $ Interested $ _interested b
523 ]
524
525{-----------------------------------------------------------------------
526-- Connection status
527-----------------------------------------------------------------------}
528
529-- | Status of the both endpoints.
530data ConnectionStatus = ConnectionStatus
531 { _clientStatus :: !PeerStatus
532 , _remoteStatus :: !PeerStatus
533 } deriving (Show, Eq)
534
535$(makeLenses ''ConnectionStatus)
536
537instance Pretty ConnectionStatus where
538 pPrint ConnectionStatus {..} =
539 "this " PP.<+> pPrint _clientStatus PP.$$
540 "remote" PP.<+> pPrint _remoteStatus
541
542-- | Connections start out choked and not interested.
543instance Default ConnectionStatus where
544 def = ConnectionStatus def def
545
546-- | Can the client transfer to the remote peer?
547canUpload :: ConnectionStatus -> Bool
548canUpload ConnectionStatus {..}
549 = _interested _remoteStatus && not (_choking _clientStatus)
550
551-- | Can the client transfer from the remote peer?
552canDownload :: ConnectionStatus -> Bool
553canDownload ConnectionStatus {..}
554 = _interested _clientStatus && not (_choking _remoteStatus)
555
556-- | Indicates how many peers are allowed to download from the client
557-- by default.
558defaultUnchokeSlots :: Int
559defaultUnchokeSlots = 4
560
561-- |
562defaultRechokeInterval :: Int
563defaultRechokeInterval = 10 * 1000 * 1000
564
565{-----------------------------------------------------------------------
566-- Connection
567-----------------------------------------------------------------------}
568
569data ConnectionState = ConnectionState {
570 -- | If @not (allowed ExtExtended connCaps)@ then this set is always
571 -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of
572 -- 'MessageId' to the message type for the remote peer.
573 --
574 -- Note that this value can change in current session if either
575 -- this or remote peer will initiate rehandshaking.
576 --
577 _connExtCaps :: !ExtendedCaps
578
579 -- | Current extended handshake information from the remote peer
580 , _connRemoteEhs :: !ExtendedHandshake
581
582 -- | Various stats about messages sent and received. Stats can be
583 -- used to protect /this/ peer against flood attacks.
584 --
585 -- Note that this value will change with the next sent or received
586 -- message.
587 , _connStats :: !ConnectionStats
588
589 , _connStatus :: !ConnectionStatus
590
591 -- | Bitfield of remote endpoint.
592 , _connBitfield :: !Bitfield
593 }
594
595makeLenses ''ConnectionState
596
597instance Default ConnectionState where
598 def = ConnectionState
599 { _connExtCaps = def
600 , _connRemoteEhs = def
601 , _connStats = def
602 , _connStatus = def
603 , _connBitfield = BF.haveNone 0
604 }
605
606-- | Connection keep various info about both peers.
607data Connection s = Connection
608 { connInitiatedBy :: !ChannelSide
609
610 , connRemoteAddr :: !(PeerAddr IP)
611
612 -- | /Both/ peers handshaked with this protocol string. The only
613 -- value is \"Bittorrent Protocol\" but this can be changed in
614 -- future.
615 , connProtocol :: !ProtocolName
616
617 -- | Set of enabled core extensions, i.e. the pre BEP10 extension
618 -- mechanism. This value is used to check if a message is allowed
619 -- to be sent or received.
620 , connCaps :: !Caps
621
622 -- | /Both/ peers handshaked with this infohash. A connection can
623 -- handle only one topic, use 'reconnect' to change the current
624 -- topic.
625 , connTopic :: !InfoHash
626
627 -- | Typically extracted from handshake.
628 , connRemotePeerId :: !PeerId
629
630 -- | Typically extracted from handshake.
631 , connThisPeerId :: !PeerId
632
633 -- |
634 , connOptions :: !Options
635
636 -- | Mutable connection state, see 'ConnectionState'
637 , connState :: !(IORef ConnectionState)
638
639-- -- | Max request queue length.
640-- , connMaxQueueLen :: !Int
641
642 -- | Environment data.
643 , connSession :: !s
644
645 , connChan :: !(Chan Message)
646 }
647
648instance Pretty (Connection s) where
649 pPrint Connection {..} = "Connection"
650
651instance ToLogStr (Connection s) where
652 toLogStr Connection {..} = mconcat
653 [ toLogStr (show connRemoteAddr)
654 , toLogStr (show connProtocol)
655 , toLogStr (show connCaps)
656 , toLogStr (show connTopic)
657 , toLogStr (show connRemotePeerId)
658 , toLogStr (show connThisPeerId)
659 , toLogStr (show connOptions)
660 ]
661
662-- TODO check extended messages too
663isAllowed :: Connection s -> Message -> Bool
664isAllowed Connection {..} msg
665 | Just ext <- requires msg = ext `allowed` connCaps
666 | otherwise = True
667
668{-----------------------------------------------------------------------
669-- Hanshaking
670-----------------------------------------------------------------------}
671
672sendHandshake :: Socket -> Handshake -> IO ()
673sendHandshake sock hs = sendAll sock (S.encode hs)
674
675recvHandshake :: Socket -> IO Handshake
676recvHandshake sock = do
677 header <- BS.recv sock 1
678 unless (BS.length header == 1) $
679 throw $ userError "Unable to receive handshake header."
680
681 let protocolLen = BS.head header
682 let restLen = handshakeSize protocolLen - 1
683
684 body <- BS.recv sock restLen
685 let resp = BS.cons protocolLen body
686 either (throwIO . userError) return $ S.decode resp
687
688-- | Handshaking with a peer specified by the second argument.
689--
690-- It's important to send handshake first because /accepting/ peer
691-- do not know handshake topic and will wait until /connecting/ peer
692-- will send handshake.
693--
694initiateHandshake :: Socket -> Handshake -> IO Handshake
695initiateHandshake sock hs = do
696 sendHandshake sock hs
697 recvHandshake sock
698
699data HandshakePair = HandshakePair
700 { handshakeSent :: !Handshake
701 , handshakeRecv :: !Handshake
702 } deriving (Show, Eq)
703
704validatePair :: HandshakePair -> PeerAddr IP -> IO ()
705validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp
706 [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs')
707 , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs')
708 , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs')
709 , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr)
710 , UnexpectedPeerId $ hsPeerId hs')
711 ]
712 where
713 checkProp (t, e) = unless t $ throwIO $ ProtocolError e
714
715-- | Connection state /right/ after handshaking.
716establishedStats :: HandshakePair -> ConnectionStats
717establishedStats HandshakePair {..} = ConnectionStats
718 { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent
719 , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv
720 }
721
722{-----------------------------------------------------------------------
723-- Wire
724-----------------------------------------------------------------------}
725
726-- | do not expose this so we can change it without breaking api
727newtype Connected s a = Connected { runConnected :: (ReaderT (Connection s) IO a) }
728 deriving (Functor, Applicative, Monad
729 , MonadIO, MonadReader (Connection s), MonadThrow
730 )
731
732instance MonadState ConnectionState (Connected s) where
733 get = Connected (asks connState) >>= liftIO . readIORef
734 put x = Connected (asks connState) >>= liftIO . flip writeIORef x
735
736-- | A duplex channel connected to a remote peer which keep tracks
737-- connection parameters.
738type Wire s a = ConduitM Message Message (Connected s) a
739
740{-----------------------------------------------------------------------
741-- Wrapper
742-----------------------------------------------------------------------}
743
744putStats :: ChannelSide -> Message -> Connected s ()
745putStats side msg = connStats %= addStats side (stats msg)
746
747validate :: ChannelSide -> Message -> Connected s ()
748validate side msg = do
749 caps <- asks connCaps
750 case requires msg of
751 Nothing -> return ()
752 Just ext
753 | ext `allowed` caps -> return ()
754 | otherwise -> protocolError $ DisallowedMessage side ext
755
756trackFlow :: ChannelSide -> Wire s ()
757trackFlow side = iterM $ do
758 validate side
759 putStats side
760
761{-----------------------------------------------------------------------
762-- Setup
763-----------------------------------------------------------------------}
764
765-- System.Timeout.timeout multiplier
766seconds :: Int
767seconds = 1000000
768
769sinkChan :: MonadIO m => Chan Message -> Sink Message m ()
770sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan)
771
772sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message
773sourceChan interval chan = do
774 mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan
775 yield $ fromMaybe Msg.KeepAlive mmsg
776
777-- | Normally you should use 'connectWire' or 'acceptWire'.
778runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO ()
779runWire action sock chan conn = flip runReaderT conn $ runConnected $
780 sourceSocket sock $=
781 conduitGet S.get $=
782 trackFlow RemotePeer $=
783 action $=
784 trackFlow ThisPeer C.$$
785 sinkChan chan
786
787-- | This function will block until a peer send new message. You can
788-- also use 'await'.
789recvMessage :: Wire s Message
790recvMessage = await >>= maybe (monadThrow PeerDisconnected) return
791
792-- | You can also use 'yield'.
793sendMessage :: PeerMessage msg => msg -> Wire s ()
794sendMessage msg = do
795 ecaps <- use connExtCaps
796 yield $ envelop ecaps msg
797
798getMaxQueueLength :: Connected s Int
799getMaxQueueLength = do
800 advertisedLen <- ehsQueueLength <$> use connRemoteEhs
801 defaultLen <- asks (requestQueueLength . connOptions)
802 return $ fromMaybe defaultLen advertisedLen
803
804-- | Filter pending messages from send buffer.
805filterQueue :: (Message -> Bool) -> Wire s ()
806filterQueue p = lift $ do
807 chan <- asks connChan
808 liftIO $ getChanContents chan >>= writeList2Chan chan . L.filter p
809
810-- | Forcefully terminate wire session and close socket.
811disconnectPeer :: Wire s a
812disconnectPeer = monadThrow DisconnectPeer
813
814extendedHandshake :: ExtendedCaps -> Wire s ()
815extendedHandshake caps = do
816 -- TODO add other params to the handshake
817 sendMessage $ nullExtendedHandshake caps
818 msg <- recvMessage
819 case msg of
820 Extended (EHandshake remoteEhs@(ExtendedHandshake {..})) -> do
821 connExtCaps .= (ehsCaps <> caps)
822 connRemoteEhs .= remoteEhs
823 _ -> protocolError HandshakeRefused
824
825rehandshake :: ExtendedCaps -> Wire s ()
826rehandshake caps = error "rehandshake"
827
828reconnect :: Wire s ()
829reconnect = error "reconnect"
830
831data ConnectionId = ConnectionId
832 { topic :: !InfoHash
833 , remoteAddr :: !(PeerAddr IP)
834 , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node.
835 }
836
837-- | /Preffered/ settings of wire. To get the real use 'ask'.
838data ConnectionPrefs = ConnectionPrefs
839 { prefOptions :: !Options
840 , prefProtocol :: !ProtocolName
841 , prefCaps :: !Caps
842 , prefExtCaps :: !ExtendedCaps
843 } deriving (Show, Eq)
844
845instance Default ConnectionPrefs where
846 def = ConnectionPrefs
847 { prefOptions = def
848 , prefProtocol = def
849 , prefCaps = def
850 , prefExtCaps = def
851 }
852
853normalize :: ConnectionPrefs -> ConnectionPrefs
854normalize = error "normalize"
855
856-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'.
857data SessionLink s = SessionLink
858 { linkTopic :: !(InfoHash)
859 , linkPeerId :: !(PeerId)
860 , linkMetadataSize :: !(Maybe Int)
861 , linkOutputChan :: !(Maybe (Chan Message))
862 , linkSession :: !(s)
863 }
864
865data ConnectionConfig s = ConnectionConfig
866 { cfgPrefs :: !(ConnectionPrefs)
867 , cfgSession :: !(SessionLink s)
868 , cfgWire :: !(Wire s ())
869 }
870
871configHandshake :: ConnectionConfig s -> Handshake
872configHandshake ConnectionConfig {..} = Handshake
873 { hsProtocol = prefProtocol cfgPrefs
874 , hsReserved = prefCaps cfgPrefs
875 , hsInfoHash = linkTopic cfgSession
876 , hsPeerId = linkPeerId cfgSession
877 }
878
879{-----------------------------------------------------------------------
880-- Pending connections
881-----------------------------------------------------------------------}
882
883-- | Connection in half opened state. A normal usage scenario:
884--
885-- * Opened using 'newPendingConnection', usually in the listener
886-- loop;
887--
888-- * Closed using 'closePending' if 'pendingPeer' is banned,
889-- 'pendingCaps' is prohibited or pendingTopic is unknown;
890--
891-- * Accepted using 'acceptWire' otherwise.
892--
893data PendingConnection = PendingConnection
894 { pendingSock :: Socket
895 , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty;
896 , pendingCaps :: Caps -- ^ advertised by the peer;
897 , pendingTopic :: InfoHash -- ^ possible non-existent topic.
898 }
899
900-- | Reconstruct handshake sent by the remote peer.
901pendingHandshake :: PendingConnection -> Handshake
902pendingHandshake PendingConnection {..} = Handshake
903 { hsProtocol = def
904 , hsReserved = pendingCaps
905 , hsInfoHash = pendingTopic
906 , hsPeerId = fromMaybe (error "pendingHandshake: impossible")
907 (peerId pendingPeer)
908 }
909
910-- |
911--
912-- This function can throw 'WireFailure' exception.
913--
914newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection
915newPendingConnection sock addr = do
916 Handshake {..} <- recvHandshake sock
917 unless (hsProtocol == def) $ do
918 throwIO $ ProtocolError $ InvalidProtocol hsProtocol
919 return PendingConnection
920 { pendingSock = sock
921 , pendingPeer = addr { peerId = Just hsPeerId }
922 , pendingCaps = hsReserved
923 , pendingTopic = hsInfoHash
924 }
925
926-- | Release all resources associated with the given connection. Note
927-- that you /must not/ 'closePending' if you 'acceptWire'.
928closePending :: PendingConnection -> IO ()
929closePending PendingConnection {..} = do
930 close pendingSock
931
932{-----------------------------------------------------------------------
933-- Connection setup
934-----------------------------------------------------------------------}
935
936chanToSock :: Int -> Chan Message -> Socket -> IO ()
937chanToSock ka chan sock =
938 sourceChan ka chan $= conduitPut S.put C.$$ sinkSocket sock
939
940afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair
941 -> ConnectionConfig s -> IO ()
942afterHandshaking initiator addr sock
943 hpair @ (HandshakePair hs hs')
944 (ConnectionConfig
945 { cfgPrefs = ConnectionPrefs {..}
946 , cfgSession = SessionLink {..}
947 , cfgWire = wire
948 }) = do
949 let caps = hsReserved hs <> hsReserved hs'
950 cstate <- newIORef def { _connStats = establishedStats hpair }
951 chan <- maybe newChan return linkOutputChan
952 let conn = Connection {
953 connInitiatedBy = initiator
954 , connRemoteAddr = addr
955 , connProtocol = hsProtocol hs
956 , connCaps = caps
957 , connTopic = hsInfoHash hs
958 , connRemotePeerId = hsPeerId hs'
959 , connThisPeerId = hsPeerId hs
960 , connOptions = def
961 , connState = cstate
962 , connSession = linkSession
963 , connChan = chan
964 }
965
966 -- TODO make KA interval configurable
967 let kaInterval = defaultKeepAliveInterval
968 wire' = if ExtExtended `allowed` caps
969 then extendedHandshake prefExtCaps >> wire
970 else wire
971
972 bracket (forkIO (chanToSock kaInterval chan sock))
973 (killThread)
974 (\ _ -> runWire wire' sock chan conn)
975
976-- | Initiate 'Wire' connection and handshake with a peer. This function will
977-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on
978-- both sides.
979--
980-- This function can throw 'WireFailure' exception.
981--
982connectWire :: PeerAddr IP -> ConnectionConfig s -> IO ()
983connectWire addr cfg = do
984 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return
985 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do
986 let hs = configHandshake cfg
987 hs' <- initiateHandshake sock hs
988 let hpair = HandshakePair hs hs'
989 validatePair hpair addr
990 afterHandshaking ThisPeer addr sock hpair cfg
991
992-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
993-- socket. For peer listener loop the 'acceptSafe' should be
994-- prefered against 'accept'. The socket will be closed at exit.
995--
996-- This function can throw 'WireFailure' exception.
997--
998acceptWire :: PendingConnection -> ConnectionConfig s -> IO ()
999acceptWire pc @ PendingConnection {..} cfg = do
1000 bracket (return pendingSock) close $ \ _ -> do
1001 unless (linkTopic (cfgSession cfg) == pendingTopic) $ do
1002 throwIO (ProtocolError (UnexpectedTopic pendingTopic))
1003
1004 let hs = configHandshake cfg
1005 sendHandshake pendingSock hs
1006 let hpair = HandshakePair hs (pendingHandshake pc)
1007
1008 afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg
1009
1010-- | Used when size of bitfield becomes known.
1011resizeBitfield :: Int -> Connected s ()
1012resizeBitfield n = connBitfield %= adjustSize n
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Download.hs b/bittorrent/src/Network/BitTorrent/Exchange/Download.hs
deleted file mode 100644
index 981db2fb..00000000
--- a/bittorrent/src/Network/BitTorrent/Exchange/Download.hs
+++ /dev/null
@@ -1,296 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8--
9--
10{-# LANGUAGE FlexibleContexts #-}
11{-# LANGUAGE FlexibleInstances #-}
12{-# LANGUAGE MultiParamTypeClasses #-}
13{-# LANGUAGE FunctionalDependencies #-}
14{-# LANGUAGE TemplateHaskell #-}
15module Network.BitTorrent.Exchange.Download
16 ( -- * Downloading
17 Download (..)
18 , Updates
19 , runDownloadUpdates
20
21 -- ** Metadata
22 -- $metadata-download
23 , MetadataDownload
24 , metadataDownload
25
26 -- ** Content
27 -- $content-download
28 , ContentDownload
29 , contentDownload
30 ) where
31
32import Control.Applicative
33import Control.Concurrent
34import Control.Lens
35import Control.Monad.State
36import Data.BEncode as BE
37import Data.ByteString as BS
38import Data.ByteString.Lazy as BL
39import Data.Default
40import Data.List as L
41import Data.Maybe
42import Data.Map as M
43import Data.Tuple
44
45import Data.Torrent as Torrent
46import Network.Address
47import Network.BitTorrent.Exchange.Bitfield as BF
48import Network.BitTorrent.Exchange.Block as Block
49import Network.BitTorrent.Exchange.Message as Msg
50import System.Torrent.Storage (Storage, writePiece)
51
52
53{-----------------------------------------------------------------------
54-- Class
55-----------------------------------------------------------------------}
56
57type Updates s a = StateT s IO a
58
59runDownloadUpdates :: MVar s -> Updates s a -> IO a
60runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m)
61
62class Download s chunk | s -> chunk where
63 scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx]
64
65 -- |
66 scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx)
67 scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf
68
69 -- | Get number of sent requests to this peer.
70 getRequestQueueLength :: PeerAddr IP -> Updates s Int
71
72 -- | Remove all pending block requests to the remote peer. May be used
73 -- when:
74 --
75 -- * a peer closes connection;
76 --
77 -- * remote peer choked this peer;
78 --
79 -- * timeout expired.
80 --
81 resetPending :: PeerAddr IP -> Updates s ()
82
83 -- | MAY write to storage, if a new piece have been completed.
84 --
85 -- You should check if a returned by peer block is actually have
86 -- been requested and in-flight. This is needed to avoid "I send
87 -- random corrupted block" attacks.
88 pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool)
89
90{-----------------------------------------------------------------------
91-- Metadata download
92-----------------------------------------------------------------------}
93-- $metadata-download
94-- TODO
95
96data MetadataDownload = MetadataDownload
97 { _pendingPieces :: [(PeerAddr IP, PieceIx)]
98 , _bucket :: Bucket
99 , _topic :: InfoHash
100 }
101
102makeLenses ''MetadataDownload
103
104-- | Create a new scheduler for infodict of the given size.
105metadataDownload :: Int -> InfoHash -> MetadataDownload
106metadataDownload ps = MetadataDownload [] (Block.empty ps)
107
108instance Default MetadataDownload where
109 def = error "instance Default MetadataDownload"
110
111--cancelPending :: PieceIx -> Updates ()
112cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd)
113
114instance Download MetadataDownload (Piece BS.ByteString) where
115 scheduleBlock addr bf = do
116 bkt <- use bucket
117 case spans metadataPieceSize bkt of
118 [] -> return Nothing
119 ((off, _ ) : _) -> do
120 let pix = off `div` metadataPieceSize
121 pendingPieces %= ((addr, pix) :)
122 return (Just (BlockIx pix 0 metadataPieceSize))
123
124 resetPending addr = pendingPieces %= L.filter ((addr ==) . fst)
125
126 pushBlock addr Torrent.Piece {..} = do
127 p <- use pendingPieces
128 when ((addr, pieceIndex) `L.notElem` p) $
129 error "not requested"
130 cancelPending pieceIndex
131
132 bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData
133 b <- use bucket
134 case toPiece b of
135 Nothing -> return Nothing
136 Just chunks -> do
137 t <- use topic
138 case parseInfoDict (BL.toStrict chunks) t of
139 Right x -> do
140 pendingPieces .= []
141 return undefined -- (Just x)
142 Left e -> do
143 pendingPieces .= []
144 bucket .= Block.empty (Block.size b)
145 return undefined -- Nothing
146 where
147 -- todo use incremental parsing to avoid BS.concat call
148 parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict
149 parseInfoDict chunk topic =
150 case BE.decode chunk of
151 Right (infodict @ InfoDict {..})
152 | topic == idInfoHash -> return infodict
153 | otherwise -> Left "broken infodict"
154 Left err -> Left $ "unable to parse infodict " ++ err
155
156{-----------------------------------------------------------------------
157-- Content download
158-----------------------------------------------------------------------}
159-- $content-download
160--
161-- A block can have one of the following status:
162--
163-- 1) /not allowed/: Piece is not in download set.
164--
165-- 2) /waiting/: (allowed?) Block have been allowed to download,
166-- but /this/ peer did not send any 'Request' message for this
167-- block. To allow some piece use
168-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet'
169-- and 'allowPiece'.
170--
171-- 3) /inflight/: (pending?) Block have been requested but
172-- /remote/ peer did not send any 'Piece' message for this block.
173-- Related functions 'markInflight'
174--
175-- 4) /pending/: (stalled?) Block have have been downloaded
176-- Related functions 'insertBlock'.
177--
178-- Piece status:
179--
180-- 1) /assembled/: (downloaded?) All blocks in piece have been
181-- downloaded but the piece did not verified yet.
182--
183-- * Valid: go to completed;
184--
185-- * Invalid: go to waiting.
186--
187-- 2) /corrupted/:
188--
189-- 3) /downloaded/: (verified?) A piece have been successfully
190-- verified via the hash. Usually the piece should be stored to
191-- the 'System.Torrent.Storage' and /this/ peer should send 'Have'
192-- messages to the /remote/ peers.
193--
194
195data PieceEntry = PieceEntry
196 { pending :: [(PeerAddr IP, BlockIx)]
197 , stalled :: Bucket
198 }
199
200pieceEntry :: PieceSize -> PieceEntry
201pieceEntry s = PieceEntry [] (Block.empty s)
202
203isEmpty :: PieceEntry -> Bool
204isEmpty PieceEntry {..} = L.null pending && Block.null stalled
205
206_holes :: PieceIx -> PieceEntry -> [BlockIx]
207_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
208 where
209 mkBlockIx (off, sz) = BlockIx pix off sz
210
211data ContentDownload = ContentDownload
212 { inprogress :: !(Map PieceIx PieceEntry)
213 , bitfield :: !Bitfield
214 , pieceSize :: !PieceSize
215 , contentStorage :: Storage
216 }
217
218contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload
219contentDownload = ContentDownload M.empty
220
221--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates ()
222modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s
223 { inprogress = alter (g pieceSize) pix inprogress }
224 where
225 g s = h . f . fromMaybe (pieceEntry s)
226 h e
227 | isEmpty e = Nothing
228 | otherwise = Just e
229
230instance Download ContentDownload (Block BL.ByteString) where
231 scheduleBlocks n addr maskBF = do
232 ContentDownload {..} <- get
233 let wantPieces = maskBF `BF.difference` bitfield
234 let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $
235 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces)
236 inprogress
237
238 bixs <- if L.null wantBlocks
239 then do
240 mpix <- choosePiece wantPieces
241 case mpix of -- TODO return 'n' blocks
242 Nothing -> return []
243 Just pix -> return [leadingBlock pix defaultTransferSize]
244 else chooseBlocks wantBlocks n
245
246 forM_ bixs $ \ bix -> do
247 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
248 { pending = (addr, bix) : pending }
249
250 return bixs
251 where
252 -- TODO choose block nearest to pending or stalled sets to reduce disk
253 -- seeks on remote machines
254 --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx]
255 chooseBlocks xs n = return (L.take n xs)
256
257 -- TODO use selection strategies from Exchange.Selector
258 --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx)
259 choosePiece bf
260 | BF.null bf = return $ Nothing
261 | otherwise = return $ Just $ BF.findMin bf
262
263 getRequestQueueLength addr = do
264 m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress)
265 return $ L.sum $ L.map L.length $ M.elems m
266
267 resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
268 where
269 reset = fmap $ \ e -> e
270 { pending = L.filter (not . (==) addr . fst) (pending e) }
271
272 pushBlock addr blk @ Block {..} = do
273 mpe <- gets (M.lookup blkPiece . inprogress)
274 case mpe of
275 Nothing -> return Nothing
276 Just (pe @ PieceEntry {..})
277 | blockIx blk `L.notElem` fmap snd pending -> return Nothing
278 | otherwise -> do
279 let bkt' = Block.insertLazy blkOffset blkData stalled
280 case toPiece bkt' of
281 Nothing -> do
282 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
283 { pending = L.filter ((==) (blockIx blk) . snd) pending
284 , stalled = bkt'
285 }
286 return (Just False)
287
288 Just pieceData -> do
289 -- TODO verify
290 storage <- gets contentStorage
291 liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage
292 modify $ \ s @ ContentDownload {..} -> s
293 { inprogress = M.delete blkPiece inprogress
294 , bitfield = BF.insert blkPiece bitfield
295 }
296 return (Just True)
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs b/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs
deleted file mode 100644
index 30a6a607..00000000
--- a/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs
+++ /dev/null
@@ -1,62 +0,0 @@
1module Network.BitTorrent.Exchange.Manager
2 ( Options (..)
3 , Manager
4 , Handler
5 , newManager
6 , closeManager
7 ) where
8
9import Control.Concurrent
10import Control.Exception hiding (Handler)
11import Control.Monad
12import Data.Default
13import Network.Socket
14
15import Data.Torrent
16import Network.Address
17import Network.BitTorrent.Exchange.Connection hiding (Options)
18import Network.BitTorrent.Exchange.Session
19
20
21data Options = Options
22 { optBacklog :: Int
23 , optPeerAddr :: PeerAddr IP
24 } deriving (Show, Eq)
25
26instance Default Options where
27 def = Options
28 { optBacklog = maxListenQueue
29 , optPeerAddr = def
30 }
31
32data Manager = Manager
33 { listener :: !ThreadId
34 }
35
36type Handler = InfoHash -> IO Session
37
38handleNewConn :: Socket -> PeerAddr IP -> Handler -> IO ()
39handleNewConn sock addr handler = do
40 conn <- newPendingConnection sock addr
41 ses <- handler (pendingTopic conn) `onException` closePending conn
42 establish conn ses
43
44listenIncoming :: Options -> Handler -> IO ()
45listenIncoming Options {..} handler = do
46 bracket (socket AF_INET Stream defaultProtocol) close $ \ sock -> do
47 bind sock (toSockAddr optPeerAddr)
48 listen sock optBacklog
49 forever $ do
50 (conn, sockAddr) <- accept sock
51 case fromSockAddr sockAddr of
52 Nothing -> return ()
53 Just addr -> void $ forkIO $ handleNewConn sock addr handler
54
55newManager :: Options -> Handler -> IO Manager
56newManager opts handler = do
57 tid <- forkIO $ listenIncoming opts handler
58 return (Manager tid)
59
60closeManager :: Manager -> IO ()
61closeManager Manager {..} = do
62 killThread listener \ No newline at end of file
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Message.hs b/bittorrent/src/Network/BitTorrent/Exchange/Message.hs
deleted file mode 100644
index 5c096523..00000000
--- a/bittorrent/src/Network/BitTorrent/Exchange/Message.hs
+++ /dev/null
@@ -1,1237 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Normally peer to peer communication consisting of the following
9-- steps:
10--
11-- * In order to establish the connection between peers we should
12-- send 'Handshake' message. The 'Handshake' is a required message
13-- and must be the first message transmitted by the peer to the
14-- another peer. Another peer should reply with a handshake as well.
15--
16-- * Next peer might sent bitfield message, but might not. In the
17-- former case we should update bitfield peer have. Again, if we
18-- have some pieces we should send bitfield. Normally bitfield
19-- message should sent after the handshake message.
20--
21-- * Regular exchange messages. TODO docs
22--
23-- For more high level API see "Network.BitTorrent.Exchange" module.
24--
25-- For more infomation see:
26-- <https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29>
27--
28{-# LANGUAGE ViewPatterns #-}
29{-# LANGUAGE FlexibleInstances #-}
30{-# LANGUAGE FlexibleContexts #-}
31{-# LANGUAGE TypeFamilies #-}
32{-# LANGUAGE GeneralizedNewtypeDeriving #-}
33{-# LANGUAGE DeriveDataTypeable #-}
34{-# LANGUAGE TemplateHaskell #-}
35{-# LANGUAGE CPP #-}
36{-# OPTIONS -fno-warn-orphans #-}
37module Network.BitTorrent.Exchange.Message
38 ( -- * Capabilities
39 Capabilities (..)
40 , Extension (..)
41 , Caps
42
43 -- * Handshake
44 , ProtocolName
45 , Handshake(..)
46 , defaultHandshake
47 , handshakeSize
48 , handshakeMaxSize
49 , handshakeStats
50
51 -- * Stats
52 , ByteCount
53 , ByteStats (..)
54 , byteLength
55
56 -- * Messages
57 , Message (..)
58 , defaultKeepAliveTimeout
59 , defaultKeepAliveInterval
60 , PeerMessage (..)
61
62 -- ** Core messages
63 , StatusUpdate (..)
64 , Available (..)
65 , Transfer (..)
66 , defaultRequestQueueLength
67
68 -- ** Fast extension
69 , FastMessage (..)
70
71 -- ** Extension protocol
72 , ExtendedMessage (..)
73
74 -- *** Capabilities
75 , ExtendedExtension (..)
76 , ExtendedCaps (..)
77
78 -- *** Handshake
79 , ExtendedHandshake (..)
80 , defaultQueueLength
81 , nullExtendedHandshake
82
83 -- *** Metadata
84 , ExtendedMetadata (..)
85 , metadataPieceSize
86 , defaultMetadataFactor
87 , defaultMaxInfoDictSize
88 , isLastPiece
89 , isValidPiece
90 ) where
91
92import Control.Applicative
93import Control.Arrow ((&&&), (***))
94import Control.Monad (when)
95import Data.Attoparsec.ByteString.Char8 as BS
96import Data.BEncode as BE
97import Data.BEncode.BDict as BE
98import Data.BEncode.Internal as BE (ppBEncode, parser)
99import Data.BEncode.Types (BDict)
100import Data.Bits
101import Data.ByteString as BS
102import Data.ByteString.Char8 as BC
103import Data.ByteString.Lazy as BL
104import Data.Default
105import Data.List as L
106import Data.Map.Strict as M
107import Data.Maybe
108import Data.Monoid
109import Data.Ord
110import Data.Serialize as S
111import Data.String
112import Data.Text as T
113import Data.Typeable
114import Data.Word
115#if MIN_VERSION_iproute(1,7,4)
116import Data.IP hiding (fromSockAddr)
117#else
118import Data.IP
119#endif
120import Network
121import Network.Socket hiding (KeepAlive)
122import Text.PrettyPrint as PP hiding ((<>))
123import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
124
125import Data.Torrent hiding (Piece (..))
126import qualified Data.Torrent as P (Piece (..))
127import Network.Address
128import Network.BitTorrent.Exchange.Bitfield
129import Network.BitTorrent.Exchange.Block
130
131{-----------------------------------------------------------------------
132-- Capabilities
133-----------------------------------------------------------------------}
134
135-- |
136class Capabilities caps where
137 type Ext caps :: *
138
139 -- | Pack extensions to caps.
140 toCaps :: [Ext caps] -> caps
141
142 -- | Unpack extensions from caps.
143 fromCaps :: caps -> [Ext caps]
144
145 -- | Check if an extension is a member of the specified set.
146 allowed :: Ext caps -> caps -> Bool
147
148ppCaps :: Capabilities caps => Pretty (Ext caps) => caps -> Doc
149ppCaps = hcat . punctuate ", " . L.map pPrint . fromCaps
150
151{-----------------------------------------------------------------------
152-- Extensions
153-----------------------------------------------------------------------}
154
155-- | Enumeration of message extension protocols.
156--
157-- For more info see: <http://www.bittorrent.org/beps/bep_0004.html>
158--
159data Extension
160 = ExtDHT -- ^ BEP 5: allow to send PORT messages.
161 | ExtFast -- ^ BEP 6: allow to send FAST messages.
162 | ExtExtended -- ^ BEP 10: allow to send the extension protocol messages.
163 deriving (Show, Eq, Ord, Enum, Bounded)
164
165-- | Full extension names, suitable for logging.
166instance Pretty Extension where
167 pPrint ExtDHT = "Distributed Hash Table Protocol"
168 pPrint ExtFast = "Fast Extension"
169 pPrint ExtExtended = "Extension Protocol"
170
171-- | Extension bitmask as specified by BEP 4.
172extMask :: Extension -> Word64
173extMask ExtDHT = 0x01
174extMask ExtFast = 0x04
175extMask ExtExtended = 0x100000
176
177{-----------------------------------------------------------------------
178-- Capabilities
179-----------------------------------------------------------------------}
180
181-- | Capabilities is a set of 'Extension's usually sent in 'Handshake'
182-- messages.
183newtype Caps = Caps Word64
184 deriving (Show, Eq)
185
186-- | Render set of extensions as comma separated list.
187instance Pretty Caps where
188 pPrint = ppCaps
189 {-# INLINE pPrint #-}
190
191-- | The empty set.
192instance Default Caps where
193 def = Caps 0
194 {-# INLINE def #-}
195
196-- | Monoid under intersection. 'mempty' includes all known extensions.
197instance Monoid Caps where
198 mempty = toCaps [minBound .. maxBound]
199 {-# INLINE mempty #-}
200
201 mappend (Caps a) (Caps b) = Caps (a .&. b)
202 {-# INLINE mappend #-}
203
204-- | 'Handshake' compatible encoding.
205instance Serialize Caps where
206 put (Caps caps) = S.putWord64be caps
207 {-# INLINE put #-}
208
209 get = Caps <$> S.getWord64be
210 {-# INLINE get #-}
211
212instance Capabilities Caps where
213 type Ext Caps = Extension
214
215 allowed e (Caps caps) = (extMask e .&. caps) /= 0
216 {-# INLINE allowed #-}
217
218 toCaps = Caps . L.foldr (.|.) 0 . L.map extMask
219 fromCaps caps = L.filter (`allowed` caps) [minBound..maxBound]
220
221{-----------------------------------------------------------------------
222 Handshake
223-----------------------------------------------------------------------}
224
225maxProtocolNameSize :: Word8
226maxProtocolNameSize = maxBound
227
228-- | The protocol name is used to identify to the local peer which
229-- version of BTP the remote peer uses.
230newtype ProtocolName = ProtocolName BS.ByteString
231 deriving (Eq, Ord, Typeable)
232
233-- | In BTP/1.0 the name is 'BitTorrent protocol'. If this string is
234-- different from the local peers own protocol name, then the
235-- connection is to be dropped.
236instance Default ProtocolName where
237 def = ProtocolName "BitTorrent protocol"
238
239instance Show ProtocolName where
240 show (ProtocolName bs) = show bs
241
242instance Pretty ProtocolName where
243 pPrint (ProtocolName bs) = PP.text $ BC.unpack bs
244
245instance IsString ProtocolName where
246 fromString str
247 | L.length str <= fromIntegral maxProtocolNameSize
248 = ProtocolName (fromString str)
249 | otherwise = error $ "fromString: ProtocolName too long: " ++ str
250
251instance Serialize ProtocolName where
252 put (ProtocolName bs) = do
253 putWord8 $ fromIntegral $ BS.length bs
254 putByteString bs
255
256 get = do
257 len <- getWord8
258 bs <- getByteString $ fromIntegral len
259 return (ProtocolName bs)
260
261-- | Handshake message is used to exchange all information necessary
262-- to establish connection between peers.
263--
264data Handshake = Handshake {
265 -- | Identifier of the protocol. This is usually equal to 'def'.
266 hsProtocol :: ProtocolName
267
268 -- | Reserved bytes used to specify supported BEP's.
269 , hsReserved :: Caps
270
271 -- | Info hash of the info part of the metainfo file. that is
272 -- transmitted in tracker requests. Info hash of the initiator
273 -- handshake and response handshake should match, otherwise
274 -- initiator should break the connection.
275 --
276 , hsInfoHash :: InfoHash
277
278 -- | Peer id of the initiator. This is usually the same peer id
279 -- that is transmitted in tracker requests.
280 --
281 , hsPeerId :: PeerId
282
283 } deriving (Show, Eq)
284
285instance Serialize Handshake where
286 put Handshake {..} = do
287 put hsProtocol
288 put hsReserved
289 put hsInfoHash
290 put hsPeerId
291 get = Handshake <$> get <*> get <*> get <*> get
292
293-- | Show handshake protocol string, caps and fingerprint.
294instance Pretty Handshake where
295 pPrint Handshake {..}
296 = pPrint hsProtocol $$
297 pPrint hsReserved $$
298 pPrint (fingerprint hsPeerId)
299
300-- | Get handshake message size in bytes from the length of protocol
301-- string.
302handshakeSize :: Word8 -> Int
303handshakeSize n = 1 + fromIntegral n + 8 + 20 + 20
304
305-- | Maximum size of handshake message in bytes.
306handshakeMaxSize :: Int
307handshakeMaxSize = handshakeSize maxProtocolNameSize
308
309-- | Handshake with default protocol string and reserved bitmask.
310defaultHandshake :: InfoHash -> PeerId -> Handshake
311defaultHandshake = Handshake def def
312
313handshakeStats :: Handshake -> ByteStats
314handshakeStats (Handshake (ProtocolName bs) _ _ _)
315 = ByteStats 1 (BS.length bs + 8 + 20 + 20) 0
316
317{-----------------------------------------------------------------------
318-- Stats
319-----------------------------------------------------------------------}
320
321-- | Number of bytes.
322type ByteCount = Int
323
324-- | Summary of encoded message byte layout can be used to collect
325-- stats about message flow in both directions. This data can be
326-- retrieved using 'stats' function.
327data ByteStats = ByteStats
328 { -- | Number of bytes used to help encode 'control' and 'payload'
329 -- bytes: message size, message ID's, etc
330 overhead :: {-# UNPACK #-} !ByteCount
331
332 -- | Number of bytes used to exchange peers state\/options: piece
333 -- and block indexes, infohash, port numbers, peer ID\/IP, etc.
334 , control :: {-# UNPACK #-} !ByteCount
335
336 -- | Number of payload bytes: torrent data blocks and infodict
337 -- metadata.
338 , payload :: {-# UNPACK #-} !ByteCount
339 } deriving Show
340
341instance Pretty ByteStats where
342 pPrint s @ ByteStats {..} = fsep
343 [ PP.int overhead, "overhead"
344 , PP.int control, "control"
345 , PP.int payload, "payload"
346 , "bytes"
347 ] $+$ fsep
348 [ PP.int (byteLength s), "total bytes"
349 ]
350
351-- | Empty byte sequences.
352instance Default ByteStats where
353 def = ByteStats 0 0 0
354
355-- | Monoid under addition.
356instance Monoid ByteStats where
357 mempty = def
358 mappend a b = ByteStats
359 { overhead = overhead a + overhead b
360 , control = control a + control b
361 , payload = payload a + payload b
362 }
363
364-- | Sum of the all byte sequences.
365byteLength :: ByteStats -> Int
366byteLength ByteStats {..} = overhead + control + payload
367
368{-----------------------------------------------------------------------
369-- Regular messages
370-----------------------------------------------------------------------}
371
372-- | Messages which can be sent after handshaking. Minimal complete
373-- definition: 'envelop'.
374class PeerMessage a where
375 -- | Construct a message to be /sent/. Note that if 'ExtendedCaps'
376 -- do not contain mapping for this message the default
377 -- 'ExtendedMessageId' is used.
378 envelop :: ExtendedCaps -- ^ The /receiver/ extended capabilities;
379 -> a -- ^ An regular message;
380 -> Message -- ^ Enveloped message to sent.
381
382 -- | Find out the extension this message belong to. Can be used to
383 -- check if this message is allowed to send\/recv in current
384 -- session.
385 requires :: a -> Maybe Extension
386 requires _ = Nothing
387
388 -- | Get sizes of overhead\/control\/payload byte sequences of
389 -- binary message representation without encoding message to binary
390 -- bytestring.
391 --
392 -- This function should obey one law:
393 --
394 -- * 'byteLength' ('stats' msg) == 'BL.length' ('encode' msg)
395 --
396 stats :: a -> ByteStats
397 stats _ = ByteStats 4 0 0
398
399{-----------------------------------------------------------------------
400-- Status messages
401-----------------------------------------------------------------------}
402
403-- | Notification that the sender have updated its
404-- 'Network.BitTorrent.Exchange.Status.PeerStatus'.
405data StatusUpdate
406 -- | Notification that the sender will not upload data to the
407 -- receiver until unchoking happen.
408 = Choking !Bool
409
410 -- | Notification that the sender is interested (or not interested)
411 -- in any of the receiver's data pieces.
412 | Interested !Bool
413 deriving (Show, Eq, Ord, Typeable)
414
415instance Pretty StatusUpdate where
416 pPrint (Choking False) = "not choking"
417 pPrint (Choking True ) = "choking"
418 pPrint (Interested False) = "not interested"
419 pPrint (Interested True ) = "interested"
420
421instance PeerMessage StatusUpdate where
422 envelop _ = Status
423 {-# INLINE envelop #-}
424
425 stats _ = ByteStats 4 1 0
426 {-# INLINE stats #-}
427
428{-----------------------------------------------------------------------
429-- Available messages
430-----------------------------------------------------------------------}
431
432-- | Messages used to inform receiver which pieces of the torrent
433-- sender have.
434data Available =
435 -- | Zero-based index of a piece that has just been successfully
436 -- downloaded and verified via the hash.
437 Have ! PieceIx
438
439 -- | The bitfield message may only be sent immediately after the
440 -- handshaking sequence is complete, and before any other message
441 -- are sent. If client have no pieces then bitfield need not to be
442 -- sent.
443 | Bitfield !Bitfield
444 deriving (Show, Eq)
445
446instance Pretty Available where
447 pPrint (Have ix ) = "Have" <+> int ix
448 pPrint (Bitfield _ ) = "Bitfield"
449
450instance PeerMessage Available where
451 envelop _ = Available
452 {-# INLINE envelop #-}
453
454 stats (Have _) = ByteStats (4 + 1) 4 0
455 stats (Bitfield bf) = ByteStats (4 + 1) (q + trailing) 0
456 where
457 trailing = if r == 0 then 0 else 1
458 (q, r) = quotRem (totalCount bf) 8
459
460{-----------------------------------------------------------------------
461-- Transfer messages
462-----------------------------------------------------------------------}
463
464-- | Messages used to transfer 'Block's.
465data Transfer
466 -- | Request for a particular block. If a client is requested a
467 -- block that another peer do not have the peer might not answer
468 -- at all.
469 = Request ! BlockIx
470
471 -- | Response to a request for a block.
472 | Piece !(Block BL.ByteString)
473
474 -- | Used to cancel block requests. It is typically used during
475 -- "End Game".
476 | Cancel !BlockIx
477 deriving (Show, Eq)
478
479instance Pretty Transfer where
480 pPrint (Request ix ) = "Request" <+> pPrint ix
481 pPrint (Piece blk) = "Piece" <+> pPrint blk
482 pPrint (Cancel i ) = "Cancel" <+> pPrint i
483
484instance PeerMessage Transfer where
485 envelop _ = Transfer
486 {-# INLINE envelop #-}
487
488 stats (Request _ ) = ByteStats (4 + 1) (3 * 4) 0
489 stats (Piece p ) = ByteStats (4 + 1) (4 + 4 + blockSize p) 0
490 stats (Cancel _ ) = ByteStats (4 + 1) (3 * 4) 0
491
492-- TODO increase
493-- | Max number of pending 'Request's inflight.
494defaultRequestQueueLength :: Int
495defaultRequestQueueLength = 1
496
497{-----------------------------------------------------------------------
498-- Fast messages
499-----------------------------------------------------------------------}
500
501-- | BEP6 messages.
502data FastMessage =
503 -- | If a peer have all pieces it might send the 'HaveAll' message
504 -- instead of 'Bitfield' message. Used to save bandwidth.
505 HaveAll
506
507 -- | If a peer have no pieces it might send 'HaveNone' message
508 -- intead of 'Bitfield' message. Used to save bandwidth.
509 | HaveNone
510
511 -- | This is an advisory message meaning "you might like to
512 -- download this piece." Used to avoid excessive disk seeks and
513 -- amount of IO.
514 | SuggestPiece !PieceIx
515
516 -- | Notifies a requesting peer that its request will not be
517 -- satisfied.
518 | RejectRequest !BlockIx
519
520 -- | This is an advisory messsage meaning \"if you ask for this
521 -- piece, I'll give it to you even if you're choked.\" Used to
522 -- shorten starting phase.
523 | AllowedFast !PieceIx
524 deriving (Show, Eq)
525
526instance Pretty FastMessage where
527 pPrint (HaveAll ) = "Have all"
528 pPrint (HaveNone ) = "Have none"
529 pPrint (SuggestPiece pix) = "Suggest" <+> int pix
530 pPrint (RejectRequest bix) = "Reject" <+> pPrint bix
531 pPrint (AllowedFast pix) = "Allowed fast" <+> int pix
532
533instance PeerMessage FastMessage where
534 envelop _ = Fast
535 {-# INLINE envelop #-}
536
537 requires _ = Just ExtFast
538 {-# INLINE requires #-}
539
540 stats HaveAll = ByteStats 4 1 0
541 stats HaveNone = ByteStats 4 1 0
542 stats (SuggestPiece _) = ByteStats 5 4 0
543 stats (RejectRequest _) = ByteStats 5 12 0
544 stats (AllowedFast _) = ByteStats 5 4 0
545
546{-----------------------------------------------------------------------
547-- Extension protocol
548-----------------------------------------------------------------------}
549
550{-----------------------------------------------------------------------
551-- Extended capabilities
552-----------------------------------------------------------------------}
553
554data ExtendedExtension
555 = ExtMetadata -- ^ BEP 9: Extension for Peers to Send Metadata Files
556 deriving (Show, Eq, Ord, Enum, Bounded, Typeable)
557
558instance IsString ExtendedExtension where
559 fromString = fromMaybe (error msg) . fromKey . fromString
560 where
561 msg = "fromString: could not parse ExtendedExtension"
562
563instance Pretty ExtendedExtension where
564 pPrint ExtMetadata = "Extension for Peers to Send Metadata Files"
565
566fromKey :: BKey -> Maybe ExtendedExtension
567fromKey "ut_metadata" = Just ExtMetadata
568fromKey _ = Nothing
569{-# INLINE fromKey #-}
570
571toKey :: ExtendedExtension -> BKey
572toKey ExtMetadata = "ut_metadata"
573{-# INLINE toKey #-}
574
575type ExtendedMessageId = Word8
576
577extId :: ExtendedExtension -> ExtendedMessageId
578extId ExtMetadata = 1
579{-# INLINE extId #-}
580
581type ExtendedMap = Map ExtendedExtension ExtendedMessageId
582
583-- | The extension IDs must be stored for every peer, because every
584-- peer may have different IDs for the same extension.
585--
586newtype ExtendedCaps = ExtendedCaps { extendedCaps :: ExtendedMap }
587 deriving (Show, Eq)
588
589instance Pretty ExtendedCaps where
590 pPrint = ppCaps
591 {-# INLINE pPrint #-}
592
593-- | The empty set.
594instance Default ExtendedCaps where
595 def = ExtendedCaps M.empty
596
597-- | Monoid under intersection:
598--
599-- * The 'mempty' caps includes all known extensions;
600--
601-- * the 'mappend' operation is NOT commutative: it return message
602-- id from the first caps for the extensions existing in both caps.
603--
604instance Monoid ExtendedCaps where
605 mempty = toCaps [minBound..maxBound]
606 mappend (ExtendedCaps a) (ExtendedCaps b) =
607 ExtendedCaps (M.intersection a b)
608
609appendBDict :: BDict -> ExtendedMap -> ExtendedMap
610appendBDict (Cons key val xs) caps
611 | Just ext <- fromKey key
612 , Right eid <- fromBEncode val = M.insert ext eid (appendBDict xs caps)
613 | otherwise = appendBDict xs caps
614appendBDict Nil caps = caps
615
616-- | Handshake compatible encoding.
617instance BEncode ExtendedCaps where
618 toBEncode = BDict . BE.fromAscList . L.sortBy (comparing fst)
619 . L.map (toKey *** toBEncode) . M.toList . extendedCaps
620
621 fromBEncode (BDict bd) = pure $ ExtendedCaps $ appendBDict bd M.empty
622 fromBEncode _ = decodingError "ExtendedCaps"
623
624instance Capabilities ExtendedCaps where
625 type Ext ExtendedCaps = ExtendedExtension
626
627 toCaps = ExtendedCaps . M.fromList . L.map (id &&& extId)
628
629 fromCaps = M.keys . extendedCaps
630 {-# INLINE fromCaps #-}
631
632 allowed e (ExtendedCaps caps) = M.member e caps
633 {-# INLINE allowed #-}
634
635remoteMessageId :: ExtendedExtension -> ExtendedCaps -> ExtendedMessageId
636remoteMessageId ext = fromMaybe (extId ext) . M.lookup ext . extendedCaps
637
638{-----------------------------------------------------------------------
639-- Extended handshake
640-----------------------------------------------------------------------}
641
642-- | This message should be sent immediately after the standard
643-- bittorrent handshake to any peer that supports this extension
644-- protocol. Extended handshakes can be sent more than once, however
645-- an implementation may choose to ignore subsequent handshake
646-- messages.
647--
648data ExtendedHandshake = ExtendedHandshake
649 { -- | If this peer has an IPv4 interface, this is the compact
650 -- representation of that address.
651 ehsIPv4 :: Maybe HostAddress
652
653 -- | If this peer has an IPv6 interface, this is the compact
654 -- representation of that address.
655 , ehsIPv6 :: Maybe HostAddress6
656
657 -- | Dictionary of supported extension messages which maps names
658 -- of extensions to an extended message ID for each extension
659 -- message.
660 , ehsCaps :: ExtendedCaps
661
662 -- | Size of 'Data.Torrent.InfoDict' in bytes. This field should
663 -- be added if 'ExtMetadata' is enabled in current session /and/
664 -- peer have the torrent file.
665 , ehsMetadataSize :: Maybe Int
666
667 -- | Local TCP /listen/ port. Allows each side to learn about the
668 -- TCP port number of the other side.
669 , ehsPort :: Maybe PortNumber
670
671 -- | Request queue the number of outstanding 'Request' messages
672 -- this client supports without dropping any.
673 , ehsQueueLength :: Maybe Int
674
675 -- | Client name and version.
676 , ehsVersion :: Maybe Text
677
678 -- | IP of the remote end
679 , ehsYourIp :: Maybe IP
680 } deriving (Show, Eq, Typeable)
681
682extHandshakeId :: ExtendedMessageId
683extHandshakeId = 0
684
685-- | Default 'Request' queue size.
686defaultQueueLength :: Int
687defaultQueueLength = 1
688
689-- | All fields are empty.
690instance Default ExtendedHandshake where
691 def = ExtendedHandshake def def def def def def def def
692
693instance Monoid ExtendedHandshake where
694 mempty = def { ehsCaps = mempty }
695 mappend old new = ExtendedHandshake {
696 ehsCaps = ehsCaps old <> ehsCaps new,
697 ehsIPv4 = ehsIPv4 old `mergeOld` ehsIPv4 new,
698 ehsIPv6 = ehsIPv6 old `mergeOld` ehsIPv6 new,
699 ehsMetadataSize = ehsMetadataSize old `mergeNew` ehsMetadataSize new,
700 ehsPort = ehsPort old `mergeOld` ehsPort new,
701 ehsQueueLength = ehsQueueLength old `mergeNew` ehsQueueLength new,
702 ehsVersion = ehsVersion old `mergeOld` ehsVersion new,
703 ehsYourIp = ehsYourIp old `mergeOld` ehsYourIp new
704 }
705 where
706 mergeOld mold mnew = mold <|> mnew
707 mergeNew mold mnew = mnew <|> mold
708
709
710instance BEncode ExtendedHandshake where
711 toBEncode ExtendedHandshake {..} = toDict $
712 "ipv4" .=? (S.encode <$> ehsIPv4)
713 .: "ipv6" .=? (S.encode <$> ehsIPv6)
714 .: "m" .=! ehsCaps
715 .: "metadata_size" .=? ehsMetadataSize
716 .: "p" .=? ehsPort
717 .: "reqq" .=? ehsQueueLength
718 .: "v" .=? ehsVersion
719 .: "yourip" .=? (runPut <$> either put put <$> toEither <$> ehsYourIp)
720 .: endDict
721 where
722 toEither (IPv4 v4) = Left v4
723 toEither (IPv6 v6) = Right v6
724
725 fromBEncode = fromDict $ ExtendedHandshake
726 <$>? "ipv4"
727 <*>? "ipv6"
728 <*>! "m"
729 <*>? "metadata_size"
730 <*>? "p"
731 <*>? "reqq"
732 <*>? "v"
733 <*> (opt "yourip" >>= getYourIp)
734
735getYourIp :: Maybe BValue -> BE.Get (Maybe IP)
736getYourIp f =
737 return $ do
738 BString ip <- f
739 either (const Nothing) Just $
740 case BS.length ip of
741 4 -> IPv4 <$> S.decode ip
742 16 -> IPv6 <$> S.decode ip
743 _ -> fail ""
744
745instance Pretty ExtendedHandshake where
746 pPrint = PP.text . show
747
748-- | NOTE: Approximated 'stats'.
749instance PeerMessage ExtendedHandshake where
750 envelop c = envelop c . EHandshake
751 {-# INLINE envelop #-}
752
753 requires _ = Just ExtExtended
754 {-# INLINE requires #-}
755
756 stats _ = ByteStats (4 + 1 + 1) 100 {- is it ok? -} 0 -- FIXME
757 {-# INLINE stats #-}
758
759-- | Set default values and the specified 'ExtendedCaps'.
760nullExtendedHandshake :: ExtendedCaps -> ExtendedHandshake
761nullExtendedHandshake caps = ExtendedHandshake
762 { ehsIPv4 = Nothing
763 , ehsIPv6 = Nothing
764 , ehsCaps = caps
765 , ehsMetadataSize = Nothing
766 , ehsPort = Nothing
767 , ehsQueueLength = Just defaultQueueLength
768 , ehsVersion = Just $ T.pack $ render $ pPrint libFingerprint
769 , ehsYourIp = Nothing
770 }
771
772{-----------------------------------------------------------------------
773-- Metadata exchange extension
774-----------------------------------------------------------------------}
775
776-- | A peer MUST verify that any piece it sends passes the info-hash
777-- verification. i.e. until the peer has the entire metadata, it
778-- cannot run SHA-1 to verify that it yields the same hash as the
779-- info-hash.
780--
781data ExtendedMetadata
782 -- | This message requests the a specified metadata piece. The
783 -- response to this message, from a peer supporting the extension,
784 -- is either a 'MetadataReject' or a 'MetadataData' message.
785 = MetadataRequest PieceIx
786
787 -- | If sender requested a valid 'PieceIx' and receiver have the
788 -- corresponding piece then receiver should respond with this
789 -- message.
790 | MetadataData
791 { -- | A piece of 'Data.Torrent.InfoDict'.
792 piece :: P.Piece BS.ByteString
793
794 -- | This key has the same semantics as the 'ehsMetadataSize' in
795 -- the 'ExtendedHandshake' — it is size of the torrent info
796 -- dict.
797 , totalSize :: Int
798 }
799
800 -- | Peers that do not have the entire metadata MUST respond with
801 -- a reject message to any metadata request.
802 --
803 -- Clients MAY implement flood protection by rejecting request
804 -- messages after a certain number of them have been
805 -- served. Typically the number of pieces of metadata times a
806 -- factor.
807 | MetadataReject PieceIx
808
809 -- | Reserved. By specification we should ignore unknown metadata
810 -- messages.
811 | MetadataUnknown BValue
812 deriving (Show, Eq, Typeable)
813
814-- | Extended metadata message id used in 'msg_type_key'.
815type MetadataId = Int
816
817msg_type_key, piece_key, total_size_key :: BKey
818msg_type_key = "msg_type"
819piece_key = "piece"
820total_size_key = "total_size"
821
822-- | BEP9 compatible encoding.
823instance BEncode ExtendedMetadata where
824 toBEncode (MetadataRequest pix) = toDict $
825 msg_type_key .=! (0 :: MetadataId)
826 .: piece_key .=! pix
827 .: endDict
828 toBEncode (MetadataData (P.Piece pix _) totalSize) = toDict $
829 msg_type_key .=! (1 :: MetadataId)
830 .: piece_key .=! pix
831 .: total_size_key .=! totalSize
832 .: endDict
833 toBEncode (MetadataReject pix) = toDict $
834 msg_type_key .=! (2 :: MetadataId)
835 .: piece_key .=! pix
836 .: endDict
837 toBEncode (MetadataUnknown bval) = bval
838
839 fromBEncode bval = (`fromDict` bval) $ do
840 mid <- field $ req msg_type_key
841 case mid :: MetadataId of
842 0 -> MetadataRequest <$>! piece_key
843 1 -> metadataData <$>! piece_key <*>! total_size_key
844 2 -> MetadataReject <$>! piece_key
845 _ -> pure (MetadataUnknown bval)
846 where
847 metadataData pix s = MetadataData (P.Piece pix BS.empty) s
848
849-- | Piece data bytes are omitted.
850instance Pretty ExtendedMetadata where
851 pPrint (MetadataRequest pix ) = "Request" <+> PP.int pix
852 pPrint (MetadataData p t) = "Data" <+> pPrint p <+> PP.int t
853 pPrint (MetadataReject pix ) = "Reject" <+> PP.int pix
854 pPrint (MetadataUnknown bval ) = "Unknown" <+> ppBEncode bval
855
856-- | NOTE: Approximated 'stats'.
857instance PeerMessage ExtendedMetadata where
858 envelop c = envelop c . EMetadata (remoteMessageId ExtMetadata c)
859 {-# INLINE envelop #-}
860
861 requires _ = Just ExtExtended
862 {-# INLINE requires #-}
863
864 stats (MetadataRequest _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0
865 stats (MetadataData p _) = ByteStats (4 + 1 + 1) {- ~ -} 41 $
866 BS.length (P.pieceData p)
867 stats (MetadataReject _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0
868 stats (MetadataUnknown _) = ByteStats (4 + 1 + 1) {- ? -} 0 0
869
870-- | All 'Piece's in 'MetadataData' messages MUST have size equal to
871-- this value. The last trailing piece can be shorter.
872metadataPieceSize :: PieceSize
873metadataPieceSize = 16 * 1024
874
875isLastPiece :: P.Piece a -> Int -> Bool
876isLastPiece P.Piece {..} total = succ pieceIndex == pcnt
877 where
878 pcnt = q + if r > 0 then 1 else 0
879 (q, r) = quotRem total metadataPieceSize
880
881-- TODO we can check if the piece payload bytestring have appropriate
882-- length; otherwise serialization MUST fail.
883isValidPiece :: P.Piece BL.ByteString -> Int -> Bool
884isValidPiece p @ P.Piece {..} total
885 | isLastPiece p total = pieceSize p <= metadataPieceSize
886 | otherwise = pieceSize p == metadataPieceSize
887
888setMetadataPayload :: BS.ByteString -> ExtendedMetadata -> ExtendedMetadata
889setMetadataPayload bs (MetadataData (P.Piece pix _) t) =
890 MetadataData (P.Piece pix bs) t
891setMetadataPayload _ msg = msg
892
893getMetadataPayload :: ExtendedMetadata -> Maybe BS.ByteString
894getMetadataPayload (MetadataData (P.Piece _ bs) _) = Just bs
895getMetadataPayload _ = Nothing
896
897-- | Metadata BDict usually contain only 'msg_type_key', 'piece_key'
898-- and 'total_size_key' fields so it normally should take less than
899-- 100 bytes. This limit is two order of magnitude larger to be
900-- permissive to 'MetadataUnknown' messages.
901--
902-- See 'maxMessageSize' for further explanation.
903--
904maxMetadataBDictSize :: Int
905maxMetadataBDictSize = 16 * 1024
906
907maxMetadataSize :: Int
908maxMetadataSize = maxMetadataBDictSize + metadataPieceSize
909
910-- to make MetadataData constructor fields a little bit prettier we
911-- cheat here: first we read empty 'pieceData' from bdict, but then we
912-- fill that field with the actual piece data — trailing bytes of
913-- the message
914getMetadata :: Int -> S.Get ExtendedMetadata
915getMetadata len
916 | len > maxMetadataSize = fail $ parseError "size exceeded limit"
917 | otherwise = do
918 bs <- getByteString len
919 parseRes $ BS.parse BE.parser bs
920 where
921 parseError reason = "unable to parse metadata message: " ++ reason
922
923 parseRes (BS.Fail _ _ m) = fail $ parseError $ "bdict: " ++ m
924 parseRes (BS.Partial _) = fail $ parseError "bdict: not enough bytes"
925 parseRes (BS.Done piece bvalueBS)
926 | BS.length piece > metadataPieceSize
927 = fail "infodict piece: size exceeded limit"
928 | otherwise = do
929 metadata <- either (fail . parseError) pure $ fromBEncode bvalueBS
930 return $ setMetadataPayload piece metadata
931
932putMetadata :: ExtendedMetadata -> BL.ByteString
933putMetadata msg
934 | Just bs <- getMetadataPayload msg = BE.encode msg <> BL.fromStrict bs
935 | otherwise = BE.encode msg
936
937-- | Allows a requesting peer to send 2 'MetadataRequest's for the
938-- each piece.
939--
940-- See 'Network.BitTorrent.Wire.Options.metadataFactor' for
941-- explanation why do we need this limit.
942defaultMetadataFactor :: Int
943defaultMetadataFactor = 2
944
945-- | Usually torrent size do not exceed 1MB. This value limit torrent
946-- /content/ size to about 8TB.
947--
948-- See 'Network.BitTorrent.Wire.Options.maxInfoDictSize' for
949-- explanation why do we need this limit.
950defaultMaxInfoDictSize :: Int
951defaultMaxInfoDictSize = 10 * 1024 * 1024
952
953{-----------------------------------------------------------------------
954-- Extension protocol messages
955-----------------------------------------------------------------------}
956
957-- | For more info see <http://www.bittorrent.org/beps/bep_0010.html>
958data ExtendedMessage
959 = EHandshake ExtendedHandshake
960 | EMetadata ExtendedMessageId ExtendedMetadata
961 | EUnknown ExtendedMessageId BS.ByteString
962 deriving (Show, Eq, Typeable)
963
964instance Pretty ExtendedMessage where
965 pPrint (EHandshake ehs) = pPrint ehs
966 pPrint (EMetadata _ msg) = "Metadata" <+> pPrint msg
967 pPrint (EUnknown mid _ ) = "Unknown" <+> PP.text (show mid)
968
969instance PeerMessage ExtendedMessage where
970 envelop _ = Extended
971 {-# INLINE envelop #-}
972
973 requires _ = Just ExtExtended
974 {-# INLINE requires #-}
975
976 stats (EHandshake hs) = stats hs
977 stats (EMetadata _ msg) = stats msg
978 stats (EUnknown _ msg) = ByteStats (4 + 1 + 1) (BS.length msg) 0
979
980{-----------------------------------------------------------------------
981-- The message datatype
982-----------------------------------------------------------------------}
983
984type MessageId = Word8
985
986-- | Messages used in communication between peers.
987--
988-- Note: If some extensions are disabled (not present in extension
989-- mask) and client receive message used by the disabled
990-- extension then the client MUST close the connection.
991--
992data Message
993 -- | Peers may close the TCP connection if they have not received
994 -- any messages for a given period of time, generally 2
995 -- minutes. Thus, the KeepAlive message is sent to keep the
996 -- connection between two peers alive, if no /other/ message has
997 -- been sent in a given period of time.
998 = KeepAlive
999 | Status !StatusUpdate -- ^ Messages used to update peer status.
1000 | Available !Available -- ^ Messages used to inform availability.
1001 | Transfer !Transfer -- ^ Messages used to transfer 'Block's.
1002
1003 -- | Peer receiving a handshake indicating the remote peer
1004 -- supports the 'ExtDHT' should send a 'Port' message. Peers that
1005 -- receive this message should attempt to ping the node on the
1006 -- received port and IP address of the remote peer.
1007 | Port !PortNumber
1008 | Fast !FastMessage
1009 | Extended !ExtendedMessage
1010 deriving (Show, Eq)
1011
1012instance Default Message where
1013 def = KeepAlive
1014 {-# INLINE def #-}
1015
1016-- | Payload bytes are omitted.
1017instance Pretty Message where
1018 pPrint (KeepAlive ) = "Keep alive"
1019 pPrint (Status m) = "Status" <+> pPrint m
1020 pPrint (Available m) = pPrint m
1021 pPrint (Transfer m) = pPrint m
1022 pPrint (Port p) = "Port" <+> int (fromEnum p)
1023 pPrint (Fast m) = pPrint m
1024 pPrint (Extended m) = pPrint m
1025
1026instance PeerMessage Message where
1027 envelop _ = id
1028 {-# INLINE envelop #-}
1029
1030 requires KeepAlive = Nothing
1031 requires (Status _) = Nothing
1032 requires (Available _) = Nothing
1033 requires (Transfer _) = Nothing
1034 requires (Port _) = Just ExtDHT
1035 requires (Fast _) = Just ExtFast
1036 requires (Extended _) = Just ExtExtended
1037
1038 stats KeepAlive = ByteStats 4 0 0
1039 stats (Status m) = stats m
1040 stats (Available m) = stats m
1041 stats (Transfer m) = stats m
1042 stats (Port _) = ByteStats 5 2 0
1043 stats (Fast m) = stats m
1044 stats (Extended m) = stats m
1045
1046-- | PORT message.
1047instance PeerMessage PortNumber where
1048 envelop _ = Port
1049 {-# INLINE envelop #-}
1050
1051 requires _ = Just ExtDHT
1052 {-# INLINE requires #-}
1053
1054-- | How long /this/ peer should wait before dropping connection, in
1055-- seconds.
1056defaultKeepAliveTimeout :: Int
1057defaultKeepAliveTimeout = 2 * 60
1058
1059-- | How often /this/ peer should send 'KeepAlive' messages, in
1060-- seconds.
1061defaultKeepAliveInterval :: Int
1062defaultKeepAliveInterval = 60
1063
1064getInt :: S.Get Int
1065getInt = fromIntegral <$> S.getWord32be
1066{-# INLINE getInt #-}
1067
1068putInt :: S.Putter Int
1069putInt = S.putWord32be . fromIntegral
1070{-# INLINE putInt #-}
1071
1072-- | This limit should protect against "out-of-memory" attacks: if a
1073-- malicious peer have sent a long varlength message then receiver can
1074-- accumulate too long bytestring in the 'Get'.
1075--
1076-- Normal messages should never exceed this limits.
1077--
1078-- See also 'maxBitfieldSize', 'maxBlockSize' limits.
1079--
1080maxMessageSize :: Int
1081maxMessageSize = 20 + 1024 * 1024
1082
1083-- | This also limit max torrent size to:
1084--
1085-- max_bitfield_size * piece_ix_per_byte * max_piece_size =
1086-- 2 ^ 20 * 8 * 1MB =
1087-- 8TB
1088--
1089maxBitfieldSize :: Int
1090maxBitfieldSize = 1024 * 1024
1091
1092getBitfield :: Int -> S.Get Bitfield
1093getBitfield len
1094 | len > maxBitfieldSize = fail "BITFIELD message size exceeded limit"
1095 | otherwise = fromBitmap <$> getByteString len
1096
1097maxBlockSize :: Int
1098maxBlockSize = 4 * defaultTransferSize
1099
1100getBlock :: Int -> S.Get (Block BL.ByteString)
1101getBlock len
1102 | len > maxBlockSize = fail "BLOCK message size exceeded limit"
1103 | otherwise = Block <$> getInt <*> getInt
1104 <*> getLazyByteString (fromIntegral len)
1105{-# INLINE getBlock #-}
1106
1107instance Serialize Message where
1108 get = do
1109 len <- getInt
1110
1111 when (len > maxMessageSize) $ do
1112 fail "message body size exceeded the limit"
1113
1114 if len == 0 then return KeepAlive
1115 else do
1116 mid <- S.getWord8
1117 case mid of
1118 0x00 -> return $ Status (Choking True)
1119 0x01 -> return $ Status (Choking False)
1120 0x02 -> return $ Status (Interested True)
1121 0x03 -> return $ Status (Interested False)
1122 0x04 -> (Available . Have) <$> getInt
1123 0x05 -> (Available . Bitfield) <$> getBitfield (pred len)
1124 0x06 -> (Transfer . Request) <$> S.get
1125 0x07 -> (Transfer . Piece) <$> getBlock (len - 9)
1126 0x08 -> (Transfer . Cancel) <$> S.get
1127 0x09 -> Port <$> S.get
1128 0x0D -> (Fast . SuggestPiece) <$> getInt
1129 0x0E -> return $ Fast HaveAll
1130 0x0F -> return $ Fast HaveNone
1131 0x10 -> (Fast . RejectRequest) <$> S.get
1132 0x11 -> (Fast . AllowedFast) <$> getInt
1133 0x14 -> Extended <$> getExtendedMessage (pred len)
1134 _ -> do
1135 rm <- S.remaining >>= S.getBytes
1136 fail $ "unknown message ID: " ++ show mid ++ "\n"
1137 ++ "remaining available bytes: " ++ show rm
1138
1139 put KeepAlive = putInt 0
1140 put (Status msg) = putStatus msg
1141 put (Available msg) = putAvailable msg
1142 put (Transfer msg) = putTransfer msg
1143 put (Port p ) = putPort p
1144 put (Fast msg) = putFast msg
1145 put (Extended m ) = putExtendedMessage m
1146
1147statusUpdateId :: StatusUpdate -> MessageId
1148statusUpdateId (Choking choking) = fromIntegral (0 + fromEnum choking)
1149statusUpdateId (Interested choking) = fromIntegral (2 + fromEnum choking)
1150
1151putStatus :: Putter StatusUpdate
1152putStatus su = do
1153 putInt 1
1154 putWord8 (statusUpdateId su)
1155
1156putAvailable :: Putter Available
1157putAvailable (Have i) = do
1158 putInt 5
1159 putWord8 0x04
1160 putInt i
1161putAvailable (Bitfield (toBitmap -> bs)) = do
1162 putInt $ 1 + fromIntegral (BL.length bs)
1163 putWord8 0x05
1164 putLazyByteString bs
1165
1166putBlock :: Putter (Block BL.ByteString)
1167putBlock Block {..} = do
1168 putInt blkPiece
1169 putInt blkOffset
1170 putLazyByteString blkData
1171
1172putTransfer :: Putter Transfer
1173putTransfer (Request blk) = putInt 13 >> S.putWord8 0x06 >> S.put blk
1174putTransfer (Piece blk) = do
1175 putInt (9 + blockSize blk)
1176 putWord8 0x07
1177 putBlock blk
1178putTransfer (Cancel blk) = putInt 13 >> S.putWord8 0x08 >> S.put blk
1179
1180putPort :: Putter PortNumber
1181putPort p = do
1182 putInt 3
1183 putWord8 0x09
1184 put p
1185
1186putFast :: Putter FastMessage
1187putFast HaveAll = putInt 1 >> putWord8 0x0E
1188putFast HaveNone = putInt 1 >> putWord8 0x0F
1189putFast (SuggestPiece pix) = putInt 5 >> putWord8 0x0D >> putInt pix
1190putFast (RejectRequest i ) = putInt 13 >> putWord8 0x10 >> put i
1191putFast (AllowedFast i ) = putInt 5 >> putWord8 0x11 >> putInt i
1192
1193maxEHandshakeSize :: Int
1194maxEHandshakeSize = 16 * 1024
1195
1196getExtendedHandshake :: Int -> S.Get ExtendedHandshake
1197getExtendedHandshake messageSize
1198 | messageSize > maxEHandshakeSize
1199 = fail "extended handshake size exceeded limit"
1200 | otherwise = do
1201 bs <- getByteString messageSize
1202 either fail pure $ BE.decode bs
1203
1204maxEUnknownSize :: Int
1205maxEUnknownSize = 64 * 1024
1206
1207getExtendedUnknown :: Int -> S.Get BS.ByteString
1208getExtendedUnknown len
1209 | len > maxEUnknownSize = fail "unknown extended message size exceeded limit"
1210 | otherwise = getByteString len
1211
1212getExtendedMessage :: Int -> S.Get ExtendedMessage
1213getExtendedMessage messageSize = do
1214 msgId <- getWord8
1215 let msgBodySize = messageSize - 1
1216 case msgId of
1217 0 -> EHandshake <$> getExtendedHandshake msgBodySize
1218 1 -> EMetadata msgId <$> getMetadata msgBodySize
1219 _ -> EUnknown msgId <$> getExtendedUnknown msgBodySize
1220
1221-- | By spec.
1222extendedMessageId :: MessageId
1223extendedMessageId = 20
1224
1225putExt :: ExtendedMessageId -> BL.ByteString -> Put
1226putExt mid lbs = do
1227 putWord32be $ fromIntegral (1 + 1 + BL.length lbs)
1228 putWord8 extendedMessageId
1229 putWord8 mid
1230 putLazyByteString lbs
1231
1232-- NOTE: in contrast to getExtendedMessage this function put length
1233-- and message id too!
1234putExtendedMessage :: Putter ExtendedMessage
1235putExtendedMessage (EHandshake hs) = putExt extHandshakeId $ BE.encode hs
1236putExtendedMessage (EMetadata mid msg) = putExt mid $ putMetadata msg
1237putExtendedMessage (EUnknown mid bs) = putExt mid $ BL.fromStrict bs
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Session.hs b/bittorrent/src/Network/BitTorrent/Exchange/Session.hs
deleted file mode 100644
index 38a3c3a6..00000000
--- a/bittorrent/src/Network/BitTorrent/Exchange/Session.hs
+++ /dev/null
@@ -1,586 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE DeriveDataTypeable #-}
3{-# LANGUAGE FlexibleInstances #-}
4{-# LANGUAGE StandaloneDeriving #-}
5{-# LANGUAGE TemplateHaskell #-}
6{-# LANGUAGE TypeFamilies #-}
7module Network.BitTorrent.Exchange.Session
8 ( -- * Session
9 Session
10 , Event (..)
11 , LogFun
12 , sessionLogger
13
14 -- * Construction
15 , newSession
16 , closeSession
17 , withSession
18
19 -- * Connection Set
20 , connect
21 , connectSink
22 , establish
23
24 -- * Query
25 , waitMetadata
26 , takeMetadata
27 ) where
28
29import Control.Applicative
30import Control.Concurrent
31import Control.Concurrent.Chan.Split as CS
32import Control.Concurrent.STM
33import Control.Exception hiding (Handler)
34import Control.Lens
35import Control.Monad as M
36import Control.Monad.Logger
37import Control.Monad.Reader
38import Data.ByteString as BS
39import Data.ByteString.Lazy as BL
40import Data.Conduit as C (Sink, awaitForever, (=$=), ($=))
41import qualified Data.Conduit as C
42import Data.Conduit.List as C
43import Data.Map as M
44import Data.Monoid
45import Data.Set as S
46import Data.Text as T
47import Data.Typeable
48import Text.PrettyPrint hiding ((<>))
49import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
50import System.Log.FastLogger (LogStr, ToLogStr (..))
51
52import Data.BEncode as BE
53import Data.Torrent as Torrent
54import Network.BitTorrent.Internal.Types
55import Network.Address
56import Network.BitTorrent.Exchange.Bitfield as BF
57import Network.BitTorrent.Exchange.Block as Block
58import Network.BitTorrent.Exchange.Connection
59import Network.BitTorrent.Exchange.Download as D
60import Network.BitTorrent.Exchange.Message as Message
61import System.Torrent.Storage
62
63#if !MIN_VERSION_iproute(1,2,12)
64deriving instance Ord IP
65#endif
66
67{-----------------------------------------------------------------------
68-- Exceptions
69-----------------------------------------------------------------------}
70
71data ExchangeError
72 = InvalidRequest BlockIx StorageFailure
73 | CorruptedPiece PieceIx
74 deriving (Show, Typeable)
75
76instance Exception ExchangeError
77
78packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a
79packException f m = try m >>= either (throwIO . f) return
80
81{-----------------------------------------------------------------------
82-- Session state
83-----------------------------------------------------------------------}
84-- TODO unmap storage on zero connections
85
86data Cached a = Cached
87 { cachedValue :: !a
88 , cachedData :: BL.ByteString -- keep lazy
89 }
90
91cache :: BEncode a => a -> Cached a
92cache s = Cached s (BE.encode s)
93
94-- | Logger function.
95type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
96
97--data SessionStatus = Seeder | Leecher
98
99data SessionState
100 = WaitingMetadata
101 { metadataDownload :: MVar MetadataDownload
102 , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters
103 , contentRootPath :: FilePath
104 }
105 | HavingMetadata
106 { metadataCache :: Cached InfoDict
107 , contentDownload :: MVar ContentDownload
108 , contentStorage :: Storage
109 }
110
111newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState
112newSessionState rootPath (Left ih ) = do
113 WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath
114newSessionState rootPath (Right dict) = do
115 storage <- openInfoDict ReadWriteEx rootPath dict
116 download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage))
117 (piPieceLength (idPieceInfo dict))
118 storage
119 return $ HavingMetadata (cache dict) download storage
120
121closeSessionState :: SessionState -> IO ()
122closeSessionState WaitingMetadata {..} = return ()
123closeSessionState HavingMetadata {..} = close contentStorage
124
125haveMetadata :: InfoDict -> SessionState -> IO SessionState
126haveMetadata dict WaitingMetadata {..} = do
127 storage <- openInfoDict ReadWriteEx contentRootPath dict
128 download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage))
129 (piPieceLength (idPieceInfo dict))
130 storage
131 return HavingMetadata
132 { metadataCache = cache dict
133 , contentDownload = download
134 , contentStorage = storage
135 }
136haveMetadata _ s = return s
137
138{-----------------------------------------------------------------------
139-- Session
140-----------------------------------------------------------------------}
141
142data Session = Session
143 { sessionPeerId :: !(PeerId)
144 , sessionTopic :: !(InfoHash)
145 , sessionLogger :: !(LogFun)
146 , sessionEvents :: !(SendPort (Event Session))
147
148 , sessionState :: !(MVar SessionState)
149
150------------------------------------------------------------------------
151 , connectionsPrefs :: !ConnectionPrefs
152
153 -- | Connections either waiting for TCP/uTP 'connect' or waiting
154 -- for BT handshake.
155 , connectionsPending :: !(TVar (Set (PeerAddr IP)))
156
157 -- | Connections successfully handshaked and data transfer can
158 -- take place.
159 , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session)))
160
161 -- | TODO implement choking mechanism
162 , connectionsUnchoked :: [PeerAddr IP]
163
164 -- | Messages written to this channel will be sent to the all
165 -- connections, including pending connections (but right after
166 -- handshake).
167 , connectionsBroadcast :: !(Chan Message)
168 }
169
170instance EventSource Session where
171 data Event Session
172 = ConnectingTo (PeerAddr IP)
173 | ConnectionEstablished (PeerAddr IP)
174 | ConnectionAborted
175 | ConnectionClosed (PeerAddr IP)
176 | SessionClosed
177 deriving Show
178
179 listen Session {..} = CS.listen sessionEvents
180
181newSession :: LogFun
182 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
183 -> FilePath -- ^ root directory for content files;
184 -> Either InfoHash InfoDict -- ^ torrent info dictionary;
185 -> IO Session
186newSession logFun addr rootPath source = do
187 let ih = either id idInfoHash source
188 pid <- maybe genPeerId return (peerId addr)
189 eventStream <- newSendPort
190 sState <- newSessionState rootPath source
191 sStateVar <- newMVar sState
192 pSetVar <- newTVarIO S.empty
193 eSetVar <- newTVarIO M.empty
194 chan <- newChan
195 return Session
196 { sessionPeerId = pid
197 , sessionTopic = ih
198 , sessionLogger = logFun
199 , sessionEvents = eventStream
200 , sessionState = sStateVar
201 , connectionsPrefs = def
202 , connectionsPending = pSetVar
203 , connectionsEstablished = eSetVar
204 , connectionsUnchoked = []
205 , connectionsBroadcast = chan
206 }
207
208closeSession :: Session -> IO ()
209closeSession Session {..} = do
210 s <- readMVar sessionState
211 closeSessionState s
212{-
213 hSet <- atomically $ do
214 pSet <- swapTVar connectionsPending S.empty
215 eSet <- swapTVar connectionsEstablished S.empty
216 return pSet
217 mapM_ kill hSet
218-}
219
220withSession :: ()
221withSession = error "withSession"
222
223{-----------------------------------------------------------------------
224-- Logging
225-----------------------------------------------------------------------}
226
227instance MonadLogger (Connected Session) where
228 monadLoggerLog loc src lvl msg = do
229 conn <- ask
230 ses <- asks connSession
231 addr <- asks connRemoteAddr
232 let addrSrc = src <> " @ " <> T.pack (render (pPrint addr))
233 liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg)
234
235logMessage :: MonadLogger m => Message -> m ()
236logMessage msg = logDebugN $ T.pack (render (pPrint msg))
237
238logEvent :: MonadLogger m => Text -> m ()
239logEvent = logInfoN
240
241{-----------------------------------------------------------------------
242-- Connection set
243-----------------------------------------------------------------------}
244--- Connection status transition:
245---
246--- pending -> established -> finished -> closed
247--- | \|/ /|\
248--- \-------------------------------------|
249---
250--- Purpose of slots:
251--- 1) to avoid duplicates
252--- 2) connect concurrently
253---
254
255-- | Add connection to the pending set.
256pendingConnection :: PeerAddr IP -> Session -> STM Bool
257pendingConnection addr Session {..} = do
258 pSet <- readTVar connectionsPending
259 eSet <- readTVar connectionsEstablished
260 if (addr `S.member` pSet) || (addr `M.member` eSet)
261 then return False
262 else do
263 modifyTVar' connectionsPending (S.insert addr)
264 return True
265
266-- | Pending connection successfully established, add it to the
267-- established set.
268establishedConnection :: Connected Session ()
269establishedConnection = do
270 conn <- ask
271 addr <- asks connRemoteAddr
272 Session {..} <- asks connSession
273 liftIO $ atomically $ do
274 modifyTVar connectionsPending (S.delete addr)
275 modifyTVar connectionsEstablished (M.insert addr conn)
276
277-- | Either this or remote peer decided to finish conversation
278-- (conversation is alread /established/ connection), remote it from
279-- the established set.
280finishedConnection :: Connected Session ()
281finishedConnection = do
282 Session {..} <- asks connSession
283 addr <- asks connRemoteAddr
284 liftIO $ atomically $ do
285 modifyTVar connectionsEstablished $ M.delete addr
286
287-- | There are no state for this connection, remove it from the all
288-- sets.
289closedConnection :: PeerAddr IP -> Session -> STM ()
290closedConnection addr Session {..} = do
291 modifyTVar connectionsPending $ S.delete addr
292 modifyTVar connectionsEstablished $ M.delete addr
293
294getConnectionConfig :: Session -> IO (ConnectionConfig Session)
295getConnectionConfig s @ Session {..} = do
296 chan <- dupChan connectionsBroadcast
297 let sessionLink = SessionLink {
298 linkTopic = sessionTopic
299 , linkPeerId = sessionPeerId
300 , linkMetadataSize = Nothing
301 , linkOutputChan = Just chan
302 , linkSession = s
303 }
304 return ConnectionConfig
305 { cfgPrefs = connectionsPrefs
306 , cfgSession = sessionLink
307 , cfgWire = mainWire
308 }
309
310type Finalizer = IO ()
311type Runner = (ConnectionConfig Session -> IO ())
312
313runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO ()
314runConnection runner finalize addr set @ Session {..} = do
315 _ <- forkIO (action `finally` cleanup)
316 return ()
317 where
318 action = do
319 notExist <- atomically $ pendingConnection addr set
320 when notExist $ do
321 cfg <- getConnectionConfig set
322 runner cfg
323
324 cleanup = do
325 finalize
326-- runStatusUpdates status (SS.resetPending addr)
327 -- TODO Metata.resetPending addr
328 atomically $ closedConnection addr set
329
330-- | Establish connection from scratch. If this endpoint is already
331-- connected, no new connections is created. This function do not block.
332connect :: PeerAddr IP -> Session -> IO ()
333connect addr = runConnection (connectWire addr) (return ()) addr
334
335-- | Establish connection with already pre-connected endpoint. If this
336-- endpoint is already connected, no new connections is created. This
337-- function do not block.
338--
339-- 'PendingConnection' will be closed automatically, you do not need
340-- to call 'closePending'.
341establish :: PendingConnection -> Session -> IO ()
342establish conn = runConnection (acceptWire conn) (closePending conn)
343 (pendingPeer conn)
344
345-- | Conduit version of 'connect'.
346connectSink :: MonadIO m => Session -> Sink [PeerAddr IPv4] m ()
347connectSink s = C.mapM_ (liftIO . connectBatch)
348 where
349 connectBatch = M.mapM_ (\ addr -> connect (IPv4 <$> addr) s)
350
351-- | Why do we need this message?
352type BroadcastMessage = ExtendedCaps -> Message
353
354broadcast :: BroadcastMessage -> Session -> IO ()
355broadcast = error "broadcast"
356
357{-----------------------------------------------------------------------
358-- Helpers
359-----------------------------------------------------------------------}
360
361waitMVar :: MVar a -> IO ()
362waitMVar m = withMVar m (const (return ()))
363
364-- This function appear in new GHC "out of box". (moreover it is atomic)
365tryReadMVar :: MVar a -> IO (Maybe a)
366tryReadMVar m = do
367 ma <- tryTakeMVar m
368 maybe (return ()) (putMVar m) ma
369 return ma
370
371readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString)
372readBlock bix @ BlockIx {..} s = do
373 p <- packException (InvalidRequest bix) $ do readPiece ixPiece s
374 let chunk = BL.take (fromIntegral ixLength) $
375 BL.drop (fromIntegral ixOffset) (pieceData p)
376 if BL.length chunk == fromIntegral ixLength
377 then return $ Block ixPiece ixOffset chunk
378 else throwIO $ InvalidRequest bix (InvalidSize ixLength)
379
380-- |
381tryReadMetadataBlock :: PieceIx
382 -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int))
383tryReadMetadataBlock pix = do
384 Session {..} <- asks connSession
385 s <- liftIO (readMVar sessionState)
386 case s of
387 WaitingMetadata {..} -> error "tryReadMetadataBlock"
388 HavingMetadata {..} -> error "tryReadMetadataBlock"
389
390sendBroadcast :: PeerMessage msg => msg -> Wire Session ()
391sendBroadcast msg = do
392 Session {..} <- asks connSession
393 error "sendBroadcast"
394-- liftIO $ msg `broadcast` sessionConnections
395
396waitMetadata :: Session -> IO InfoDict
397waitMetadata Session {..} = do
398 s <- readMVar sessionState
399 case s of
400 WaitingMetadata {..} -> readMVar metadataCompleted
401 HavingMetadata {..} -> return (cachedValue metadataCache)
402
403takeMetadata :: Session -> IO (Maybe InfoDict)
404takeMetadata Session {..} = do
405 s <- readMVar sessionState
406 case s of
407 WaitingMetadata {..} -> return Nothing
408 HavingMetadata {..} -> return (Just (cachedValue metadataCache))
409
410{-----------------------------------------------------------------------
411-- Triggers
412-----------------------------------------------------------------------}
413
414-- | Trigger is the reaction of a handler at some event.
415type Trigger = Wire Session ()
416
417interesting :: Trigger
418interesting = do
419 addr <- asks connRemoteAddr
420 sendMessage (Interested True)
421 sendMessage (Choking False)
422 tryFillRequestQueue
423
424fillRequestQueue :: Trigger
425fillRequestQueue = do
426 maxN <- lift getMaxQueueLength
427 rbf <- use connBitfield
428 addr <- asks connRemoteAddr
429-- blks <- withStatusUpdates $ do
430-- n <- getRequestQueueLength addr
431-- scheduleBlocks addr rbf (maxN - n)
432-- mapM_ (sendMessage . Request) blks
433 return ()
434
435tryFillRequestQueue :: Trigger
436tryFillRequestQueue = do
437 allowed <- canDownload <$> use connStatus
438 when allowed $ do
439 fillRequestQueue
440
441{-----------------------------------------------------------------------
442-- Incoming message handling
443-----------------------------------------------------------------------}
444
445type Handler msg = msg -> Wire Session ()
446
447handleStatus :: Handler StatusUpdate
448handleStatus s = do
449 connStatus %= over remoteStatus (updateStatus s)
450 case s of
451 Interested _ -> return ()
452 Choking True -> do
453 addr <- asks connRemoteAddr
454-- withStatusUpdates (SS.resetPending addr)
455 return ()
456 Choking False -> tryFillRequestQueue
457
458handleAvailable :: Handler Available
459handleAvailable msg = do
460 connBitfield %= case msg of
461 Have ix -> BF.insert ix
462 Bitfield bf -> const bf
463
464 --thisBf <- getThisBitfield
465 thisBf <- undefined
466 case msg of
467 Have ix
468 | ix `BF.member` thisBf -> return ()
469 | otherwise -> interesting
470 Bitfield bf
471 | bf `BF.isSubsetOf` thisBf -> return ()
472 | otherwise -> interesting
473
474handleTransfer :: Handler Transfer
475handleTransfer (Request bix) = do
476 Session {..} <- asks connSession
477 s <- liftIO $ readMVar sessionState
478 case s of
479 WaitingMetadata {..} -> return ()
480 HavingMetadata {..} -> do
481 bitfield <- undefined -- getThisBitfield
482 upload <- canUpload <$> use connStatus
483 when (upload && ixPiece bix `BF.member` bitfield) $ do
484 blk <- liftIO $ readBlock bix contentStorage
485 sendMessage (Message.Piece blk)
486
487handleTransfer (Message.Piece blk) = do
488 Session {..} <- asks connSession
489 s <- liftIO $ readMVar sessionState
490 case s of
491 WaitingMetadata {..} -> return () -- TODO (?) break connection
492 HavingMetadata {..} -> do
493 isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage)
494 case isSuccess of
495 Nothing -> liftIO $ throwIO $ userError "block is not requested"
496 Just isCompleted -> do
497 when isCompleted $ do
498 sendBroadcast (Have (blkPiece blk))
499-- maybe send not interested
500 tryFillRequestQueue
501
502handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix))
503 where
504 transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix
505 transferResponse _ _ = False
506
507{-----------------------------------------------------------------------
508-- Metadata exchange
509-----------------------------------------------------------------------}
510-- TODO introduce new metadata exchange specific exceptions
511
512waitForMetadata :: Trigger
513waitForMetadata = do
514 Session {..} <- asks connSession
515 needFetch <- undefined --liftIO (isEmptyMVar infodict)
516 when needFetch $ do
517 canFetch <- allowed ExtMetadata <$> use connExtCaps
518 if canFetch
519 then tryRequestMetadataBlock
520 else undefined -- liftIO (waitMVar infodict)
521
522tryRequestMetadataBlock :: Trigger
523tryRequestMetadataBlock = do
524 mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock
525 case mpix of
526 Nothing -> error "tryRequestMetadataBlock"
527 Just pix -> sendMessage (MetadataRequest pix)
528
529handleMetadata :: Handler ExtendedMetadata
530handleMetadata (MetadataRequest pix) =
531 lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse
532 where
533 mkResponse Nothing = MetadataReject pix
534 mkResponse (Just (piece, total)) = MetadataData piece total
535
536handleMetadata (MetadataData {..}) = do
537 ih <- asks connTopic
538 mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih)
539 case mdict of
540 Nothing -> tryRequestMetadataBlock -- not completed, need all blocks
541 Just dict -> do -- complete, wake up payload fetch
542 Session {..} <- asks connSession
543 liftIO $ modifyMVar_ sessionState (haveMetadata dict)
544
545handleMetadata (MetadataReject pix) = do
546 lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix)
547
548handleMetadata (MetadataUnknown _ ) = do
549 logInfoN "Unknown metadata message"
550
551{-----------------------------------------------------------------------
552-- Main entry point
553-----------------------------------------------------------------------}
554
555acceptRehandshake :: ExtendedHandshake -> Trigger
556acceptRehandshake ehs = error "acceptRehandshake"
557
558handleExtended :: Handler ExtendedMessage
559handleExtended (EHandshake ehs) = acceptRehandshake ehs
560handleExtended (EMetadata _ msg) = handleMetadata msg
561handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message"
562
563handleMessage :: Handler Message
564handleMessage KeepAlive = return ()
565handleMessage (Status s) = handleStatus s
566handleMessage (Available msg) = handleAvailable msg
567handleMessage (Transfer msg) = handleTransfer msg
568handleMessage (Port n) = error "handleMessage"
569handleMessage (Fast _) = error "handleMessage"
570handleMessage (Extended msg) = handleExtended msg
571
572exchange :: Wire Session ()
573exchange = do
574 waitForMetadata
575 bf <- undefined --getThisBitfield
576 sendMessage (Bitfield bf)
577 awaitForever handleMessage
578
579mainWire :: Wire Session ()
580mainWire = do
581 lift establishedConnection
582 Session {..} <- asks connSession
583-- lift $ resizeBitfield (totalPieces storage)
584 logEvent "Connection established"
585 iterM logMessage =$= exchange =$= iterM logMessage
586 lift finishedConnection