diff options
Diffstat (limited to 'bittorrent/src/Network/BitTorrent/Exchange')
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs | 399 | ||||
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Block.hs | 369 | ||||
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Connection.hs | 1012 | ||||
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Download.hs | 296 | ||||
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Manager.hs | 62 | ||||
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Message.hs | 1232 | ||||
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Exchange/Session.hs | 586 |
7 files changed, 3956 insertions, 0 deletions
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs b/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs new file mode 100644 index 00000000..7bae3475 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Bitfield.hs | |||
@@ -0,0 +1,399 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This modules provides all necessary machinery to work with | ||
9 | -- bitfields. Bitfields are used to keep track indices of complete | ||
10 | -- pieces either this peer have or remote peer have. | ||
11 | -- | ||
12 | -- There are also commonly used piece seletion algorithms | ||
13 | -- which used to find out which one next piece to download. | ||
14 | -- Selectors considered to be used in the following order: | ||
15 | -- | ||
16 | -- * 'randomFirst' - at the start of download. | ||
17 | -- | ||
18 | -- * 'rarestFirst' - performed to avoid situation when | ||
19 | -- rarest piece is unaccessible. | ||
20 | -- | ||
21 | -- * 'endGame' - performed after a peer has requested all | ||
22 | -- the subpieces of the content. | ||
23 | -- | ||
24 | -- Note that BitTorrent protocol recommend (TODO link?) the | ||
25 | -- 'strictFirst' priority policy for /subpiece/ or /blocks/ | ||
26 | -- selection. | ||
27 | -- | ||
28 | {-# LANGUAGE CPP #-} | ||
29 | {-# LANGUAGE BangPatterns #-} | ||
30 | {-# LANGUAGE RecordWildCards #-} | ||
31 | module Network.BitTorrent.Exchange.Bitfield | ||
32 | ( -- * Bitfield | ||
33 | PieceIx | ||
34 | , PieceCount | ||
35 | , Bitfield | ||
36 | |||
37 | -- * Construction | ||
38 | , haveAll | ||
39 | , haveNone | ||
40 | , have | ||
41 | , singleton | ||
42 | , interval | ||
43 | , adjustSize | ||
44 | |||
45 | -- * Query | ||
46 | -- ** Cardinality | ||
47 | , Network.BitTorrent.Exchange.Bitfield.null | ||
48 | , Network.BitTorrent.Exchange.Bitfield.full | ||
49 | , haveCount | ||
50 | , totalCount | ||
51 | , completeness | ||
52 | |||
53 | -- ** Membership | ||
54 | , member | ||
55 | , notMember | ||
56 | , findMin | ||
57 | , findMax | ||
58 | , isSubsetOf | ||
59 | |||
60 | -- ** Availability | ||
61 | , complement | ||
62 | , Frequency | ||
63 | , frequencies | ||
64 | , rarest | ||
65 | |||
66 | -- * Combine | ||
67 | , insert | ||
68 | , union | ||
69 | , intersection | ||
70 | , difference | ||
71 | |||
72 | -- * Conversion | ||
73 | , toList | ||
74 | , fromList | ||
75 | |||
76 | -- * Serialization | ||
77 | , fromBitmap | ||
78 | , toBitmap | ||
79 | |||
80 | -- * Piece selection | ||
81 | , Selector | ||
82 | , selector | ||
83 | , strategyClass | ||
84 | |||
85 | , strictFirst | ||
86 | , strictLast | ||
87 | , rarestFirst | ||
88 | , randomFirst | ||
89 | , endGame | ||
90 | ) where | ||
91 | |||
92 | import Control.Monad | ||
93 | import Control.Monad.ST | ||
94 | import Data.ByteString (ByteString) | ||
95 | import qualified Data.ByteString as B | ||
96 | import qualified Data.ByteString.Lazy as Lazy | ||
97 | import Data.Vector.Unboxed (Vector) | ||
98 | import qualified Data.Vector.Unboxed as V | ||
99 | import qualified Data.Vector.Unboxed.Mutable as VM | ||
100 | import Data.IntervalSet (IntSet) | ||
101 | import qualified Data.IntervalSet as S | ||
102 | import qualified Data.IntervalSet.ByteString as S | ||
103 | import Data.List (foldl') | ||
104 | import Data.Monoid | ||
105 | import Data.Ratio | ||
106 | |||
107 | import Data.Torrent | ||
108 | |||
109 | -- TODO cache some operations | ||
110 | |||
111 | -- | Bitfields are represented just as integer sets but with | ||
112 | -- restriction: the each set should be within given interval (or | ||
113 | -- subset of the specified interval). Size is used to specify | ||
114 | -- interval, so bitfield of size 10 might contain only indices in | ||
115 | -- interval [0..9]. | ||
116 | -- | ||
117 | data Bitfield = Bitfield { | ||
118 | bfSize :: !PieceCount | ||
119 | , bfSet :: !IntSet | ||
120 | } deriving (Show, Read, Eq) | ||
121 | |||
122 | -- Invariants: all elements of bfSet lie in [0..bfSize - 1]; | ||
123 | |||
124 | instance Monoid Bitfield where | ||
125 | {-# SPECIALIZE instance Monoid Bitfield #-} | ||
126 | mempty = haveNone 0 | ||
127 | mappend = union | ||
128 | mconcat = unions | ||
129 | |||
130 | {----------------------------------------------------------------------- | ||
131 | Construction | ||
132 | -----------------------------------------------------------------------} | ||
133 | |||
134 | -- | The empty bitfield of the given size. | ||
135 | haveNone :: PieceCount -> Bitfield | ||
136 | haveNone s = Bitfield s S.empty | ||
137 | |||
138 | -- | The full bitfield containing all piece indices for the given size. | ||
139 | haveAll :: PieceCount -> Bitfield | ||
140 | haveAll s = Bitfield s (S.interval 0 (s - 1)) | ||
141 | |||
142 | -- | Insert the index in the set ignoring out of range indices. | ||
143 | have :: PieceIx -> Bitfield -> Bitfield | ||
144 | have ix Bitfield {..} | ||
145 | | 0 <= ix && ix < bfSize = Bitfield bfSize (S.insert ix bfSet) | ||
146 | | otherwise = Bitfield bfSize bfSet | ||
147 | |||
148 | singleton :: PieceIx -> PieceCount -> Bitfield | ||
149 | singleton ix pc = have ix (haveNone pc) | ||
150 | |||
151 | -- | Assign new size to bitfield. FIXME Normally, size should be only | ||
152 | -- decreased, otherwise exception raised. | ||
153 | adjustSize :: PieceCount -> Bitfield -> Bitfield | ||
154 | adjustSize s Bitfield {..} = Bitfield s bfSet | ||
155 | |||
156 | -- | NOTE: for internal use only | ||
157 | interval :: PieceCount -> PieceIx -> PieceIx -> Bitfield | ||
158 | interval pc a b = Bitfield pc (S.interval a b) | ||
159 | |||
160 | {----------------------------------------------------------------------- | ||
161 | Query | ||
162 | -----------------------------------------------------------------------} | ||
163 | |||
164 | -- | Test if bitifield have no one index: peer do not have anything. | ||
165 | null :: Bitfield -> Bool | ||
166 | null Bitfield {..} = S.null bfSet | ||
167 | |||
168 | -- | Test if bitfield have all pieces. | ||
169 | full :: Bitfield -> Bool | ||
170 | full Bitfield {..} = S.size bfSet == bfSize | ||
171 | |||
172 | -- | Count of peer have pieces. | ||
173 | haveCount :: Bitfield -> PieceCount | ||
174 | haveCount = S.size . bfSet | ||
175 | |||
176 | -- | Total count of pieces and its indices. | ||
177 | totalCount :: Bitfield -> PieceCount | ||
178 | totalCount = bfSize | ||
179 | |||
180 | -- | Ratio of /have/ piece count to the /total/ piece count. | ||
181 | -- | ||
182 | -- > forall bf. 0 <= completeness bf <= 1 | ||
183 | -- | ||
184 | completeness :: Bitfield -> Ratio PieceCount | ||
185 | completeness b = haveCount b % totalCount b | ||
186 | |||
187 | inRange :: PieceIx -> Bitfield -> Bool | ||
188 | inRange ix Bitfield {..} = 0 <= ix && ix < bfSize | ||
189 | |||
190 | member :: PieceIx -> Bitfield -> Bool | ||
191 | member ix bf @ Bitfield {..} | ||
192 | | ix `inRange` bf = ix `S.member` bfSet | ||
193 | | otherwise = False | ||
194 | |||
195 | notMember :: PieceIx -> Bitfield -> Bool | ||
196 | notMember ix bf @ Bitfield {..} | ||
197 | | ix `inRange` bf = ix `S.notMember` bfSet | ||
198 | | otherwise = True | ||
199 | |||
200 | -- | Find first available piece index. | ||
201 | findMin :: Bitfield -> PieceIx | ||
202 | findMin = S.findMin . bfSet | ||
203 | {-# INLINE findMin #-} | ||
204 | |||
205 | -- | Find last available piece index. | ||
206 | findMax :: Bitfield -> PieceIx | ||
207 | findMax = S.findMax . bfSet | ||
208 | {-# INLINE findMax #-} | ||
209 | |||
210 | -- | Check if all pieces from first bitfield present if the second bitfield | ||
211 | isSubsetOf :: Bitfield -> Bitfield -> Bool | ||
212 | isSubsetOf a b = bfSet a `S.isSubsetOf` bfSet b | ||
213 | {-# INLINE isSubsetOf #-} | ||
214 | |||
215 | -- | Resulting bitfield includes only missing pieces. | ||
216 | complement :: Bitfield -> Bitfield | ||
217 | complement Bitfield {..} = Bitfield | ||
218 | { bfSet = uni `S.difference` bfSet | ||
219 | , bfSize = bfSize | ||
220 | } | ||
221 | where | ||
222 | Bitfield _ uni = haveAll bfSize | ||
223 | {-# INLINE complement #-} | ||
224 | |||
225 | {----------------------------------------------------------------------- | ||
226 | -- Availability | ||
227 | -----------------------------------------------------------------------} | ||
228 | |||
229 | -- | Frequencies are needed in piece selection startegies which use | ||
230 | -- availability quantity to find out the optimal next piece index to | ||
231 | -- download. | ||
232 | type Frequency = Int | ||
233 | |||
234 | -- TODO rename to availability | ||
235 | -- | How many times each piece index occur in the given bitfield set. | ||
236 | frequencies :: [Bitfield] -> Vector Frequency | ||
237 | frequencies [] = V.fromList [] | ||
238 | frequencies xs = runST $ do | ||
239 | v <- VM.new size | ||
240 | VM.set v 0 | ||
241 | forM_ xs $ \ Bitfield {..} -> do | ||
242 | forM_ (S.toList bfSet) $ \ x -> do | ||
243 | fr <- VM.read v x | ||
244 | VM.write v x (succ fr) | ||
245 | V.unsafeFreeze v | ||
246 | where | ||
247 | size = maximum (map bfSize xs) | ||
248 | |||
249 | -- TODO it seems like this operation is veeery slow | ||
250 | |||
251 | -- | Find least available piece index. If no piece available return | ||
252 | -- 'Nothing'. | ||
253 | rarest :: [Bitfield] -> Maybe PieceIx | ||
254 | rarest xs | ||
255 | | V.null freqMap = Nothing | ||
256 | | otherwise | ||
257 | = Just $ fst $ V.ifoldr' minIx (0, freqMap V.! 0) freqMap | ||
258 | where | ||
259 | freqMap = frequencies xs | ||
260 | |||
261 | minIx :: PieceIx -> Frequency | ||
262 | -> (PieceIx, Frequency) | ||
263 | -> (PieceIx, Frequency) | ||
264 | minIx ix fr acc@(_, fra) | ||
265 | | fr < fra && fr > 0 = (ix, fr) | ||
266 | | otherwise = acc | ||
267 | |||
268 | |||
269 | {----------------------------------------------------------------------- | ||
270 | Combine | ||
271 | -----------------------------------------------------------------------} | ||
272 | |||
273 | insert :: PieceIx -> Bitfield -> Bitfield | ||
274 | insert pix bf @ Bitfield {..} | ||
275 | | 0 <= pix && pix < bfSize = Bitfield | ||
276 | { bfSet = S.insert pix bfSet | ||
277 | , bfSize = bfSize | ||
278 | } | ||
279 | | otherwise = bf | ||
280 | |||
281 | -- | Find indices at least one peer have. | ||
282 | union :: Bitfield -> Bitfield -> Bitfield | ||
283 | union a b = {-# SCC union #-} Bitfield { | ||
284 | bfSize = bfSize a `max` bfSize b | ||
285 | , bfSet = bfSet a `S.union` bfSet b | ||
286 | } | ||
287 | |||
288 | -- | Find indices both peers have. | ||
289 | intersection :: Bitfield -> Bitfield -> Bitfield | ||
290 | intersection a b = {-# SCC intersection #-} Bitfield { | ||
291 | bfSize = bfSize a `min` bfSize b | ||
292 | , bfSet = bfSet a `S.intersection` bfSet b | ||
293 | } | ||
294 | |||
295 | -- | Find indices which have first peer but do not have the second peer. | ||
296 | difference :: Bitfield -> Bitfield -> Bitfield | ||
297 | difference a b = {-# SCC difference #-} Bitfield { | ||
298 | bfSize = bfSize a -- FIXME is it reasonable? | ||
299 | , bfSet = bfSet a `S.difference` bfSet b | ||
300 | } | ||
301 | |||
302 | -- | Find indices the any of the peers have. | ||
303 | unions :: [Bitfield] -> Bitfield | ||
304 | unions = {-# SCC unions #-} foldl' union (haveNone 0) | ||
305 | |||
306 | {----------------------------------------------------------------------- | ||
307 | Serialization | ||
308 | -----------------------------------------------------------------------} | ||
309 | |||
310 | -- | List all /have/ indexes. | ||
311 | toList :: Bitfield -> [PieceIx] | ||
312 | toList Bitfield {..} = S.toList bfSet | ||
313 | |||
314 | -- | Make bitfield from list of /have/ indexes. | ||
315 | fromList :: PieceCount -> [PieceIx] -> Bitfield | ||
316 | fromList s ixs = Bitfield { | ||
317 | bfSize = s | ||
318 | , bfSet = S.splitGT (-1) $ S.splitLT s $ S.fromList ixs | ||
319 | } | ||
320 | |||
321 | -- | Unpack 'Bitfield' from tightly packed bit array. Note resulting | ||
322 | -- size might be more than real bitfield size, use 'adjustSize'. | ||
323 | fromBitmap :: ByteString -> Bitfield | ||
324 | fromBitmap bs = {-# SCC fromBitmap #-} Bitfield { | ||
325 | bfSize = B.length bs * 8 | ||
326 | , bfSet = S.fromByteString bs | ||
327 | } | ||
328 | {-# INLINE fromBitmap #-} | ||
329 | |||
330 | -- | Pack a 'Bitfield' to tightly packed bit array. | ||
331 | toBitmap :: Bitfield -> Lazy.ByteString | ||
332 | toBitmap Bitfield {..} = {-# SCC toBitmap #-} Lazy.fromChunks [intsetBM, alignment] | ||
333 | where | ||
334 | byteSize = bfSize `div` 8 + if bfSize `mod` 8 == 0 then 0 else 1 | ||
335 | alignment = B.replicate (byteSize - B.length intsetBM) 0 | ||
336 | intsetBM = S.toByteString bfSet | ||
337 | |||
338 | {----------------------------------------------------------------------- | ||
339 | -- Piece selection | ||
340 | -----------------------------------------------------------------------} | ||
341 | |||
342 | type Selector = Bitfield -- ^ Indices of client /have/ pieces. | ||
343 | -> Bitfield -- ^ Indices of peer /have/ pieces. | ||
344 | -> [Bitfield] -- ^ Indices of other peers /have/ pieces. | ||
345 | -> Maybe PieceIx -- ^ Zero-based index of piece to request | ||
346 | -- to, if any. | ||
347 | |||
348 | selector :: Selector -- ^ Selector to use at the start. | ||
349 | -> Ratio PieceCount | ||
350 | -> Selector -- ^ Selector to use after the client have | ||
351 | -- the C pieces. | ||
352 | -> Selector -- ^ Selector that changes behaviour based | ||
353 | -- on completeness. | ||
354 | selector start pt ready h a xs = | ||
355 | case strategyClass pt h of | ||
356 | SCBeginning -> start h a xs | ||
357 | SCReady -> ready h a xs | ||
358 | SCEnd -> endGame h a xs | ||
359 | |||
360 | data StartegyClass | ||
361 | = SCBeginning | ||
362 | | SCReady | ||
363 | | SCEnd | ||
364 | deriving (Show, Eq, Ord, Enum, Bounded) | ||
365 | |||
366 | |||
367 | strategyClass :: Ratio PieceCount -> Bitfield -> StartegyClass | ||
368 | strategyClass threshold = classify . completeness | ||
369 | where | ||
370 | classify c | ||
371 | | c < threshold = SCBeginning | ||
372 | | c + 1 % numerator c < 1 = SCReady | ||
373 | -- FIXME numerator have is not total count | ||
374 | | otherwise = SCEnd | ||
375 | |||
376 | |||
377 | -- | Select the first available piece. | ||
378 | strictFirst :: Selector | ||
379 | strictFirst h a _ = Just $ findMin (difference a h) | ||
380 | |||
381 | -- | Select the last available piece. | ||
382 | strictLast :: Selector | ||
383 | strictLast h a _ = Just $ findMax (difference a h) | ||
384 | |||
385 | -- | | ||
386 | rarestFirst :: Selector | ||
387 | rarestFirst h a xs = rarest (map (intersection want) xs) | ||
388 | where | ||
389 | want = difference h a | ||
390 | |||
391 | -- | In average random first is faster than rarest first strategy but | ||
392 | -- only if all pieces are available. | ||
393 | randomFirst :: Selector | ||
394 | randomFirst = do | ||
395 | -- randomIO | ||
396 | error "randomFirst" | ||
397 | |||
398 | endGame :: Selector | ||
399 | endGame = strictLast | ||
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Block.hs b/bittorrent/src/Network/BitTorrent/Exchange/Block.hs new file mode 100644 index 00000000..bc9a3d24 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Block.hs | |||
@@ -0,0 +1,369 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Blocks are used to transfer pieces. | ||
9 | -- | ||
10 | {-# LANGUAGE BangPatterns #-} | ||
11 | {-# LANGUAGE FlexibleInstances #-} | ||
12 | {-# LANGUAGE TemplateHaskell #-} | ||
13 | {-# LANGUAGE DeriveFunctor #-} | ||
14 | {-# LANGUAGE DeriveDataTypeable #-} | ||
15 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
16 | module Network.BitTorrent.Exchange.Block | ||
17 | ( -- * Block attributes | ||
18 | BlockOffset | ||
19 | , BlockCount | ||
20 | , BlockSize | ||
21 | , defaultTransferSize | ||
22 | |||
23 | -- * Block index | ||
24 | , BlockIx(..) | ||
25 | , blockIxRange | ||
26 | |||
27 | -- * Block data | ||
28 | , Block(..) | ||
29 | , blockIx | ||
30 | , blockSize | ||
31 | , blockRange | ||
32 | , isPiece | ||
33 | , leadingBlock | ||
34 | |||
35 | -- * Block bucket | ||
36 | , Bucket | ||
37 | |||
38 | -- ** Query | ||
39 | , Network.BitTorrent.Exchange.Block.null | ||
40 | , Network.BitTorrent.Exchange.Block.full | ||
41 | , Network.BitTorrent.Exchange.Block.size | ||
42 | , Network.BitTorrent.Exchange.Block.spans | ||
43 | |||
44 | -- ** Construction | ||
45 | , Network.BitTorrent.Exchange.Block.empty | ||
46 | , Network.BitTorrent.Exchange.Block.insert | ||
47 | , Network.BitTorrent.Exchange.Block.insertLazy | ||
48 | , Network.BitTorrent.Exchange.Block.merge | ||
49 | , Network.BitTorrent.Exchange.Block.fromList | ||
50 | |||
51 | -- ** Rendering | ||
52 | , Network.BitTorrent.Exchange.Block.toPiece | ||
53 | |||
54 | -- ** Debug | ||
55 | , Network.BitTorrent.Exchange.Block.valid | ||
56 | ) where | ||
57 | |||
58 | import Prelude hiding (span) | ||
59 | import Control.Applicative | ||
60 | import Data.ByteString as BS hiding (span) | ||
61 | import Data.ByteString.Lazy as BL hiding (span) | ||
62 | import Data.ByteString.Lazy.Builder as BS | ||
63 | import Data.Default | ||
64 | import Data.Monoid | ||
65 | import Data.List as L hiding (span) | ||
66 | import Data.Serialize as S | ||
67 | import Data.Typeable | ||
68 | import Numeric | ||
69 | import Text.PrettyPrint as PP hiding ((<>)) | ||
70 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
71 | |||
72 | import Data.Torrent | ||
73 | |||
74 | {----------------------------------------------------------------------- | ||
75 | -- Block attributes | ||
76 | -----------------------------------------------------------------------} | ||
77 | |||
78 | -- | Offset of a block in a piece in bytes. Should be multiple of | ||
79 | -- the choosen block size. | ||
80 | type BlockOffset = Int | ||
81 | |||
82 | -- | Size of a block in bytes. Should be power of 2. | ||
83 | -- | ||
84 | -- Normally block size is equal to 'defaultTransferSize'. | ||
85 | -- | ||
86 | type BlockSize = Int | ||
87 | |||
88 | -- | Number of block in a piece of a torrent. Used to distinguish | ||
89 | -- block count from piece count. | ||
90 | type BlockCount = Int | ||
91 | |||
92 | -- | Widely used semi-official block size. Some clients can ignore if | ||
93 | -- block size of BlockIx in Request message is not equal to this | ||
94 | -- value. | ||
95 | -- | ||
96 | defaultTransferSize :: BlockSize | ||
97 | defaultTransferSize = 16 * 1024 | ||
98 | |||
99 | {----------------------------------------------------------------------- | ||
100 | Block Index | ||
101 | -----------------------------------------------------------------------} | ||
102 | |||
103 | -- | BlockIx correspond. | ||
104 | data BlockIx = BlockIx { | ||
105 | -- | Zero-based piece index. | ||
106 | ixPiece :: {-# UNPACK #-} !PieceIx | ||
107 | |||
108 | -- | Zero-based byte offset within the piece. | ||
109 | , ixOffset :: {-# UNPACK #-} !BlockOffset | ||
110 | |||
111 | -- | Block size starting from offset. | ||
112 | , ixLength :: {-# UNPACK #-} !BlockSize | ||
113 | } deriving (Show, Eq, Typeable) | ||
114 | |||
115 | -- | First block in torrent. Useful for debugging. | ||
116 | instance Default BlockIx where | ||
117 | def = BlockIx 0 0 defaultTransferSize | ||
118 | |||
119 | getInt :: S.Get Int | ||
120 | getInt = fromIntegral <$> S.getWord32be | ||
121 | {-# INLINE getInt #-} | ||
122 | |||
123 | putInt :: S.Putter Int | ||
124 | putInt = S.putWord32be . fromIntegral | ||
125 | {-# INLINE putInt #-} | ||
126 | |||
127 | instance Serialize BlockIx where | ||
128 | {-# SPECIALIZE instance Serialize BlockIx #-} | ||
129 | get = BlockIx <$> getInt | ||
130 | <*> getInt | ||
131 | <*> getInt | ||
132 | {-# INLINE get #-} | ||
133 | |||
134 | put BlockIx {..} = do | ||
135 | putInt ixPiece | ||
136 | putInt ixOffset | ||
137 | putInt ixLength | ||
138 | {-# INLINE put #-} | ||
139 | |||
140 | instance Pretty BlockIx where | ||
141 | pPrint BlockIx {..} = | ||
142 | ("piece = " <> int ixPiece <> ",") <+> | ||
143 | ("offset = " <> int ixOffset <> ",") <+> | ||
144 | ("length = " <> int ixLength) | ||
145 | |||
146 | -- | Get location of payload bytes in the torrent content. | ||
147 | blockIxRange :: (Num a, Integral a) => PieceSize -> BlockIx -> (a, a) | ||
148 | blockIxRange piSize BlockIx {..} = (offset, offset + len) | ||
149 | where | ||
150 | offset = fromIntegral piSize * fromIntegral ixPiece | ||
151 | + fromIntegral ixOffset | ||
152 | len = fromIntegral ixLength | ||
153 | {-# INLINE blockIxRange #-} | ||
154 | |||
155 | {----------------------------------------------------------------------- | ||
156 | Block | ||
157 | -----------------------------------------------------------------------} | ||
158 | |||
159 | data Block payload = Block { | ||
160 | -- | Zero-based piece index. | ||
161 | blkPiece :: {-# UNPACK #-} !PieceIx | ||
162 | |||
163 | -- | Zero-based byte offset within the piece. | ||
164 | , blkOffset :: {-# UNPACK #-} !BlockOffset | ||
165 | |||
166 | -- | Payload bytes. | ||
167 | , blkData :: !payload | ||
168 | } deriving (Show, Eq, Functor, Typeable) | ||
169 | |||
170 | -- | Payload is ommitted. | ||
171 | instance Pretty (Block BL.ByteString) where | ||
172 | pPrint = pPrint . blockIx | ||
173 | {-# INLINE pPrint #-} | ||
174 | |||
175 | -- | Get size of block /payload/ in bytes. | ||
176 | blockSize :: Block BL.ByteString -> BlockSize | ||
177 | blockSize = fromIntegral . BL.length . blkData | ||
178 | {-# INLINE blockSize #-} | ||
179 | |||
180 | -- | Get block index of a block. | ||
181 | blockIx :: Block BL.ByteString -> BlockIx | ||
182 | blockIx = BlockIx <$> blkPiece <*> blkOffset <*> blockSize | ||
183 | |||
184 | -- | Get location of payload bytes in the torrent content. | ||
185 | blockRange :: (Num a, Integral a) | ||
186 | => PieceSize -> Block BL.ByteString -> (a, a) | ||
187 | blockRange piSize = blockIxRange piSize . blockIx | ||
188 | {-# INLINE blockRange #-} | ||
189 | |||
190 | -- | Test if a block can be safely turned into a piece. | ||
191 | isPiece :: PieceSize -> Block BL.ByteString -> Bool | ||
192 | isPiece pieceLen blk @ (Block i offset _) = | ||
193 | offset == 0 && blockSize blk == pieceLen && i >= 0 | ||
194 | {-# INLINE isPiece #-} | ||
195 | |||
196 | -- | First block in the piece. | ||
197 | leadingBlock :: PieceIx -> BlockSize -> BlockIx | ||
198 | leadingBlock pix blockSize = BlockIx | ||
199 | { ixPiece = pix | ||
200 | , ixOffset = 0 | ||
201 | , ixLength = blockSize | ||
202 | } | ||
203 | {-# INLINE leadingBlock #-} | ||
204 | |||
205 | {----------------------------------------------------------------------- | ||
206 | -- Bucket | ||
207 | -----------------------------------------------------------------------} | ||
208 | |||
209 | type Pos = Int | ||
210 | type ChunkSize = Int | ||
211 | |||
212 | -- | A sparse set of blocks used to represent an /in progress/ piece. | ||
213 | data Bucket | ||
214 | = Nil | ||
215 | | Span {-# UNPACK #-} !ChunkSize !Bucket | ||
216 | | Fill {-# UNPACK #-} !ChunkSize !Builder !Bucket | ||
217 | |||
218 | instance Show Bucket where | ||
219 | showsPrec i Nil = showString "" | ||
220 | showsPrec i (Span s xs) = showString "Span " <> showInt s | ||
221 | <> showString " " <> showsPrec i xs | ||
222 | showsPrec i (Fill s _ xs) = showString "Fill " <> showInt s | ||
223 | <> showString " " <> showsPrec i xs | ||
224 | |||
225 | -- | INVARIANT: 'Nil' should appear only after 'Span' of 'Fill'. | ||
226 | nilInvFailed :: a | ||
227 | nilInvFailed = error "Nil: bucket invariant failed" | ||
228 | |||
229 | valid :: Bucket -> Bool | ||
230 | valid = check Nothing | ||
231 | where | ||
232 | check Nothing Nil = False -- see 'nilInvFailed' | ||
233 | check (Just _) _ = True | ||
234 | check prevIsSpan (Span sz xs) = | ||
235 | prevIsSpan /= Just True && -- Span n (NotSpan .. ) invariant | ||
236 | sz > 0 && -- Span is always non-empty | ||
237 | check (Just True) xs | ||
238 | check prevIsSpan (Fill sz b xs) = | ||
239 | prevIsSpan /= Just True && -- Fill n (NotFill .. ) invariant | ||
240 | sz > 0 && -- Fill is always non-empty | ||
241 | check (Just False) xs | ||
242 | |||
243 | instance Pretty Bucket where | ||
244 | pPrint Nil = nilInvFailed | ||
245 | pPrint bkt = go bkt | ||
246 | where | ||
247 | go Nil = PP.empty | ||
248 | go (Span sz xs) = "Span" <+> PP.int sz <+> go xs | ||
249 | go (Fill sz b xs) = "Fill" <+> PP.int sz <+> go xs | ||
250 | |||
251 | -- | Smart constructor: use it when some block is /deleted/ from | ||
252 | -- bucket. | ||
253 | span :: ChunkSize -> Bucket -> Bucket | ||
254 | span sz (Span sz' xs) = Span (sz + sz') xs | ||
255 | span sz xxs = Span sz xxs | ||
256 | {-# INLINE span #-} | ||
257 | |||
258 | -- | Smart constructor: use it when some block is /inserted/ to | ||
259 | -- bucket. | ||
260 | fill :: ChunkSize -> Builder -> Bucket -> Bucket | ||
261 | fill sz b (Fill sz' b' xs) = Fill (sz + sz') (b <> b') xs | ||
262 | fill sz b xxs = Fill sz b xxs | ||
263 | {-# INLINE fill #-} | ||
264 | |||
265 | {----------------------------------------------------------------------- | ||
266 | -- Bucket queries | ||
267 | -----------------------------------------------------------------------} | ||
268 | |||
269 | -- | /O(1)/. Test if this bucket is empty. | ||
270 | null :: Bucket -> Bool | ||
271 | null Nil = nilInvFailed | ||
272 | null (Span _ Nil) = True | ||
273 | null _ = False | ||
274 | {-# INLINE null #-} | ||
275 | |||
276 | -- | /O(1)/. Test if this bucket is complete. | ||
277 | full :: Bucket -> Bool | ||
278 | full Nil = nilInvFailed | ||
279 | full (Fill _ _ Nil) = True | ||
280 | full _ = False | ||
281 | {-# INLINE full #-} | ||
282 | |||
283 | -- | /O(n)/. Total size of the incompleted piece. | ||
284 | size :: Bucket -> PieceSize | ||
285 | size Nil = nilInvFailed | ||
286 | size bkt = go bkt | ||
287 | where | ||
288 | go Nil = 0 | ||
289 | go (Span sz xs) = sz + go xs | ||
290 | go (Fill sz _ xs) = sz + go xs | ||
291 | |||
292 | -- | /O(n)/. List incomplete blocks to download. If some block have | ||
293 | -- size more than the specified 'BlockSize' then block is split into | ||
294 | -- smaller blocks to satisfy given 'BlockSize'. Small (for | ||
295 | -- e.g. trailing) blocks is not ignored, but returned in-order. | ||
296 | spans :: BlockSize -> Bucket -> [(BlockOffset, BlockSize)] | ||
297 | spans expectedSize = go 0 | ||
298 | where | ||
299 | go _ Nil = [] | ||
300 | go off (Span sz xs) = listChunks off sz ++ go (off + sz) xs | ||
301 | go off (Fill sz _ xs) = go (off + sz) xs | ||
302 | |||
303 | listChunks off restSize | ||
304 | | restSize <= 0 = [] | ||
305 | | otherwise = (off, blkSize) | ||
306 | : listChunks (off + blkSize) (restSize - blkSize) | ||
307 | where | ||
308 | blkSize = min expectedSize restSize | ||
309 | |||
310 | {----------------------------------------------------------------------- | ||
311 | -- Bucket contstruction | ||
312 | -----------------------------------------------------------------------} | ||
313 | |||
314 | -- | /O(1)/. A new empty bucket capable to alloof specified size. | ||
315 | empty :: PieceSize -> Bucket | ||
316 | empty sz | ||
317 | | sz < 0 = error "empty: Bucket size must be a non-negative value" | ||
318 | | otherwise = Span sz Nil | ||
319 | {-# INLINE empty #-} | ||
320 | |||
321 | insertSpan :: Pos -> BS.ByteString -> ChunkSize -> Bucket -> Bucket | ||
322 | insertSpan !pos !bs !span_sz !xs = | ||
323 | let pref_len = pos | ||
324 | fill_len = span_sz - pos `min` BS.length bs | ||
325 | suff_len = (span_sz - pos) - fill_len | ||
326 | in mkSpan pref_len $ | ||
327 | fill fill_len (byteString (BS.take fill_len bs)) $ | ||
328 | mkSpan suff_len $ | ||
329 | xs | ||
330 | where | ||
331 | mkSpan 0 xs = xs | ||
332 | mkSpan sz xs = Span sz xs | ||
333 | |||
334 | -- | /O(n)/. Insert a strict bytestring at specified position. | ||
335 | -- | ||
336 | -- Best case: if blocks are inserted in sequential order, then this | ||
337 | -- operation should take /O(1)/. | ||
338 | -- | ||
339 | insert :: Pos -> BS.ByteString -> Bucket -> Bucket | ||
340 | insert _ _ Nil = nilInvFailed | ||
341 | insert dstPos bs bucket = go 0 bucket | ||
342 | where | ||
343 | intersects curPos sz = dstPos >= curPos && dstPos <= curPos + sz | ||
344 | |||
345 | go _ Nil = Nil | ||
346 | go curPos (Span sz xs) | ||
347 | | intersects curPos sz = insertSpan (dstPos - curPos) bs sz xs | ||
348 | | otherwise = span sz (go (curPos + sz) xs) | ||
349 | go curPos bkt @ (Fill sz br xs) | ||
350 | | intersects curPos sz = bkt | ||
351 | | otherwise = fill sz br (go (curPos + sz) xs) | ||
352 | |||
353 | fromList :: PieceSize -> [(Pos, BS.ByteString)] -> Bucket | ||
354 | fromList s = L.foldr (uncurry Network.BitTorrent.Exchange.Block.insert) | ||
355 | (Network.BitTorrent.Exchange.Block.empty s) | ||
356 | |||
357 | -- TODO zero-copy | ||
358 | insertLazy :: Pos -> BL.ByteString -> Bucket -> Bucket | ||
359 | insertLazy pos bl = Network.BitTorrent.Exchange.Block.insert pos (BL.toStrict bl) | ||
360 | |||
361 | -- | /O(n)/. | ||
362 | merge :: Bucket -> Bucket -> Bucket | ||
363 | merge = error "Bucket.merge: not implemented" | ||
364 | |||
365 | -- | /O(1)/. | ||
366 | toPiece :: Bucket -> Maybe BL.ByteString | ||
367 | toPiece Nil = nilInvFailed | ||
368 | toPiece (Fill _ b Nil) = Just (toLazyByteString b) | ||
369 | toPiece _ = Nothing | ||
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs b/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs new file mode 100644 index 00000000..6804d0a2 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs | |||
@@ -0,0 +1,1012 @@ | |||
1 | -- | | ||
2 | -- Module : Network.BitTorrent.Exchange.Wire | ||
3 | -- Copyright : (c) Sam Truzjan 2013 | ||
4 | -- (c) Daniel Gröber 2013 | ||
5 | -- License : BSD3 | ||
6 | -- Maintainer : pxqr.sta@gmail.com | ||
7 | -- Stability : experimental | ||
8 | -- Portability : portable | ||
9 | -- | ||
10 | -- Each peer wire connection is identified by triple @(topic, | ||
11 | -- remote_addr, this_addr)@. This means that connections are the | ||
12 | -- same if and only if their 'ConnectionId' are the same. Of course, | ||
13 | -- you /must/ avoid duplicated connections. | ||
14 | -- | ||
15 | -- This module control /integrity/ of data send and received. | ||
16 | -- | ||
17 | {-# LANGUAGE DeriveDataTypeable #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | ||
19 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
20 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
21 | module Network.BitTorrent.Exchange.Connection | ||
22 | ( -- * Wire | ||
23 | Connected | ||
24 | , Wire | ||
25 | , ChannelSide (..) | ||
26 | |||
27 | -- * Connection | ||
28 | , Connection | ||
29 | , connInitiatedBy | ||
30 | |||
31 | -- ** Identity | ||
32 | , connRemoteAddr | ||
33 | , connTopic | ||
34 | , connRemotePeerId | ||
35 | , connThisPeerId | ||
36 | |||
37 | -- ** Capabilities | ||
38 | , connProtocol | ||
39 | , connCaps | ||
40 | , connExtCaps | ||
41 | , connRemoteEhs | ||
42 | |||
43 | -- ** State | ||
44 | , connStatus | ||
45 | , connBitfield | ||
46 | |||
47 | -- ** Env | ||
48 | , connOptions | ||
49 | , connSession | ||
50 | , connStats | ||
51 | |||
52 | -- ** Status | ||
53 | , PeerStatus (..) | ||
54 | , ConnectionStatus (..) | ||
55 | , updateStatus | ||
56 | , statusUpdates | ||
57 | , clientStatus | ||
58 | , remoteStatus | ||
59 | , canUpload | ||
60 | , canDownload | ||
61 | , defaultUnchokeSlots | ||
62 | , defaultRechokeInterval | ||
63 | |||
64 | |||
65 | -- * Setup | ||
66 | , ConnectionPrefs (..) | ||
67 | , SessionLink (..) | ||
68 | , ConnectionConfig (..) | ||
69 | |||
70 | -- ** Initiate | ||
71 | , connectWire | ||
72 | |||
73 | -- ** Accept | ||
74 | , PendingConnection | ||
75 | , newPendingConnection | ||
76 | , pendingPeer | ||
77 | , pendingCaps | ||
78 | , pendingTopic | ||
79 | , closePending | ||
80 | , acceptWire | ||
81 | |||
82 | -- ** Post setup actions | ||
83 | , resizeBitfield | ||
84 | |||
85 | -- * Messaging | ||
86 | , recvMessage | ||
87 | , sendMessage | ||
88 | , filterQueue | ||
89 | , getMaxQueueLength | ||
90 | |||
91 | -- * Exceptions | ||
92 | , ProtocolError (..) | ||
93 | , WireFailure (..) | ||
94 | , peerPenalty | ||
95 | , isWireFailure | ||
96 | , disconnectPeer | ||
97 | |||
98 | -- * Stats | ||
99 | , ByteStats (..) | ||
100 | , FlowStats (..) | ||
101 | , ConnectionStats (..) | ||
102 | |||
103 | -- * Flood detection | ||
104 | , FloodDetector (..) | ||
105 | |||
106 | -- * Options | ||
107 | , Options (..) | ||
108 | ) where | ||
109 | |||
110 | import Control.Applicative | ||
111 | import Control.Concurrent hiding (yield) | ||
112 | import Control.Exception | ||
113 | import Control.Monad.Reader | ||
114 | import Control.Monad.State | ||
115 | import Control.Monad.Trans.Resource | ||
116 | import Control.Lens | ||
117 | import Data.ByteString as BS | ||
118 | import Data.ByteString.Lazy as BSL | ||
119 | import Data.Conduit as C | ||
120 | import Data.Conduit.Cereal | ||
121 | import Data.Conduit.List | ||
122 | import Data.Conduit.Network | ||
123 | import Data.Default | ||
124 | import Data.IORef | ||
125 | import Data.List as L | ||
126 | import Data.Maybe as M | ||
127 | import Data.Monoid | ||
128 | import Data.Serialize as S | ||
129 | import Data.Typeable | ||
130 | import Network | ||
131 | import Network.Socket hiding (Connected) | ||
132 | import Network.Socket.ByteString as BS | ||
133 | import Text.PrettyPrint as PP hiding ((<>)) | ||
134 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
135 | import Text.Show.Functions () | ||
136 | import System.Log.FastLogger (ToLogStr(..)) | ||
137 | import System.Timeout | ||
138 | |||
139 | import Data.Torrent | ||
140 | import Network.Address | ||
141 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
142 | import Network.BitTorrent.Exchange.Message as Msg | ||
143 | |||
144 | -- TODO handle port message? | ||
145 | -- TODO handle limits? | ||
146 | -- TODO filter not requested PIECE messages | ||
147 | -- TODO metadata piece request flood protection | ||
148 | -- TODO piece request flood protection | ||
149 | -- TODO protect against flood attacks | ||
150 | {----------------------------------------------------------------------- | ||
151 | -- Exceptions | ||
152 | -----------------------------------------------------------------------} | ||
153 | |||
154 | -- | Used to specify initiator of 'ProtocolError'. | ||
155 | data ChannelSide | ||
156 | = ThisPeer | ||
157 | | RemotePeer | ||
158 | deriving (Show, Eq, Enum, Bounded) | ||
159 | |||
160 | instance Default ChannelSide where | ||
161 | def = ThisPeer | ||
162 | |||
163 | instance Pretty ChannelSide where | ||
164 | pPrint = PP.text . show | ||
165 | |||
166 | -- | A protocol errors occur when a peer violates protocol | ||
167 | -- specification. | ||
168 | data ProtocolError | ||
169 | -- | Protocol string should be 'BitTorrent Protocol' but remote | ||
170 | -- peer have sent a different string. | ||
171 | = InvalidProtocol ProtocolName | ||
172 | |||
173 | -- | Sent and received protocol strings do not match. Can occur | ||
174 | -- in 'connectWire' only. | ||
175 | | UnexpectedProtocol ProtocolName | ||
176 | |||
177 | -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not | ||
178 | -- match with 'hsInfoHash' /this/ peer have sent. Can occur in | ||
179 | -- 'connectWire' or 'acceptWire' only. | ||
180 | | UnexpectedTopic InfoHash | ||
181 | |||
182 | -- | Some trackers or DHT can return 'PeerId' of a peer. If a | ||
183 | -- remote peer handshaked with different 'hsPeerId' then this | ||
184 | -- exception is raised. Can occur in 'connectWire' only. | ||
185 | | UnexpectedPeerId PeerId | ||
186 | |||
187 | -- | Accepted peer have sent unknown torrent infohash in | ||
188 | -- 'hsInfoHash' field. This situation usually happen when /this/ | ||
189 | -- peer have deleted the requested torrent. The error can occur in | ||
190 | -- 'acceptWire' function only. | ||
191 | | UnknownTopic InfoHash | ||
192 | |||
193 | -- | A remote peer have 'ExtExtended' enabled but did not send an | ||
194 | -- 'ExtendedHandshake' back. | ||
195 | | HandshakeRefused | ||
196 | |||
197 | -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST | ||
198 | -- be send either once or zero times, but either this peer or | ||
199 | -- remote peer send a bitfield message the second time. | ||
200 | | BitfieldAlreadySent ChannelSide | ||
201 | |||
202 | -- | Capabilities violation. For example this exception can occur | ||
203 | -- when a peer have sent 'Port' message but 'ExtDHT' is not | ||
204 | -- allowed in 'connCaps'. | ||
205 | | DisallowedMessage | ||
206 | { -- | Who sent invalid message. | ||
207 | violentSender :: ChannelSide | ||
208 | |||
209 | -- | If the 'violentSender' reconnect with this extension | ||
210 | -- enabled then he can try to send this message. | ||
211 | , extensionRequired :: Extension | ||
212 | } | ||
213 | deriving Show | ||
214 | |||
215 | instance Pretty ProtocolError where | ||
216 | pPrint = PP.text . show | ||
217 | |||
218 | errorPenalty :: ProtocolError -> Int | ||
219 | errorPenalty (InvalidProtocol _) = 1 | ||
220 | errorPenalty (UnexpectedProtocol _) = 1 | ||
221 | errorPenalty (UnexpectedTopic _) = 1 | ||
222 | errorPenalty (UnexpectedPeerId _) = 1 | ||
223 | errorPenalty (UnknownTopic _) = 0 | ||
224 | errorPenalty (HandshakeRefused ) = 1 | ||
225 | errorPenalty (BitfieldAlreadySent _) = 1 | ||
226 | errorPenalty (DisallowedMessage _ _) = 1 | ||
227 | |||
228 | -- | Exceptions used to interrupt the current P2P session. | ||
229 | data WireFailure | ||
230 | = ConnectionRefused IOError | ||
231 | |||
232 | -- | Force termination of wire connection. | ||
233 | -- | ||
234 | -- Normally you should throw only this exception from event loop | ||
235 | -- using 'disconnectPeer', other exceptions are thrown | ||
236 | -- automatically by functions from this module. | ||
237 | -- | ||
238 | | DisconnectPeer | ||
239 | |||
240 | -- | A peer not responding and did not send a 'KeepAlive' message | ||
241 | -- for a specified period of time. | ||
242 | | PeerDisconnected | ||
243 | |||
244 | -- | A remote peer have sent some unknown message we unable to | ||
245 | -- parse. | ||
246 | | DecodingError GetException | ||
247 | |||
248 | -- | See 'ProtocolError' for more details. | ||
249 | | ProtocolError ProtocolError | ||
250 | |||
251 | -- | A possible malicious peer have sent too many control messages | ||
252 | -- without making any progress. | ||
253 | | FloodDetected ConnectionStats | ||
254 | deriving (Show, Typeable) | ||
255 | |||
256 | instance Exception WireFailure | ||
257 | |||
258 | instance Pretty WireFailure where | ||
259 | pPrint = PP.text . show | ||
260 | |||
261 | -- TODO | ||
262 | -- data Penalty = Ban | Penalty Int | ||
263 | |||
264 | peerPenalty :: WireFailure -> Int | ||
265 | peerPenalty DisconnectPeer = 0 | ||
266 | peerPenalty PeerDisconnected = 0 | ||
267 | peerPenalty (DecodingError _) = 1 | ||
268 | peerPenalty (ProtocolError e) = errorPenalty e | ||
269 | peerPenalty (FloodDetected _) = 1 | ||
270 | |||
271 | -- | Do nothing with exception, used with 'handle' or 'try'. | ||
272 | isWireFailure :: Monad m => WireFailure -> m () | ||
273 | isWireFailure _ = return () | ||
274 | |||
275 | protocolError :: MonadThrow m => ProtocolError -> m a | ||
276 | protocolError = monadThrow . ProtocolError | ||
277 | |||
278 | {----------------------------------------------------------------------- | ||
279 | -- Stats | ||
280 | -----------------------------------------------------------------------} | ||
281 | |||
282 | -- | Message stats in one direction. | ||
283 | data FlowStats = FlowStats | ||
284 | { -- | Number of the messages sent or received. | ||
285 | messageCount :: {-# UNPACK #-} !Int | ||
286 | -- | Sum of byte sequences of all messages. | ||
287 | , messageBytes :: {-# UNPACK #-} !ByteStats | ||
288 | } deriving Show | ||
289 | |||
290 | instance Pretty FlowStats where | ||
291 | pPrint FlowStats {..} = | ||
292 | PP.int messageCount <+> "messages" $+$ | ||
293 | pPrint messageBytes | ||
294 | |||
295 | -- | Zeroed stats. | ||
296 | instance Default FlowStats where | ||
297 | def = FlowStats 0 def | ||
298 | |||
299 | -- | Monoid under addition. | ||
300 | instance Monoid FlowStats where | ||
301 | mempty = def | ||
302 | mappend a b = FlowStats | ||
303 | { messageBytes = messageBytes a <> messageBytes b | ||
304 | , messageCount = messageCount a + messageCount b | ||
305 | } | ||
306 | |||
307 | -- | Find average length of byte sequences per message. | ||
308 | avgByteStats :: FlowStats -> ByteStats | ||
309 | avgByteStats (FlowStats n ByteStats {..}) = ByteStats | ||
310 | { overhead = overhead `quot` n | ||
311 | , control = control `quot` n | ||
312 | , payload = payload `quot` n | ||
313 | } | ||
314 | |||
315 | -- | Message stats in both directions. This data can be retrieved | ||
316 | -- using 'getStats' function. | ||
317 | -- | ||
318 | -- Note that this stats is completely different from | ||
319 | -- 'Data.Torrent.Progress.Progress': payload bytes not necessary | ||
320 | -- equal to downloaded\/uploaded bytes since a peer can send a | ||
321 | -- broken block. | ||
322 | -- | ||
323 | data ConnectionStats = ConnectionStats | ||
324 | { -- | Received messages stats. | ||
325 | incomingFlow :: !FlowStats | ||
326 | -- | Sent messages stats. | ||
327 | , outcomingFlow :: !FlowStats | ||
328 | } deriving Show | ||
329 | |||
330 | instance Pretty ConnectionStats where | ||
331 | pPrint ConnectionStats {..} = vcat | ||
332 | [ "Recv:" <+> pPrint incomingFlow | ||
333 | , "Sent:" <+> pPrint outcomingFlow | ||
334 | , "Both:" <+> pPrint (incomingFlow <> outcomingFlow) | ||
335 | ] | ||
336 | |||
337 | -- | Zeroed stats. | ||
338 | instance Default ConnectionStats where | ||
339 | def = ConnectionStats def def | ||
340 | |||
341 | -- | Monoid under addition. | ||
342 | instance Monoid ConnectionStats where | ||
343 | mempty = def | ||
344 | mappend a b = ConnectionStats | ||
345 | { incomingFlow = incomingFlow a <> incomingFlow b | ||
346 | , outcomingFlow = outcomingFlow a <> outcomingFlow b | ||
347 | } | ||
348 | |||
349 | -- | Aggregate one more message stats in the /specified/ direction. | ||
350 | addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats | ||
351 | addStats ThisPeer x s = s { outcomingFlow = (FlowStats 1 x) <> (outcomingFlow s) } | ||
352 | addStats RemotePeer x s = s { incomingFlow = (FlowStats 1 x) <> (incomingFlow s) } | ||
353 | |||
354 | -- | Sum of overhead and control bytes in both directions. | ||
355 | wastedBytes :: ConnectionStats -> Int | ||
356 | wastedBytes ConnectionStats {..} = overhead + control | ||
357 | where | ||
358 | FlowStats _ ByteStats {..} = incomingFlow <> outcomingFlow | ||
359 | |||
360 | -- | Sum of payload bytes in both directions. | ||
361 | payloadBytes :: ConnectionStats -> Int | ||
362 | payloadBytes ConnectionStats {..} = | ||
363 | payload (messageBytes (incomingFlow <> outcomingFlow)) | ||
364 | |||
365 | -- | Sum of any bytes in both directions. | ||
366 | transmittedBytes :: ConnectionStats -> Int | ||
367 | transmittedBytes ConnectionStats {..} = | ||
368 | byteLength (messageBytes (incomingFlow <> outcomingFlow)) | ||
369 | |||
370 | {----------------------------------------------------------------------- | ||
371 | -- Flood protection | ||
372 | -----------------------------------------------------------------------} | ||
373 | |||
374 | defaultFloodFactor :: Int | ||
375 | defaultFloodFactor = 1 | ||
376 | |||
377 | -- | This is a very permissive value, connection setup usually takes | ||
378 | -- around 10-100KB, including both directions. | ||
379 | defaultFloodThreshold :: Int | ||
380 | defaultFloodThreshold = 2 * 1024 * 1024 | ||
381 | |||
382 | -- | A flood detection function. | ||
383 | type Detector stats = Int -- ^ Factor; | ||
384 | -> Int -- ^ Threshold; | ||
385 | -> stats -- ^ Stats to analyse; | ||
386 | -> Bool -- ^ Is this a flooded connection? | ||
387 | |||
388 | defaultDetector :: Detector ConnectionStats | ||
389 | defaultDetector factor threshold s = | ||
390 | transmittedBytes s > threshold && | ||
391 | factor * wastedBytes s > payloadBytes s | ||
392 | |||
393 | -- | Flood detection is used to protect /this/ peer against a /remote/ | ||
394 | -- malicious peer sending meaningless control messages. | ||
395 | data FloodDetector = FloodDetector | ||
396 | { -- | Max ratio of payload bytes to control bytes. | ||
397 | floodFactor :: {-# UNPACK #-} !Int | ||
398 | |||
399 | -- | Max count of bytes connection /setup/ can take including | ||
400 | -- 'Handshake', 'ExtendedHandshake', 'Bitfield', 'Have' and 'Port' | ||
401 | -- messages. This value is used to avoid false positives at the | ||
402 | -- connection initialization. | ||
403 | , floodThreshold :: {-# UNPACK #-} !Int | ||
404 | |||
405 | -- | Flood predicate on the /current/ 'ConnectionStats'. | ||
406 | , floodPredicate :: Detector ConnectionStats | ||
407 | } deriving Show | ||
408 | |||
409 | instance Eq FloodDetector where | ||
410 | a == b = floodFactor a == floodFactor b | ||
411 | && floodThreshold a == floodThreshold b | ||
412 | |||
413 | -- | Flood detector with very permissive options. | ||
414 | instance Default FloodDetector where | ||
415 | def = FloodDetector | ||
416 | { floodFactor = defaultFloodFactor | ||
417 | , floodThreshold = defaultFloodThreshold | ||
418 | , floodPredicate = defaultDetector | ||
419 | } | ||
420 | |||
421 | -- | This peer might drop connection if the detector gives positive answer. | ||
422 | runDetector :: FloodDetector -> ConnectionStats -> Bool | ||
423 | runDetector FloodDetector {..} = floodPredicate floodFactor floodThreshold | ||
424 | |||
425 | {----------------------------------------------------------------------- | ||
426 | -- Options | ||
427 | -----------------------------------------------------------------------} | ||
428 | |||
429 | -- | Various connection settings and limits. | ||
430 | data Options = Options | ||
431 | { -- | How often /this/ peer should send 'KeepAlive' messages. | ||
432 | keepaliveInterval :: {-# UNPACK #-} !Int | ||
433 | |||
434 | -- | /This/ peer will drop connection if a /remote/ peer did not | ||
435 | -- send any message for this period of time. | ||
436 | , keepaliveTimeout :: {-# UNPACK #-} !Int | ||
437 | |||
438 | , requestQueueLength :: {-# UNPACK #-} !Int | ||
439 | |||
440 | -- | Used to protect against flood attacks. | ||
441 | , floodDetector :: FloodDetector | ||
442 | |||
443 | -- | Used to protect against flood attacks in /metadata | ||
444 | -- exchange/. Normally, a requesting peer should request each | ||
445 | -- 'InfoDict' piece only one time, but a malicious peer can | ||
446 | -- saturate wire with 'MetadataRequest' messages thus flooding | ||
447 | -- responding peer. | ||
448 | -- | ||
449 | -- This value set upper bound for number of 'MetadataRequests' | ||
450 | -- for each piece. | ||
451 | -- | ||
452 | , metadataFactor :: {-# UNPACK #-} !Int | ||
453 | |||
454 | -- | Used to protect against out-of-memory attacks: malicious peer | ||
455 | -- can claim that 'totalSize' is, say, 100TB and send some random | ||
456 | -- data instead of infodict pieces. Since requesting peer unable | ||
457 | -- to check not completed infodict via the infohash, the | ||
458 | -- accumulated pieces will allocate the all available memory. | ||
459 | -- | ||
460 | -- This limit set upper bound for 'InfoDict' size. See | ||
461 | -- 'ExtendedMetadata' for more info. | ||
462 | -- | ||
463 | , maxInfoDictSize :: {-# UNPACK #-} !Int | ||
464 | } deriving (Show, Eq) | ||
465 | |||
466 | -- | Permissive default parameters, most likely you don't need to | ||
467 | -- change them. | ||
468 | instance Default Options where | ||
469 | def = Options | ||
470 | { keepaliveInterval = defaultKeepAliveInterval | ||
471 | , keepaliveTimeout = defaultKeepAliveTimeout | ||
472 | , requestQueueLength = defaultRequestQueueLength | ||
473 | , floodDetector = def | ||
474 | , metadataFactor = defaultMetadataFactor | ||
475 | , maxInfoDictSize = defaultMaxInfoDictSize | ||
476 | } | ||
477 | |||
478 | {----------------------------------------------------------------------- | ||
479 | -- Peer status | ||
480 | -----------------------------------------------------------------------} | ||
481 | |||
482 | -- | Connections contain two bits of state on either end: choked or | ||
483 | -- not, and interested or not. | ||
484 | data PeerStatus = PeerStatus | ||
485 | { -- | Choking is a notification that no data will be sent until | ||
486 | -- unchoking happens. | ||
487 | _choking :: !Bool | ||
488 | |||
489 | -- | | ||
490 | , _interested :: !Bool | ||
491 | } deriving (Show, Eq, Ord) | ||
492 | |||
493 | $(makeLenses ''PeerStatus) | ||
494 | |||
495 | instance Pretty PeerStatus where | ||
496 | pPrint PeerStatus {..} = | ||
497 | pPrint (Choking _choking) <+> "and" <+> pPrint (Interested _interested) | ||
498 | |||
499 | -- | Connections start out choked and not interested. | ||
500 | instance Default PeerStatus where | ||
501 | def = PeerStatus True False | ||
502 | |||
503 | instance Monoid PeerStatus where | ||
504 | mempty = def | ||
505 | mappend a b = PeerStatus | ||
506 | { _choking = _choking a && _choking b | ||
507 | , _interested = _interested a || _interested b | ||
508 | } | ||
509 | |||
510 | -- | Can be used to update remote peer status using incoming 'Status' | ||
511 | -- message. | ||
512 | updateStatus :: StatusUpdate -> PeerStatus -> PeerStatus | ||
513 | updateStatus (Choking b) = choking .~ b | ||
514 | updateStatus (Interested b) = interested .~ b | ||
515 | |||
516 | -- | Can be used to generate outcoming messages. | ||
517 | statusUpdates :: PeerStatus -> PeerStatus -> [StatusUpdate] | ||
518 | statusUpdates a b = M.catMaybes $ | ||
519 | [ if _choking a == _choking b then Nothing | ||
520 | else Just $ Choking $ _choking b | ||
521 | , if _interested a == _interested b then Nothing | ||
522 | else Just $ Interested $ _interested b | ||
523 | ] | ||
524 | |||
525 | {----------------------------------------------------------------------- | ||
526 | -- Connection status | ||
527 | -----------------------------------------------------------------------} | ||
528 | |||
529 | -- | Status of the both endpoints. | ||
530 | data ConnectionStatus = ConnectionStatus | ||
531 | { _clientStatus :: !PeerStatus | ||
532 | , _remoteStatus :: !PeerStatus | ||
533 | } deriving (Show, Eq) | ||
534 | |||
535 | $(makeLenses ''ConnectionStatus) | ||
536 | |||
537 | instance Pretty ConnectionStatus where | ||
538 | pPrint ConnectionStatus {..} = | ||
539 | "this " PP.<+> pPrint _clientStatus PP.$$ | ||
540 | "remote" PP.<+> pPrint _remoteStatus | ||
541 | |||
542 | -- | Connections start out choked and not interested. | ||
543 | instance Default ConnectionStatus where | ||
544 | def = ConnectionStatus def def | ||
545 | |||
546 | -- | Can the client transfer to the remote peer? | ||
547 | canUpload :: ConnectionStatus -> Bool | ||
548 | canUpload ConnectionStatus {..} | ||
549 | = _interested _remoteStatus && not (_choking _clientStatus) | ||
550 | |||
551 | -- | Can the client transfer from the remote peer? | ||
552 | canDownload :: ConnectionStatus -> Bool | ||
553 | canDownload ConnectionStatus {..} | ||
554 | = _interested _clientStatus && not (_choking _remoteStatus) | ||
555 | |||
556 | -- | Indicates how many peers are allowed to download from the client | ||
557 | -- by default. | ||
558 | defaultUnchokeSlots :: Int | ||
559 | defaultUnchokeSlots = 4 | ||
560 | |||
561 | -- | | ||
562 | defaultRechokeInterval :: Int | ||
563 | defaultRechokeInterval = 10 * 1000 * 1000 | ||
564 | |||
565 | {----------------------------------------------------------------------- | ||
566 | -- Connection | ||
567 | -----------------------------------------------------------------------} | ||
568 | |||
569 | data ConnectionState = ConnectionState { | ||
570 | -- | If @not (allowed ExtExtended connCaps)@ then this set is always | ||
571 | -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of | ||
572 | -- 'MessageId' to the message type for the remote peer. | ||
573 | -- | ||
574 | -- Note that this value can change in current session if either | ||
575 | -- this or remote peer will initiate rehandshaking. | ||
576 | -- | ||
577 | _connExtCaps :: !ExtendedCaps | ||
578 | |||
579 | -- | Current extended handshake information from the remote peer | ||
580 | , _connRemoteEhs :: !ExtendedHandshake | ||
581 | |||
582 | -- | Various stats about messages sent and received. Stats can be | ||
583 | -- used to protect /this/ peer against flood attacks. | ||
584 | -- | ||
585 | -- Note that this value will change with the next sent or received | ||
586 | -- message. | ||
587 | , _connStats :: !ConnectionStats | ||
588 | |||
589 | , _connStatus :: !ConnectionStatus | ||
590 | |||
591 | -- | Bitfield of remote endpoint. | ||
592 | , _connBitfield :: !Bitfield | ||
593 | } | ||
594 | |||
595 | makeLenses ''ConnectionState | ||
596 | |||
597 | instance Default ConnectionState where | ||
598 | def = ConnectionState | ||
599 | { _connExtCaps = def | ||
600 | , _connRemoteEhs = def | ||
601 | , _connStats = def | ||
602 | , _connStatus = def | ||
603 | , _connBitfield = BF.haveNone 0 | ||
604 | } | ||
605 | |||
606 | -- | Connection keep various info about both peers. | ||
607 | data Connection s = Connection | ||
608 | { connInitiatedBy :: !ChannelSide | ||
609 | |||
610 | , connRemoteAddr :: !(PeerAddr IP) | ||
611 | |||
612 | -- | /Both/ peers handshaked with this protocol string. The only | ||
613 | -- value is \"Bittorrent Protocol\" but this can be changed in | ||
614 | -- future. | ||
615 | , connProtocol :: !ProtocolName | ||
616 | |||
617 | -- | Set of enabled core extensions, i.e. the pre BEP10 extension | ||
618 | -- mechanism. This value is used to check if a message is allowed | ||
619 | -- to be sent or received. | ||
620 | , connCaps :: !Caps | ||
621 | |||
622 | -- | /Both/ peers handshaked with this infohash. A connection can | ||
623 | -- handle only one topic, use 'reconnect' to change the current | ||
624 | -- topic. | ||
625 | , connTopic :: !InfoHash | ||
626 | |||
627 | -- | Typically extracted from handshake. | ||
628 | , connRemotePeerId :: !PeerId | ||
629 | |||
630 | -- | Typically extracted from handshake. | ||
631 | , connThisPeerId :: !PeerId | ||
632 | |||
633 | -- | | ||
634 | , connOptions :: !Options | ||
635 | |||
636 | -- | Mutable connection state, see 'ConnectionState' | ||
637 | , connState :: !(IORef ConnectionState) | ||
638 | |||
639 | -- -- | Max request queue length. | ||
640 | -- , connMaxQueueLen :: !Int | ||
641 | |||
642 | -- | Environment data. | ||
643 | , connSession :: !s | ||
644 | |||
645 | , connChan :: !(Chan Message) | ||
646 | } | ||
647 | |||
648 | instance Pretty (Connection s) where | ||
649 | pPrint Connection {..} = "Connection" | ||
650 | |||
651 | instance ToLogStr (Connection s) where | ||
652 | toLogStr Connection {..} = mconcat | ||
653 | [ toLogStr (show connRemoteAddr) | ||
654 | , toLogStr (show connProtocol) | ||
655 | , toLogStr (show connCaps) | ||
656 | , toLogStr (show connTopic) | ||
657 | , toLogStr (show connRemotePeerId) | ||
658 | , toLogStr (show connThisPeerId) | ||
659 | , toLogStr (show connOptions) | ||
660 | ] | ||
661 | |||
662 | -- TODO check extended messages too | ||
663 | isAllowed :: Connection s -> Message -> Bool | ||
664 | isAllowed Connection {..} msg | ||
665 | | Just ext <- requires msg = ext `allowed` connCaps | ||
666 | | otherwise = True | ||
667 | |||
668 | {----------------------------------------------------------------------- | ||
669 | -- Hanshaking | ||
670 | -----------------------------------------------------------------------} | ||
671 | |||
672 | sendHandshake :: Socket -> Handshake -> IO () | ||
673 | sendHandshake sock hs = sendAll sock (S.encode hs) | ||
674 | |||
675 | recvHandshake :: Socket -> IO Handshake | ||
676 | recvHandshake sock = do | ||
677 | header <- BS.recv sock 1 | ||
678 | unless (BS.length header == 1) $ | ||
679 | throw $ userError "Unable to receive handshake header." | ||
680 | |||
681 | let protocolLen = BS.head header | ||
682 | let restLen = handshakeSize protocolLen - 1 | ||
683 | |||
684 | body <- BS.recv sock restLen | ||
685 | let resp = BS.cons protocolLen body | ||
686 | either (throwIO . userError) return $ S.decode resp | ||
687 | |||
688 | -- | Handshaking with a peer specified by the second argument. | ||
689 | -- | ||
690 | -- It's important to send handshake first because /accepting/ peer | ||
691 | -- do not know handshake topic and will wait until /connecting/ peer | ||
692 | -- will send handshake. | ||
693 | -- | ||
694 | initiateHandshake :: Socket -> Handshake -> IO Handshake | ||
695 | initiateHandshake sock hs = do | ||
696 | sendHandshake sock hs | ||
697 | recvHandshake sock | ||
698 | |||
699 | data HandshakePair = HandshakePair | ||
700 | { handshakeSent :: !Handshake | ||
701 | , handshakeRecv :: !Handshake | ||
702 | } deriving (Show, Eq) | ||
703 | |||
704 | validatePair :: HandshakePair -> PeerAddr IP -> IO () | ||
705 | validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp | ||
706 | [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs') | ||
707 | , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs') | ||
708 | , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs') | ||
709 | , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) | ||
710 | , UnexpectedPeerId $ hsPeerId hs') | ||
711 | ] | ||
712 | where | ||
713 | checkProp (t, e) = unless t $ throwIO $ ProtocolError e | ||
714 | |||
715 | -- | Connection state /right/ after handshaking. | ||
716 | establishedStats :: HandshakePair -> ConnectionStats | ||
717 | establishedStats HandshakePair {..} = ConnectionStats | ||
718 | { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent | ||
719 | , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv | ||
720 | } | ||
721 | |||
722 | {----------------------------------------------------------------------- | ||
723 | -- Wire | ||
724 | -----------------------------------------------------------------------} | ||
725 | |||
726 | -- | do not expose this so we can change it without breaking api | ||
727 | newtype Connected s a = Connected { runConnected :: (ReaderT (Connection s) IO a) } | ||
728 | deriving (Functor, Applicative, Monad | ||
729 | , MonadIO, MonadReader (Connection s), MonadThrow | ||
730 | ) | ||
731 | |||
732 | instance MonadState ConnectionState (Connected s) where | ||
733 | get = Connected (asks connState) >>= liftIO . readIORef | ||
734 | put x = Connected (asks connState) >>= liftIO . flip writeIORef x | ||
735 | |||
736 | -- | A duplex channel connected to a remote peer which keep tracks | ||
737 | -- connection parameters. | ||
738 | type Wire s a = ConduitM Message Message (Connected s) a | ||
739 | |||
740 | {----------------------------------------------------------------------- | ||
741 | -- Wrapper | ||
742 | -----------------------------------------------------------------------} | ||
743 | |||
744 | putStats :: ChannelSide -> Message -> Connected s () | ||
745 | putStats side msg = connStats %= addStats side (stats msg) | ||
746 | |||
747 | validate :: ChannelSide -> Message -> Connected s () | ||
748 | validate side msg = do | ||
749 | caps <- asks connCaps | ||
750 | case requires msg of | ||
751 | Nothing -> return () | ||
752 | Just ext | ||
753 | | ext `allowed` caps -> return () | ||
754 | | otherwise -> protocolError $ DisallowedMessage side ext | ||
755 | |||
756 | trackFlow :: ChannelSide -> Wire s () | ||
757 | trackFlow side = iterM $ do | ||
758 | validate side | ||
759 | putStats side | ||
760 | |||
761 | {----------------------------------------------------------------------- | ||
762 | -- Setup | ||
763 | -----------------------------------------------------------------------} | ||
764 | |||
765 | -- System.Timeout.timeout multiplier | ||
766 | seconds :: Int | ||
767 | seconds = 1000000 | ||
768 | |||
769 | sinkChan :: MonadIO m => Chan Message -> Sink Message m () | ||
770 | sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan) | ||
771 | |||
772 | sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message | ||
773 | sourceChan interval chan = do | ||
774 | mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan | ||
775 | yield $ fromMaybe Msg.KeepAlive mmsg | ||
776 | |||
777 | -- | Normally you should use 'connectWire' or 'acceptWire'. | ||
778 | runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO () | ||
779 | runWire action sock chan conn = flip runReaderT conn $ runConnected $ | ||
780 | sourceSocket sock $= | ||
781 | conduitGet S.get $= | ||
782 | trackFlow RemotePeer $= | ||
783 | action $= | ||
784 | trackFlow ThisPeer C.$$ | ||
785 | sinkChan chan | ||
786 | |||
787 | -- | This function will block until a peer send new message. You can | ||
788 | -- also use 'await'. | ||
789 | recvMessage :: Wire s Message | ||
790 | recvMessage = await >>= maybe (monadThrow PeerDisconnected) return | ||
791 | |||
792 | -- | You can also use 'yield'. | ||
793 | sendMessage :: PeerMessage msg => msg -> Wire s () | ||
794 | sendMessage msg = do | ||
795 | ecaps <- use connExtCaps | ||
796 | yield $ envelop ecaps msg | ||
797 | |||
798 | getMaxQueueLength :: Connected s Int | ||
799 | getMaxQueueLength = do | ||
800 | advertisedLen <- ehsQueueLength <$> use connRemoteEhs | ||
801 | defaultLen <- asks (requestQueueLength . connOptions) | ||
802 | return $ fromMaybe defaultLen advertisedLen | ||
803 | |||
804 | -- | Filter pending messages from send buffer. | ||
805 | filterQueue :: (Message -> Bool) -> Wire s () | ||
806 | filterQueue p = lift $ do | ||
807 | chan <- asks connChan | ||
808 | liftIO $ getChanContents chan >>= writeList2Chan chan . L.filter p | ||
809 | |||
810 | -- | Forcefully terminate wire session and close socket. | ||
811 | disconnectPeer :: Wire s a | ||
812 | disconnectPeer = monadThrow DisconnectPeer | ||
813 | |||
814 | extendedHandshake :: ExtendedCaps -> Wire s () | ||
815 | extendedHandshake caps = do | ||
816 | -- TODO add other params to the handshake | ||
817 | sendMessage $ nullExtendedHandshake caps | ||
818 | msg <- recvMessage | ||
819 | case msg of | ||
820 | Extended (EHandshake remoteEhs@(ExtendedHandshake {..})) -> do | ||
821 | connExtCaps .= (ehsCaps <> caps) | ||
822 | connRemoteEhs .= remoteEhs | ||
823 | _ -> protocolError HandshakeRefused | ||
824 | |||
825 | rehandshake :: ExtendedCaps -> Wire s () | ||
826 | rehandshake caps = error "rehandshake" | ||
827 | |||
828 | reconnect :: Wire s () | ||
829 | reconnect = error "reconnect" | ||
830 | |||
831 | data ConnectionId = ConnectionId | ||
832 | { topic :: !InfoHash | ||
833 | , remoteAddr :: !(PeerAddr IP) | ||
834 | , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node. | ||
835 | } | ||
836 | |||
837 | -- | /Preffered/ settings of wire. To get the real use 'ask'. | ||
838 | data ConnectionPrefs = ConnectionPrefs | ||
839 | { prefOptions :: !Options | ||
840 | , prefProtocol :: !ProtocolName | ||
841 | , prefCaps :: !Caps | ||
842 | , prefExtCaps :: !ExtendedCaps | ||
843 | } deriving (Show, Eq) | ||
844 | |||
845 | instance Default ConnectionPrefs where | ||
846 | def = ConnectionPrefs | ||
847 | { prefOptions = def | ||
848 | , prefProtocol = def | ||
849 | , prefCaps = def | ||
850 | , prefExtCaps = def | ||
851 | } | ||
852 | |||
853 | normalize :: ConnectionPrefs -> ConnectionPrefs | ||
854 | normalize = error "normalize" | ||
855 | |||
856 | -- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'. | ||
857 | data SessionLink s = SessionLink | ||
858 | { linkTopic :: !(InfoHash) | ||
859 | , linkPeerId :: !(PeerId) | ||
860 | , linkMetadataSize :: !(Maybe Int) | ||
861 | , linkOutputChan :: !(Maybe (Chan Message)) | ||
862 | , linkSession :: !(s) | ||
863 | } | ||
864 | |||
865 | data ConnectionConfig s = ConnectionConfig | ||
866 | { cfgPrefs :: !(ConnectionPrefs) | ||
867 | , cfgSession :: !(SessionLink s) | ||
868 | , cfgWire :: !(Wire s ()) | ||
869 | } | ||
870 | |||
871 | configHandshake :: ConnectionConfig s -> Handshake | ||
872 | configHandshake ConnectionConfig {..} = Handshake | ||
873 | { hsProtocol = prefProtocol cfgPrefs | ||
874 | , hsReserved = prefCaps cfgPrefs | ||
875 | , hsInfoHash = linkTopic cfgSession | ||
876 | , hsPeerId = linkPeerId cfgSession | ||
877 | } | ||
878 | |||
879 | {----------------------------------------------------------------------- | ||
880 | -- Pending connections | ||
881 | -----------------------------------------------------------------------} | ||
882 | |||
883 | -- | Connection in half opened state. A normal usage scenario: | ||
884 | -- | ||
885 | -- * Opened using 'newPendingConnection', usually in the listener | ||
886 | -- loop; | ||
887 | -- | ||
888 | -- * Closed using 'closePending' if 'pendingPeer' is banned, | ||
889 | -- 'pendingCaps' is prohibited or pendingTopic is unknown; | ||
890 | -- | ||
891 | -- * Accepted using 'acceptWire' otherwise. | ||
892 | -- | ||
893 | data PendingConnection = PendingConnection | ||
894 | { pendingSock :: Socket | ||
895 | , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty; | ||
896 | , pendingCaps :: Caps -- ^ advertised by the peer; | ||
897 | , pendingTopic :: InfoHash -- ^ possible non-existent topic. | ||
898 | } | ||
899 | |||
900 | -- | Reconstruct handshake sent by the remote peer. | ||
901 | pendingHandshake :: PendingConnection -> Handshake | ||
902 | pendingHandshake PendingConnection {..} = Handshake | ||
903 | { hsProtocol = def | ||
904 | , hsReserved = pendingCaps | ||
905 | , hsInfoHash = pendingTopic | ||
906 | , hsPeerId = fromMaybe (error "pendingHandshake: impossible") | ||
907 | (peerId pendingPeer) | ||
908 | } | ||
909 | |||
910 | -- | | ||
911 | -- | ||
912 | -- This function can throw 'WireFailure' exception. | ||
913 | -- | ||
914 | newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection | ||
915 | newPendingConnection sock addr = do | ||
916 | Handshake {..} <- recvHandshake sock | ||
917 | unless (hsProtocol == def) $ do | ||
918 | throwIO $ ProtocolError $ InvalidProtocol hsProtocol | ||
919 | return PendingConnection | ||
920 | { pendingSock = sock | ||
921 | , pendingPeer = addr { peerId = Just hsPeerId } | ||
922 | , pendingCaps = hsReserved | ||
923 | , pendingTopic = hsInfoHash | ||
924 | } | ||
925 | |||
926 | -- | Release all resources associated with the given connection. Note | ||
927 | -- that you /must not/ 'closePending' if you 'acceptWire'. | ||
928 | closePending :: PendingConnection -> IO () | ||
929 | closePending PendingConnection {..} = do | ||
930 | close pendingSock | ||
931 | |||
932 | {----------------------------------------------------------------------- | ||
933 | -- Connection setup | ||
934 | -----------------------------------------------------------------------} | ||
935 | |||
936 | chanToSock :: Int -> Chan Message -> Socket -> IO () | ||
937 | chanToSock ka chan sock = | ||
938 | sourceChan ka chan $= conduitPut S.put C.$$ sinkSocket sock | ||
939 | |||
940 | afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair | ||
941 | -> ConnectionConfig s -> IO () | ||
942 | afterHandshaking initiator addr sock | ||
943 | hpair @ (HandshakePair hs hs') | ||
944 | (ConnectionConfig | ||
945 | { cfgPrefs = ConnectionPrefs {..} | ||
946 | , cfgSession = SessionLink {..} | ||
947 | , cfgWire = wire | ||
948 | }) = do | ||
949 | let caps = hsReserved hs <> hsReserved hs' | ||
950 | cstate <- newIORef def { _connStats = establishedStats hpair } | ||
951 | chan <- maybe newChan return linkOutputChan | ||
952 | let conn = Connection { | ||
953 | connInitiatedBy = initiator | ||
954 | , connRemoteAddr = addr | ||
955 | , connProtocol = hsProtocol hs | ||
956 | , connCaps = caps | ||
957 | , connTopic = hsInfoHash hs | ||
958 | , connRemotePeerId = hsPeerId hs' | ||
959 | , connThisPeerId = hsPeerId hs | ||
960 | , connOptions = def | ||
961 | , connState = cstate | ||
962 | , connSession = linkSession | ||
963 | , connChan = chan | ||
964 | } | ||
965 | |||
966 | -- TODO make KA interval configurable | ||
967 | let kaInterval = defaultKeepAliveInterval | ||
968 | wire' = if ExtExtended `allowed` caps | ||
969 | then extendedHandshake prefExtCaps >> wire | ||
970 | else wire | ||
971 | |||
972 | bracket (forkIO (chanToSock kaInterval chan sock)) | ||
973 | (killThread) | ||
974 | (\ _ -> runWire wire' sock chan conn) | ||
975 | |||
976 | -- | Initiate 'Wire' connection and handshake with a peer. This function will | ||
977 | -- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on | ||
978 | -- both sides. | ||
979 | -- | ||
980 | -- This function can throw 'WireFailure' exception. | ||
981 | -- | ||
982 | connectWire :: PeerAddr IP -> ConnectionConfig s -> IO () | ||
983 | connectWire addr cfg = do | ||
984 | let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return | ||
985 | bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do | ||
986 | let hs = configHandshake cfg | ||
987 | hs' <- initiateHandshake sock hs | ||
988 | let hpair = HandshakePair hs hs' | ||
989 | validatePair hpair addr | ||
990 | afterHandshaking ThisPeer addr sock hpair cfg | ||
991 | |||
992 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed | ||
993 | -- socket. For peer listener loop the 'acceptSafe' should be | ||
994 | -- prefered against 'accept'. The socket will be closed at exit. | ||
995 | -- | ||
996 | -- This function can throw 'WireFailure' exception. | ||
997 | -- | ||
998 | acceptWire :: PendingConnection -> ConnectionConfig s -> IO () | ||
999 | acceptWire pc @ PendingConnection {..} cfg = do | ||
1000 | bracket (return pendingSock) close $ \ _ -> do | ||
1001 | unless (linkTopic (cfgSession cfg) == pendingTopic) $ do | ||
1002 | throwIO (ProtocolError (UnexpectedTopic pendingTopic)) | ||
1003 | |||
1004 | let hs = configHandshake cfg | ||
1005 | sendHandshake pendingSock hs | ||
1006 | let hpair = HandshakePair hs (pendingHandshake pc) | ||
1007 | |||
1008 | afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg | ||
1009 | |||
1010 | -- | Used when size of bitfield becomes known. | ||
1011 | resizeBitfield :: Int -> Connected s () | ||
1012 | resizeBitfield n = connBitfield %= adjustSize n | ||
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Download.hs b/bittorrent/src/Network/BitTorrent/Exchange/Download.hs new file mode 100644 index 00000000..981db2fb --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Download.hs | |||
@@ -0,0 +1,296 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- | ||
9 | -- | ||
10 | {-# LANGUAGE FlexibleContexts #-} | ||
11 | {-# LANGUAGE FlexibleInstances #-} | ||
12 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
13 | {-# LANGUAGE FunctionalDependencies #-} | ||
14 | {-# LANGUAGE TemplateHaskell #-} | ||
15 | module Network.BitTorrent.Exchange.Download | ||
16 | ( -- * Downloading | ||
17 | Download (..) | ||
18 | , Updates | ||
19 | , runDownloadUpdates | ||
20 | |||
21 | -- ** Metadata | ||
22 | -- $metadata-download | ||
23 | , MetadataDownload | ||
24 | , metadataDownload | ||
25 | |||
26 | -- ** Content | ||
27 | -- $content-download | ||
28 | , ContentDownload | ||
29 | , contentDownload | ||
30 | ) where | ||
31 | |||
32 | import Control.Applicative | ||
33 | import Control.Concurrent | ||
34 | import Control.Lens | ||
35 | import Control.Monad.State | ||
36 | import Data.BEncode as BE | ||
37 | import Data.ByteString as BS | ||
38 | import Data.ByteString.Lazy as BL | ||
39 | import Data.Default | ||
40 | import Data.List as L | ||
41 | import Data.Maybe | ||
42 | import Data.Map as M | ||
43 | import Data.Tuple | ||
44 | |||
45 | import Data.Torrent as Torrent | ||
46 | import Network.Address | ||
47 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
48 | import Network.BitTorrent.Exchange.Block as Block | ||
49 | import Network.BitTorrent.Exchange.Message as Msg | ||
50 | import System.Torrent.Storage (Storage, writePiece) | ||
51 | |||
52 | |||
53 | {----------------------------------------------------------------------- | ||
54 | -- Class | ||
55 | -----------------------------------------------------------------------} | ||
56 | |||
57 | type Updates s a = StateT s IO a | ||
58 | |||
59 | runDownloadUpdates :: MVar s -> Updates s a -> IO a | ||
60 | runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
61 | |||
62 | class Download s chunk | s -> chunk where | ||
63 | scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] | ||
64 | |||
65 | -- | | ||
66 | scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) | ||
67 | scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf | ||
68 | |||
69 | -- | Get number of sent requests to this peer. | ||
70 | getRequestQueueLength :: PeerAddr IP -> Updates s Int | ||
71 | |||
72 | -- | Remove all pending block requests to the remote peer. May be used | ||
73 | -- when: | ||
74 | -- | ||
75 | -- * a peer closes connection; | ||
76 | -- | ||
77 | -- * remote peer choked this peer; | ||
78 | -- | ||
79 | -- * timeout expired. | ||
80 | -- | ||
81 | resetPending :: PeerAddr IP -> Updates s () | ||
82 | |||
83 | -- | MAY write to storage, if a new piece have been completed. | ||
84 | -- | ||
85 | -- You should check if a returned by peer block is actually have | ||
86 | -- been requested and in-flight. This is needed to avoid "I send | ||
87 | -- random corrupted block" attacks. | ||
88 | pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) | ||
89 | |||
90 | {----------------------------------------------------------------------- | ||
91 | -- Metadata download | ||
92 | -----------------------------------------------------------------------} | ||
93 | -- $metadata-download | ||
94 | -- TODO | ||
95 | |||
96 | data MetadataDownload = MetadataDownload | ||
97 | { _pendingPieces :: [(PeerAddr IP, PieceIx)] | ||
98 | , _bucket :: Bucket | ||
99 | , _topic :: InfoHash | ||
100 | } | ||
101 | |||
102 | makeLenses ''MetadataDownload | ||
103 | |||
104 | -- | Create a new scheduler for infodict of the given size. | ||
105 | metadataDownload :: Int -> InfoHash -> MetadataDownload | ||
106 | metadataDownload ps = MetadataDownload [] (Block.empty ps) | ||
107 | |||
108 | instance Default MetadataDownload where | ||
109 | def = error "instance Default MetadataDownload" | ||
110 | |||
111 | --cancelPending :: PieceIx -> Updates () | ||
112 | cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) | ||
113 | |||
114 | instance Download MetadataDownload (Piece BS.ByteString) where | ||
115 | scheduleBlock addr bf = do | ||
116 | bkt <- use bucket | ||
117 | case spans metadataPieceSize bkt of | ||
118 | [] -> return Nothing | ||
119 | ((off, _ ) : _) -> do | ||
120 | let pix = off `div` metadataPieceSize | ||
121 | pendingPieces %= ((addr, pix) :) | ||
122 | return (Just (BlockIx pix 0 metadataPieceSize)) | ||
123 | |||
124 | resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) | ||
125 | |||
126 | pushBlock addr Torrent.Piece {..} = do | ||
127 | p <- use pendingPieces | ||
128 | when ((addr, pieceIndex) `L.notElem` p) $ | ||
129 | error "not requested" | ||
130 | cancelPending pieceIndex | ||
131 | |||
132 | bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData | ||
133 | b <- use bucket | ||
134 | case toPiece b of | ||
135 | Nothing -> return Nothing | ||
136 | Just chunks -> do | ||
137 | t <- use topic | ||
138 | case parseInfoDict (BL.toStrict chunks) t of | ||
139 | Right x -> do | ||
140 | pendingPieces .= [] | ||
141 | return undefined -- (Just x) | ||
142 | Left e -> do | ||
143 | pendingPieces .= [] | ||
144 | bucket .= Block.empty (Block.size b) | ||
145 | return undefined -- Nothing | ||
146 | where | ||
147 | -- todo use incremental parsing to avoid BS.concat call | ||
148 | parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict | ||
149 | parseInfoDict chunk topic = | ||
150 | case BE.decode chunk of | ||
151 | Right (infodict @ InfoDict {..}) | ||
152 | | topic == idInfoHash -> return infodict | ||
153 | | otherwise -> Left "broken infodict" | ||
154 | Left err -> Left $ "unable to parse infodict " ++ err | ||
155 | |||
156 | {----------------------------------------------------------------------- | ||
157 | -- Content download | ||
158 | -----------------------------------------------------------------------} | ||
159 | -- $content-download | ||
160 | -- | ||
161 | -- A block can have one of the following status: | ||
162 | -- | ||
163 | -- 1) /not allowed/: Piece is not in download set. | ||
164 | -- | ||
165 | -- 2) /waiting/: (allowed?) Block have been allowed to download, | ||
166 | -- but /this/ peer did not send any 'Request' message for this | ||
167 | -- block. To allow some piece use | ||
168 | -- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' | ||
169 | -- and 'allowPiece'. | ||
170 | -- | ||
171 | -- 3) /inflight/: (pending?) Block have been requested but | ||
172 | -- /remote/ peer did not send any 'Piece' message for this block. | ||
173 | -- Related functions 'markInflight' | ||
174 | -- | ||
175 | -- 4) /pending/: (stalled?) Block have have been downloaded | ||
176 | -- Related functions 'insertBlock'. | ||
177 | -- | ||
178 | -- Piece status: | ||
179 | -- | ||
180 | -- 1) /assembled/: (downloaded?) All blocks in piece have been | ||
181 | -- downloaded but the piece did not verified yet. | ||
182 | -- | ||
183 | -- * Valid: go to completed; | ||
184 | -- | ||
185 | -- * Invalid: go to waiting. | ||
186 | -- | ||
187 | -- 2) /corrupted/: | ||
188 | -- | ||
189 | -- 3) /downloaded/: (verified?) A piece have been successfully | ||
190 | -- verified via the hash. Usually the piece should be stored to | ||
191 | -- the 'System.Torrent.Storage' and /this/ peer should send 'Have' | ||
192 | -- messages to the /remote/ peers. | ||
193 | -- | ||
194 | |||
195 | data PieceEntry = PieceEntry | ||
196 | { pending :: [(PeerAddr IP, BlockIx)] | ||
197 | , stalled :: Bucket | ||
198 | } | ||
199 | |||
200 | pieceEntry :: PieceSize -> PieceEntry | ||
201 | pieceEntry s = PieceEntry [] (Block.empty s) | ||
202 | |||
203 | isEmpty :: PieceEntry -> Bool | ||
204 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled | ||
205 | |||
206 | _holes :: PieceIx -> PieceEntry -> [BlockIx] | ||
207 | _holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) | ||
208 | where | ||
209 | mkBlockIx (off, sz) = BlockIx pix off sz | ||
210 | |||
211 | data ContentDownload = ContentDownload | ||
212 | { inprogress :: !(Map PieceIx PieceEntry) | ||
213 | , bitfield :: !Bitfield | ||
214 | , pieceSize :: !PieceSize | ||
215 | , contentStorage :: Storage | ||
216 | } | ||
217 | |||
218 | contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload | ||
219 | contentDownload = ContentDownload M.empty | ||
220 | |||
221 | --modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () | ||
222 | modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s | ||
223 | { inprogress = alter (g pieceSize) pix inprogress } | ||
224 | where | ||
225 | g s = h . f . fromMaybe (pieceEntry s) | ||
226 | h e | ||
227 | | isEmpty e = Nothing | ||
228 | | otherwise = Just e | ||
229 | |||
230 | instance Download ContentDownload (Block BL.ByteString) where | ||
231 | scheduleBlocks n addr maskBF = do | ||
232 | ContentDownload {..} <- get | ||
233 | let wantPieces = maskBF `BF.difference` bitfield | ||
234 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ | ||
235 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) | ||
236 | inprogress | ||
237 | |||
238 | bixs <- if L.null wantBlocks | ||
239 | then do | ||
240 | mpix <- choosePiece wantPieces | ||
241 | case mpix of -- TODO return 'n' blocks | ||
242 | Nothing -> return [] | ||
243 | Just pix -> return [leadingBlock pix defaultTransferSize] | ||
244 | else chooseBlocks wantBlocks n | ||
245 | |||
246 | forM_ bixs $ \ bix -> do | ||
247 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e | ||
248 | { pending = (addr, bix) : pending } | ||
249 | |||
250 | return bixs | ||
251 | where | ||
252 | -- TODO choose block nearest to pending or stalled sets to reduce disk | ||
253 | -- seeks on remote machines | ||
254 | --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] | ||
255 | chooseBlocks xs n = return (L.take n xs) | ||
256 | |||
257 | -- TODO use selection strategies from Exchange.Selector | ||
258 | --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) | ||
259 | choosePiece bf | ||
260 | | BF.null bf = return $ Nothing | ||
261 | | otherwise = return $ Just $ BF.findMin bf | ||
262 | |||
263 | getRequestQueueLength addr = do | ||
264 | m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) | ||
265 | return $ L.sum $ L.map L.length $ M.elems m | ||
266 | |||
267 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | ||
268 | where | ||
269 | reset = fmap $ \ e -> e | ||
270 | { pending = L.filter (not . (==) addr . fst) (pending e) } | ||
271 | |||
272 | pushBlock addr blk @ Block {..} = do | ||
273 | mpe <- gets (M.lookup blkPiece . inprogress) | ||
274 | case mpe of | ||
275 | Nothing -> return Nothing | ||
276 | Just (pe @ PieceEntry {..}) | ||
277 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing | ||
278 | | otherwise -> do | ||
279 | let bkt' = Block.insertLazy blkOffset blkData stalled | ||
280 | case toPiece bkt' of | ||
281 | Nothing -> do | ||
282 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e | ||
283 | { pending = L.filter ((==) (blockIx blk) . snd) pending | ||
284 | , stalled = bkt' | ||
285 | } | ||
286 | return (Just False) | ||
287 | |||
288 | Just pieceData -> do | ||
289 | -- TODO verify | ||
290 | storage <- gets contentStorage | ||
291 | liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage | ||
292 | modify $ \ s @ ContentDownload {..} -> s | ||
293 | { inprogress = M.delete blkPiece inprogress | ||
294 | , bitfield = BF.insert blkPiece bitfield | ||
295 | } | ||
296 | return (Just True) | ||
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs b/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs new file mode 100644 index 00000000..30a6a607 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Manager.hs | |||
@@ -0,0 +1,62 @@ | |||
1 | module Network.BitTorrent.Exchange.Manager | ||
2 | ( Options (..) | ||
3 | , Manager | ||
4 | , Handler | ||
5 | , newManager | ||
6 | , closeManager | ||
7 | ) where | ||
8 | |||
9 | import Control.Concurrent | ||
10 | import Control.Exception hiding (Handler) | ||
11 | import Control.Monad | ||
12 | import Data.Default | ||
13 | import Network.Socket | ||
14 | |||
15 | import Data.Torrent | ||
16 | import Network.Address | ||
17 | import Network.BitTorrent.Exchange.Connection hiding (Options) | ||
18 | import Network.BitTorrent.Exchange.Session | ||
19 | |||
20 | |||
21 | data Options = Options | ||
22 | { optBacklog :: Int | ||
23 | , optPeerAddr :: PeerAddr IP | ||
24 | } deriving (Show, Eq) | ||
25 | |||
26 | instance Default Options where | ||
27 | def = Options | ||
28 | { optBacklog = maxListenQueue | ||
29 | , optPeerAddr = def | ||
30 | } | ||
31 | |||
32 | data Manager = Manager | ||
33 | { listener :: !ThreadId | ||
34 | } | ||
35 | |||
36 | type Handler = InfoHash -> IO Session | ||
37 | |||
38 | handleNewConn :: Socket -> PeerAddr IP -> Handler -> IO () | ||
39 | handleNewConn sock addr handler = do | ||
40 | conn <- newPendingConnection sock addr | ||
41 | ses <- handler (pendingTopic conn) `onException` closePending conn | ||
42 | establish conn ses | ||
43 | |||
44 | listenIncoming :: Options -> Handler -> IO () | ||
45 | listenIncoming Options {..} handler = do | ||
46 | bracket (socket AF_INET Stream defaultProtocol) close $ \ sock -> do | ||
47 | bind sock (toSockAddr optPeerAddr) | ||
48 | listen sock optBacklog | ||
49 | forever $ do | ||
50 | (conn, sockAddr) <- accept sock | ||
51 | case fromSockAddr sockAddr of | ||
52 | Nothing -> return () | ||
53 | Just addr -> void $ forkIO $ handleNewConn sock addr handler | ||
54 | |||
55 | newManager :: Options -> Handler -> IO Manager | ||
56 | newManager opts handler = do | ||
57 | tid <- forkIO $ listenIncoming opts handler | ||
58 | return (Manager tid) | ||
59 | |||
60 | closeManager :: Manager -> IO () | ||
61 | closeManager Manager {..} = do | ||
62 | killThread listener \ No newline at end of file | ||
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Message.hs b/bittorrent/src/Network/BitTorrent/Exchange/Message.hs new file mode 100644 index 00000000..2c6770f7 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Message.hs | |||
@@ -0,0 +1,1232 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Normally peer to peer communication consisting of the following | ||
9 | -- steps: | ||
10 | -- | ||
11 | -- * In order to establish the connection between peers we should | ||
12 | -- send 'Handshake' message. The 'Handshake' is a required message | ||
13 | -- and must be the first message transmitted by the peer to the | ||
14 | -- another peer. Another peer should reply with a handshake as well. | ||
15 | -- | ||
16 | -- * Next peer might sent bitfield message, but might not. In the | ||
17 | -- former case we should update bitfield peer have. Again, if we | ||
18 | -- have some pieces we should send bitfield. Normally bitfield | ||
19 | -- message should sent after the handshake message. | ||
20 | -- | ||
21 | -- * Regular exchange messages. TODO docs | ||
22 | -- | ||
23 | -- For more high level API see "Network.BitTorrent.Exchange" module. | ||
24 | -- | ||
25 | -- For more infomation see: | ||
26 | -- <https://wiki.theory.org/BitTorrentSpecification#Peer_wire_protocol_.28TCP.29> | ||
27 | -- | ||
28 | {-# LANGUAGE ViewPatterns #-} | ||
29 | {-# LANGUAGE FlexibleInstances #-} | ||
30 | {-# LANGUAGE FlexibleContexts #-} | ||
31 | {-# LANGUAGE TypeFamilies #-} | ||
32 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
33 | {-# LANGUAGE DeriveDataTypeable #-} | ||
34 | {-# LANGUAGE TemplateHaskell #-} | ||
35 | {-# OPTIONS -fno-warn-orphans #-} | ||
36 | module Network.BitTorrent.Exchange.Message | ||
37 | ( -- * Capabilities | ||
38 | Capabilities (..) | ||
39 | , Extension (..) | ||
40 | , Caps | ||
41 | |||
42 | -- * Handshake | ||
43 | , ProtocolName | ||
44 | , Handshake(..) | ||
45 | , defaultHandshake | ||
46 | , handshakeSize | ||
47 | , handshakeMaxSize | ||
48 | , handshakeStats | ||
49 | |||
50 | -- * Stats | ||
51 | , ByteCount | ||
52 | , ByteStats (..) | ||
53 | , byteLength | ||
54 | |||
55 | -- * Messages | ||
56 | , Message (..) | ||
57 | , defaultKeepAliveTimeout | ||
58 | , defaultKeepAliveInterval | ||
59 | , PeerMessage (..) | ||
60 | |||
61 | -- ** Core messages | ||
62 | , StatusUpdate (..) | ||
63 | , Available (..) | ||
64 | , Transfer (..) | ||
65 | , defaultRequestQueueLength | ||
66 | |||
67 | -- ** Fast extension | ||
68 | , FastMessage (..) | ||
69 | |||
70 | -- ** Extension protocol | ||
71 | , ExtendedMessage (..) | ||
72 | |||
73 | -- *** Capabilities | ||
74 | , ExtendedExtension (..) | ||
75 | , ExtendedCaps (..) | ||
76 | |||
77 | -- *** Handshake | ||
78 | , ExtendedHandshake (..) | ||
79 | , defaultQueueLength | ||
80 | , nullExtendedHandshake | ||
81 | |||
82 | -- *** Metadata | ||
83 | , ExtendedMetadata (..) | ||
84 | , metadataPieceSize | ||
85 | , defaultMetadataFactor | ||
86 | , defaultMaxInfoDictSize | ||
87 | , isLastPiece | ||
88 | , isValidPiece | ||
89 | ) where | ||
90 | |||
91 | import Control.Applicative | ||
92 | import Control.Arrow ((&&&), (***)) | ||
93 | import Control.Monad (when) | ||
94 | import Data.Attoparsec.ByteString.Char8 as BS | ||
95 | import Data.BEncode as BE | ||
96 | import Data.BEncode.BDict as BE | ||
97 | import Data.BEncode.Internal as BE (ppBEncode, parser) | ||
98 | import Data.BEncode.Types (BDict) | ||
99 | import Data.Bits | ||
100 | import Data.ByteString as BS | ||
101 | import Data.ByteString.Char8 as BC | ||
102 | import Data.ByteString.Lazy as BL | ||
103 | import Data.Default | ||
104 | import Data.List as L | ||
105 | import Data.Map.Strict as M | ||
106 | import Data.Maybe | ||
107 | import Data.Monoid | ||
108 | import Data.Ord | ||
109 | import Data.Serialize as S | ||
110 | import Data.String | ||
111 | import Data.Text as T | ||
112 | import Data.Typeable | ||
113 | import Data.Word | ||
114 | import Data.IP | ||
115 | import Network | ||
116 | import Network.Socket hiding (KeepAlive) | ||
117 | import Text.PrettyPrint as PP hiding ((<>)) | ||
118 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
119 | |||
120 | import Data.Torrent hiding (Piece (..)) | ||
121 | import qualified Data.Torrent as P (Piece (..)) | ||
122 | import Network.Address | ||
123 | import Network.BitTorrent.Exchange.Bitfield | ||
124 | import Network.BitTorrent.Exchange.Block | ||
125 | |||
126 | {----------------------------------------------------------------------- | ||
127 | -- Capabilities | ||
128 | -----------------------------------------------------------------------} | ||
129 | |||
130 | -- | | ||
131 | class Capabilities caps where | ||
132 | type Ext caps :: * | ||
133 | |||
134 | -- | Pack extensions to caps. | ||
135 | toCaps :: [Ext caps] -> caps | ||
136 | |||
137 | -- | Unpack extensions from caps. | ||
138 | fromCaps :: caps -> [Ext caps] | ||
139 | |||
140 | -- | Check if an extension is a member of the specified set. | ||
141 | allowed :: Ext caps -> caps -> Bool | ||
142 | |||
143 | ppCaps :: Capabilities caps => Pretty (Ext caps) => caps -> Doc | ||
144 | ppCaps = hcat . punctuate ", " . L.map pPrint . fromCaps | ||
145 | |||
146 | {----------------------------------------------------------------------- | ||
147 | -- Extensions | ||
148 | -----------------------------------------------------------------------} | ||
149 | |||
150 | -- | Enumeration of message extension protocols. | ||
151 | -- | ||
152 | -- For more info see: <http://www.bittorrent.org/beps/bep_0004.html> | ||
153 | -- | ||
154 | data Extension | ||
155 | = ExtDHT -- ^ BEP 5: allow to send PORT messages. | ||
156 | | ExtFast -- ^ BEP 6: allow to send FAST messages. | ||
157 | | ExtExtended -- ^ BEP 10: allow to send the extension protocol messages. | ||
158 | deriving (Show, Eq, Ord, Enum, Bounded) | ||
159 | |||
160 | -- | Full extension names, suitable for logging. | ||
161 | instance Pretty Extension where | ||
162 | pPrint ExtDHT = "Distributed Hash Table Protocol" | ||
163 | pPrint ExtFast = "Fast Extension" | ||
164 | pPrint ExtExtended = "Extension Protocol" | ||
165 | |||
166 | -- | Extension bitmask as specified by BEP 4. | ||
167 | extMask :: Extension -> Word64 | ||
168 | extMask ExtDHT = 0x01 | ||
169 | extMask ExtFast = 0x04 | ||
170 | extMask ExtExtended = 0x100000 | ||
171 | |||
172 | {----------------------------------------------------------------------- | ||
173 | -- Capabilities | ||
174 | -----------------------------------------------------------------------} | ||
175 | |||
176 | -- | Capabilities is a set of 'Extension's usually sent in 'Handshake' | ||
177 | -- messages. | ||
178 | newtype Caps = Caps Word64 | ||
179 | deriving (Show, Eq) | ||
180 | |||
181 | -- | Render set of extensions as comma separated list. | ||
182 | instance Pretty Caps where | ||
183 | pPrint = ppCaps | ||
184 | {-# INLINE pPrint #-} | ||
185 | |||
186 | -- | The empty set. | ||
187 | instance Default Caps where | ||
188 | def = Caps 0 | ||
189 | {-# INLINE def #-} | ||
190 | |||
191 | -- | Monoid under intersection. 'mempty' includes all known extensions. | ||
192 | instance Monoid Caps where | ||
193 | mempty = toCaps [minBound .. maxBound] | ||
194 | {-# INLINE mempty #-} | ||
195 | |||
196 | mappend (Caps a) (Caps b) = Caps (a .&. b) | ||
197 | {-# INLINE mappend #-} | ||
198 | |||
199 | -- | 'Handshake' compatible encoding. | ||
200 | instance Serialize Caps where | ||
201 | put (Caps caps) = S.putWord64be caps | ||
202 | {-# INLINE put #-} | ||
203 | |||
204 | get = Caps <$> S.getWord64be | ||
205 | {-# INLINE get #-} | ||
206 | |||
207 | instance Capabilities Caps where | ||
208 | type Ext Caps = Extension | ||
209 | |||
210 | allowed e (Caps caps) = (extMask e .&. caps) /= 0 | ||
211 | {-# INLINE allowed #-} | ||
212 | |||
213 | toCaps = Caps . L.foldr (.|.) 0 . L.map extMask | ||
214 | fromCaps caps = L.filter (`allowed` caps) [minBound..maxBound] | ||
215 | |||
216 | {----------------------------------------------------------------------- | ||
217 | Handshake | ||
218 | -----------------------------------------------------------------------} | ||
219 | |||
220 | maxProtocolNameSize :: Word8 | ||
221 | maxProtocolNameSize = maxBound | ||
222 | |||
223 | -- | The protocol name is used to identify to the local peer which | ||
224 | -- version of BTP the remote peer uses. | ||
225 | newtype ProtocolName = ProtocolName BS.ByteString | ||
226 | deriving (Eq, Ord, Typeable) | ||
227 | |||
228 | -- | In BTP/1.0 the name is 'BitTorrent protocol'. If this string is | ||
229 | -- different from the local peers own protocol name, then the | ||
230 | -- connection is to be dropped. | ||
231 | instance Default ProtocolName where | ||
232 | def = ProtocolName "BitTorrent protocol" | ||
233 | |||
234 | instance Show ProtocolName where | ||
235 | show (ProtocolName bs) = show bs | ||
236 | |||
237 | instance Pretty ProtocolName where | ||
238 | pPrint (ProtocolName bs) = PP.text $ BC.unpack bs | ||
239 | |||
240 | instance IsString ProtocolName where | ||
241 | fromString str | ||
242 | | L.length str <= fromIntegral maxProtocolNameSize | ||
243 | = ProtocolName (fromString str) | ||
244 | | otherwise = error $ "fromString: ProtocolName too long: " ++ str | ||
245 | |||
246 | instance Serialize ProtocolName where | ||
247 | put (ProtocolName bs) = do | ||
248 | putWord8 $ fromIntegral $ BS.length bs | ||
249 | putByteString bs | ||
250 | |||
251 | get = do | ||
252 | len <- getWord8 | ||
253 | bs <- getByteString $ fromIntegral len | ||
254 | return (ProtocolName bs) | ||
255 | |||
256 | -- | Handshake message is used to exchange all information necessary | ||
257 | -- to establish connection between peers. | ||
258 | -- | ||
259 | data Handshake = Handshake { | ||
260 | -- | Identifier of the protocol. This is usually equal to 'def'. | ||
261 | hsProtocol :: ProtocolName | ||
262 | |||
263 | -- | Reserved bytes used to specify supported BEP's. | ||
264 | , hsReserved :: Caps | ||
265 | |||
266 | -- | Info hash of the info part of the metainfo file. that is | ||
267 | -- transmitted in tracker requests. Info hash of the initiator | ||
268 | -- handshake and response handshake should match, otherwise | ||
269 | -- initiator should break the connection. | ||
270 | -- | ||
271 | , hsInfoHash :: InfoHash | ||
272 | |||
273 | -- | Peer id of the initiator. This is usually the same peer id | ||
274 | -- that is transmitted in tracker requests. | ||
275 | -- | ||
276 | , hsPeerId :: PeerId | ||
277 | |||
278 | } deriving (Show, Eq) | ||
279 | |||
280 | instance Serialize Handshake where | ||
281 | put Handshake {..} = do | ||
282 | put hsProtocol | ||
283 | put hsReserved | ||
284 | put hsInfoHash | ||
285 | put hsPeerId | ||
286 | get = Handshake <$> get <*> get <*> get <*> get | ||
287 | |||
288 | -- | Show handshake protocol string, caps and fingerprint. | ||
289 | instance Pretty Handshake where | ||
290 | pPrint Handshake {..} | ||
291 | = pPrint hsProtocol $$ | ||
292 | pPrint hsReserved $$ | ||
293 | pPrint (fingerprint hsPeerId) | ||
294 | |||
295 | -- | Get handshake message size in bytes from the length of protocol | ||
296 | -- string. | ||
297 | handshakeSize :: Word8 -> Int | ||
298 | handshakeSize n = 1 + fromIntegral n + 8 + 20 + 20 | ||
299 | |||
300 | -- | Maximum size of handshake message in bytes. | ||
301 | handshakeMaxSize :: Int | ||
302 | handshakeMaxSize = handshakeSize maxProtocolNameSize | ||
303 | |||
304 | -- | Handshake with default protocol string and reserved bitmask. | ||
305 | defaultHandshake :: InfoHash -> PeerId -> Handshake | ||
306 | defaultHandshake = Handshake def def | ||
307 | |||
308 | handshakeStats :: Handshake -> ByteStats | ||
309 | handshakeStats (Handshake (ProtocolName bs) _ _ _) | ||
310 | = ByteStats 1 (BS.length bs + 8 + 20 + 20) 0 | ||
311 | |||
312 | {----------------------------------------------------------------------- | ||
313 | -- Stats | ||
314 | -----------------------------------------------------------------------} | ||
315 | |||
316 | -- | Number of bytes. | ||
317 | type ByteCount = Int | ||
318 | |||
319 | -- | Summary of encoded message byte layout can be used to collect | ||
320 | -- stats about message flow in both directions. This data can be | ||
321 | -- retrieved using 'stats' function. | ||
322 | data ByteStats = ByteStats | ||
323 | { -- | Number of bytes used to help encode 'control' and 'payload' | ||
324 | -- bytes: message size, message ID's, etc | ||
325 | overhead :: {-# UNPACK #-} !ByteCount | ||
326 | |||
327 | -- | Number of bytes used to exchange peers state\/options: piece | ||
328 | -- and block indexes, infohash, port numbers, peer ID\/IP, etc. | ||
329 | , control :: {-# UNPACK #-} !ByteCount | ||
330 | |||
331 | -- | Number of payload bytes: torrent data blocks and infodict | ||
332 | -- metadata. | ||
333 | , payload :: {-# UNPACK #-} !ByteCount | ||
334 | } deriving Show | ||
335 | |||
336 | instance Pretty ByteStats where | ||
337 | pPrint s @ ByteStats {..} = fsep | ||
338 | [ PP.int overhead, "overhead" | ||
339 | , PP.int control, "control" | ||
340 | , PP.int payload, "payload" | ||
341 | , "bytes" | ||
342 | ] $+$ fsep | ||
343 | [ PP.int (byteLength s), "total bytes" | ||
344 | ] | ||
345 | |||
346 | -- | Empty byte sequences. | ||
347 | instance Default ByteStats where | ||
348 | def = ByteStats 0 0 0 | ||
349 | |||
350 | -- | Monoid under addition. | ||
351 | instance Monoid ByteStats where | ||
352 | mempty = def | ||
353 | mappend a b = ByteStats | ||
354 | { overhead = overhead a + overhead b | ||
355 | , control = control a + control b | ||
356 | , payload = payload a + payload b | ||
357 | } | ||
358 | |||
359 | -- | Sum of the all byte sequences. | ||
360 | byteLength :: ByteStats -> Int | ||
361 | byteLength ByteStats {..} = overhead + control + payload | ||
362 | |||
363 | {----------------------------------------------------------------------- | ||
364 | -- Regular messages | ||
365 | -----------------------------------------------------------------------} | ||
366 | |||
367 | -- | Messages which can be sent after handshaking. Minimal complete | ||
368 | -- definition: 'envelop'. | ||
369 | class PeerMessage a where | ||
370 | -- | Construct a message to be /sent/. Note that if 'ExtendedCaps' | ||
371 | -- do not contain mapping for this message the default | ||
372 | -- 'ExtendedMessageId' is used. | ||
373 | envelop :: ExtendedCaps -- ^ The /receiver/ extended capabilities; | ||
374 | -> a -- ^ An regular message; | ||
375 | -> Message -- ^ Enveloped message to sent. | ||
376 | |||
377 | -- | Find out the extension this message belong to. Can be used to | ||
378 | -- check if this message is allowed to send\/recv in current | ||
379 | -- session. | ||
380 | requires :: a -> Maybe Extension | ||
381 | requires _ = Nothing | ||
382 | |||
383 | -- | Get sizes of overhead\/control\/payload byte sequences of | ||
384 | -- binary message representation without encoding message to binary | ||
385 | -- bytestring. | ||
386 | -- | ||
387 | -- This function should obey one law: | ||
388 | -- | ||
389 | -- * 'byteLength' ('stats' msg) == 'BL.length' ('encode' msg) | ||
390 | -- | ||
391 | stats :: a -> ByteStats | ||
392 | stats _ = ByteStats 4 0 0 | ||
393 | |||
394 | {----------------------------------------------------------------------- | ||
395 | -- Status messages | ||
396 | -----------------------------------------------------------------------} | ||
397 | |||
398 | -- | Notification that the sender have updated its | ||
399 | -- 'Network.BitTorrent.Exchange.Status.PeerStatus'. | ||
400 | data StatusUpdate | ||
401 | -- | Notification that the sender will not upload data to the | ||
402 | -- receiver until unchoking happen. | ||
403 | = Choking !Bool | ||
404 | |||
405 | -- | Notification that the sender is interested (or not interested) | ||
406 | -- in any of the receiver's data pieces. | ||
407 | | Interested !Bool | ||
408 | deriving (Show, Eq, Ord, Typeable) | ||
409 | |||
410 | instance Pretty StatusUpdate where | ||
411 | pPrint (Choking False) = "not choking" | ||
412 | pPrint (Choking True ) = "choking" | ||
413 | pPrint (Interested False) = "not interested" | ||
414 | pPrint (Interested True ) = "interested" | ||
415 | |||
416 | instance PeerMessage StatusUpdate where | ||
417 | envelop _ = Status | ||
418 | {-# INLINE envelop #-} | ||
419 | |||
420 | stats _ = ByteStats 4 1 0 | ||
421 | {-# INLINE stats #-} | ||
422 | |||
423 | {----------------------------------------------------------------------- | ||
424 | -- Available messages | ||
425 | -----------------------------------------------------------------------} | ||
426 | |||
427 | -- | Messages used to inform receiver which pieces of the torrent | ||
428 | -- sender have. | ||
429 | data Available = | ||
430 | -- | Zero-based index of a piece that has just been successfully | ||
431 | -- downloaded and verified via the hash. | ||
432 | Have ! PieceIx | ||
433 | |||
434 | -- | The bitfield message may only be sent immediately after the | ||
435 | -- handshaking sequence is complete, and before any other message | ||
436 | -- are sent. If client have no pieces then bitfield need not to be | ||
437 | -- sent. | ||
438 | | Bitfield !Bitfield | ||
439 | deriving (Show, Eq) | ||
440 | |||
441 | instance Pretty Available where | ||
442 | pPrint (Have ix ) = "Have" <+> int ix | ||
443 | pPrint (Bitfield _ ) = "Bitfield" | ||
444 | |||
445 | instance PeerMessage Available where | ||
446 | envelop _ = Available | ||
447 | {-# INLINE envelop #-} | ||
448 | |||
449 | stats (Have _) = ByteStats (4 + 1) 4 0 | ||
450 | stats (Bitfield bf) = ByteStats (4 + 1) (q + trailing) 0 | ||
451 | where | ||
452 | trailing = if r == 0 then 0 else 1 | ||
453 | (q, r) = quotRem (totalCount bf) 8 | ||
454 | |||
455 | {----------------------------------------------------------------------- | ||
456 | -- Transfer messages | ||
457 | -----------------------------------------------------------------------} | ||
458 | |||
459 | -- | Messages used to transfer 'Block's. | ||
460 | data Transfer | ||
461 | -- | Request for a particular block. If a client is requested a | ||
462 | -- block that another peer do not have the peer might not answer | ||
463 | -- at all. | ||
464 | = Request ! BlockIx | ||
465 | |||
466 | -- | Response to a request for a block. | ||
467 | | Piece !(Block BL.ByteString) | ||
468 | |||
469 | -- | Used to cancel block requests. It is typically used during | ||
470 | -- "End Game". | ||
471 | | Cancel !BlockIx | ||
472 | deriving (Show, Eq) | ||
473 | |||
474 | instance Pretty Transfer where | ||
475 | pPrint (Request ix ) = "Request" <+> pPrint ix | ||
476 | pPrint (Piece blk) = "Piece" <+> pPrint blk | ||
477 | pPrint (Cancel i ) = "Cancel" <+> pPrint i | ||
478 | |||
479 | instance PeerMessage Transfer where | ||
480 | envelop _ = Transfer | ||
481 | {-# INLINE envelop #-} | ||
482 | |||
483 | stats (Request _ ) = ByteStats (4 + 1) (3 * 4) 0 | ||
484 | stats (Piece p ) = ByteStats (4 + 1) (4 + 4 + blockSize p) 0 | ||
485 | stats (Cancel _ ) = ByteStats (4 + 1) (3 * 4) 0 | ||
486 | |||
487 | -- TODO increase | ||
488 | -- | Max number of pending 'Request's inflight. | ||
489 | defaultRequestQueueLength :: Int | ||
490 | defaultRequestQueueLength = 1 | ||
491 | |||
492 | {----------------------------------------------------------------------- | ||
493 | -- Fast messages | ||
494 | -----------------------------------------------------------------------} | ||
495 | |||
496 | -- | BEP6 messages. | ||
497 | data FastMessage = | ||
498 | -- | If a peer have all pieces it might send the 'HaveAll' message | ||
499 | -- instead of 'Bitfield' message. Used to save bandwidth. | ||
500 | HaveAll | ||
501 | |||
502 | -- | If a peer have no pieces it might send 'HaveNone' message | ||
503 | -- intead of 'Bitfield' message. Used to save bandwidth. | ||
504 | | HaveNone | ||
505 | |||
506 | -- | This is an advisory message meaning "you might like to | ||
507 | -- download this piece." Used to avoid excessive disk seeks and | ||
508 | -- amount of IO. | ||
509 | | SuggestPiece !PieceIx | ||
510 | |||
511 | -- | Notifies a requesting peer that its request will not be | ||
512 | -- satisfied. | ||
513 | | RejectRequest !BlockIx | ||
514 | |||
515 | -- | This is an advisory messsage meaning \"if you ask for this | ||
516 | -- piece, I'll give it to you even if you're choked.\" Used to | ||
517 | -- shorten starting phase. | ||
518 | | AllowedFast !PieceIx | ||
519 | deriving (Show, Eq) | ||
520 | |||
521 | instance Pretty FastMessage where | ||
522 | pPrint (HaveAll ) = "Have all" | ||
523 | pPrint (HaveNone ) = "Have none" | ||
524 | pPrint (SuggestPiece pix) = "Suggest" <+> int pix | ||
525 | pPrint (RejectRequest bix) = "Reject" <+> pPrint bix | ||
526 | pPrint (AllowedFast pix) = "Allowed fast" <+> int pix | ||
527 | |||
528 | instance PeerMessage FastMessage where | ||
529 | envelop _ = Fast | ||
530 | {-# INLINE envelop #-} | ||
531 | |||
532 | requires _ = Just ExtFast | ||
533 | {-# INLINE requires #-} | ||
534 | |||
535 | stats HaveAll = ByteStats 4 1 0 | ||
536 | stats HaveNone = ByteStats 4 1 0 | ||
537 | stats (SuggestPiece _) = ByteStats 5 4 0 | ||
538 | stats (RejectRequest _) = ByteStats 5 12 0 | ||
539 | stats (AllowedFast _) = ByteStats 5 4 0 | ||
540 | |||
541 | {----------------------------------------------------------------------- | ||
542 | -- Extension protocol | ||
543 | -----------------------------------------------------------------------} | ||
544 | |||
545 | {----------------------------------------------------------------------- | ||
546 | -- Extended capabilities | ||
547 | -----------------------------------------------------------------------} | ||
548 | |||
549 | data ExtendedExtension | ||
550 | = ExtMetadata -- ^ BEP 9: Extension for Peers to Send Metadata Files | ||
551 | deriving (Show, Eq, Ord, Enum, Bounded, Typeable) | ||
552 | |||
553 | instance IsString ExtendedExtension where | ||
554 | fromString = fromMaybe (error msg) . fromKey . fromString | ||
555 | where | ||
556 | msg = "fromString: could not parse ExtendedExtension" | ||
557 | |||
558 | instance Pretty ExtendedExtension where | ||
559 | pPrint ExtMetadata = "Extension for Peers to Send Metadata Files" | ||
560 | |||
561 | fromKey :: BKey -> Maybe ExtendedExtension | ||
562 | fromKey "ut_metadata" = Just ExtMetadata | ||
563 | fromKey _ = Nothing | ||
564 | {-# INLINE fromKey #-} | ||
565 | |||
566 | toKey :: ExtendedExtension -> BKey | ||
567 | toKey ExtMetadata = "ut_metadata" | ||
568 | {-# INLINE toKey #-} | ||
569 | |||
570 | type ExtendedMessageId = Word8 | ||
571 | |||
572 | extId :: ExtendedExtension -> ExtendedMessageId | ||
573 | extId ExtMetadata = 1 | ||
574 | {-# INLINE extId #-} | ||
575 | |||
576 | type ExtendedMap = Map ExtendedExtension ExtendedMessageId | ||
577 | |||
578 | -- | The extension IDs must be stored for every peer, because every | ||
579 | -- peer may have different IDs for the same extension. | ||
580 | -- | ||
581 | newtype ExtendedCaps = ExtendedCaps { extendedCaps :: ExtendedMap } | ||
582 | deriving (Show, Eq) | ||
583 | |||
584 | instance Pretty ExtendedCaps where | ||
585 | pPrint = ppCaps | ||
586 | {-# INLINE pPrint #-} | ||
587 | |||
588 | -- | The empty set. | ||
589 | instance Default ExtendedCaps where | ||
590 | def = ExtendedCaps M.empty | ||
591 | |||
592 | -- | Monoid under intersection: | ||
593 | -- | ||
594 | -- * The 'mempty' caps includes all known extensions; | ||
595 | -- | ||
596 | -- * the 'mappend' operation is NOT commutative: it return message | ||
597 | -- id from the first caps for the extensions existing in both caps. | ||
598 | -- | ||
599 | instance Monoid ExtendedCaps where | ||
600 | mempty = toCaps [minBound..maxBound] | ||
601 | mappend (ExtendedCaps a) (ExtendedCaps b) = | ||
602 | ExtendedCaps (M.intersection a b) | ||
603 | |||
604 | appendBDict :: BDict -> ExtendedMap -> ExtendedMap | ||
605 | appendBDict (Cons key val xs) caps | ||
606 | | Just ext <- fromKey key | ||
607 | , Right eid <- fromBEncode val = M.insert ext eid (appendBDict xs caps) | ||
608 | | otherwise = appendBDict xs caps | ||
609 | appendBDict Nil caps = caps | ||
610 | |||
611 | -- | Handshake compatible encoding. | ||
612 | instance BEncode ExtendedCaps where | ||
613 | toBEncode = BDict . BE.fromAscList . L.sortBy (comparing fst) | ||
614 | . L.map (toKey *** toBEncode) . M.toList . extendedCaps | ||
615 | |||
616 | fromBEncode (BDict bd) = pure $ ExtendedCaps $ appendBDict bd M.empty | ||
617 | fromBEncode _ = decodingError "ExtendedCaps" | ||
618 | |||
619 | instance Capabilities ExtendedCaps where | ||
620 | type Ext ExtendedCaps = ExtendedExtension | ||
621 | |||
622 | toCaps = ExtendedCaps . M.fromList . L.map (id &&& extId) | ||
623 | |||
624 | fromCaps = M.keys . extendedCaps | ||
625 | {-# INLINE fromCaps #-} | ||
626 | |||
627 | allowed e (ExtendedCaps caps) = M.member e caps | ||
628 | {-# INLINE allowed #-} | ||
629 | |||
630 | remoteMessageId :: ExtendedExtension -> ExtendedCaps -> ExtendedMessageId | ||
631 | remoteMessageId ext = fromMaybe (extId ext) . M.lookup ext . extendedCaps | ||
632 | |||
633 | {----------------------------------------------------------------------- | ||
634 | -- Extended handshake | ||
635 | -----------------------------------------------------------------------} | ||
636 | |||
637 | -- | This message should be sent immediately after the standard | ||
638 | -- bittorrent handshake to any peer that supports this extension | ||
639 | -- protocol. Extended handshakes can be sent more than once, however | ||
640 | -- an implementation may choose to ignore subsequent handshake | ||
641 | -- messages. | ||
642 | -- | ||
643 | data ExtendedHandshake = ExtendedHandshake | ||
644 | { -- | If this peer has an IPv4 interface, this is the compact | ||
645 | -- representation of that address. | ||
646 | ehsIPv4 :: Maybe HostAddress | ||
647 | |||
648 | -- | If this peer has an IPv6 interface, this is the compact | ||
649 | -- representation of that address. | ||
650 | , ehsIPv6 :: Maybe HostAddress6 | ||
651 | |||
652 | -- | Dictionary of supported extension messages which maps names | ||
653 | -- of extensions to an extended message ID for each extension | ||
654 | -- message. | ||
655 | , ehsCaps :: ExtendedCaps | ||
656 | |||
657 | -- | Size of 'Data.Torrent.InfoDict' in bytes. This field should | ||
658 | -- be added if 'ExtMetadata' is enabled in current session /and/ | ||
659 | -- peer have the torrent file. | ||
660 | , ehsMetadataSize :: Maybe Int | ||
661 | |||
662 | -- | Local TCP /listen/ port. Allows each side to learn about the | ||
663 | -- TCP port number of the other side. | ||
664 | , ehsPort :: Maybe PortNumber | ||
665 | |||
666 | -- | Request queue the number of outstanding 'Request' messages | ||
667 | -- this client supports without dropping any. | ||
668 | , ehsQueueLength :: Maybe Int | ||
669 | |||
670 | -- | Client name and version. | ||
671 | , ehsVersion :: Maybe Text | ||
672 | |||
673 | -- | IP of the remote end | ||
674 | , ehsYourIp :: Maybe IP | ||
675 | } deriving (Show, Eq, Typeable) | ||
676 | |||
677 | extHandshakeId :: ExtendedMessageId | ||
678 | extHandshakeId = 0 | ||
679 | |||
680 | -- | Default 'Request' queue size. | ||
681 | defaultQueueLength :: Int | ||
682 | defaultQueueLength = 1 | ||
683 | |||
684 | -- | All fields are empty. | ||
685 | instance Default ExtendedHandshake where | ||
686 | def = ExtendedHandshake def def def def def def def def | ||
687 | |||
688 | instance Monoid ExtendedHandshake where | ||
689 | mempty = def { ehsCaps = mempty } | ||
690 | mappend old new = ExtendedHandshake { | ||
691 | ehsCaps = ehsCaps old <> ehsCaps new, | ||
692 | ehsIPv4 = ehsIPv4 old `mergeOld` ehsIPv4 new, | ||
693 | ehsIPv6 = ehsIPv6 old `mergeOld` ehsIPv6 new, | ||
694 | ehsMetadataSize = ehsMetadataSize old `mergeNew` ehsMetadataSize new, | ||
695 | ehsPort = ehsPort old `mergeOld` ehsPort new, | ||
696 | ehsQueueLength = ehsQueueLength old `mergeNew` ehsQueueLength new, | ||
697 | ehsVersion = ehsVersion old `mergeOld` ehsVersion new, | ||
698 | ehsYourIp = ehsYourIp old `mergeOld` ehsYourIp new | ||
699 | } | ||
700 | where | ||
701 | mergeOld mold mnew = mold <|> mnew | ||
702 | mergeNew mold mnew = mnew <|> mold | ||
703 | |||
704 | |||
705 | instance BEncode ExtendedHandshake where | ||
706 | toBEncode ExtendedHandshake {..} = toDict $ | ||
707 | "ipv4" .=? (S.encode <$> ehsIPv4) | ||
708 | .: "ipv6" .=? (S.encode <$> ehsIPv6) | ||
709 | .: "m" .=! ehsCaps | ||
710 | .: "metadata_size" .=? ehsMetadataSize | ||
711 | .: "p" .=? ehsPort | ||
712 | .: "reqq" .=? ehsQueueLength | ||
713 | .: "v" .=? ehsVersion | ||
714 | .: "yourip" .=? (runPut <$> either put put <$> toEither <$> ehsYourIp) | ||
715 | .: endDict | ||
716 | where | ||
717 | toEither (IPv4 v4) = Left v4 | ||
718 | toEither (IPv6 v6) = Right v6 | ||
719 | |||
720 | fromBEncode = fromDict $ ExtendedHandshake | ||
721 | <$>? "ipv4" | ||
722 | <*>? "ipv6" | ||
723 | <*>! "m" | ||
724 | <*>? "metadata_size" | ||
725 | <*>? "p" | ||
726 | <*>? "reqq" | ||
727 | <*>? "v" | ||
728 | <*> (opt "yourip" >>= getYourIp) | ||
729 | |||
730 | getYourIp :: Maybe BValue -> BE.Get (Maybe IP) | ||
731 | getYourIp f = | ||
732 | return $ do | ||
733 | BString ip <- f | ||
734 | either (const Nothing) Just $ | ||
735 | case BS.length ip of | ||
736 | 4 -> IPv4 <$> S.decode ip | ||
737 | 16 -> IPv6 <$> S.decode ip | ||
738 | _ -> fail "" | ||
739 | |||
740 | instance Pretty ExtendedHandshake where | ||
741 | pPrint = PP.text . show | ||
742 | |||
743 | -- | NOTE: Approximated 'stats'. | ||
744 | instance PeerMessage ExtendedHandshake where | ||
745 | envelop c = envelop c . EHandshake | ||
746 | {-# INLINE envelop #-} | ||
747 | |||
748 | requires _ = Just ExtExtended | ||
749 | {-# INLINE requires #-} | ||
750 | |||
751 | stats _ = ByteStats (4 + 1 + 1) 100 {- is it ok? -} 0 -- FIXME | ||
752 | {-# INLINE stats #-} | ||
753 | |||
754 | -- | Set default values and the specified 'ExtendedCaps'. | ||
755 | nullExtendedHandshake :: ExtendedCaps -> ExtendedHandshake | ||
756 | nullExtendedHandshake caps = ExtendedHandshake | ||
757 | { ehsIPv4 = Nothing | ||
758 | , ehsIPv6 = Nothing | ||
759 | , ehsCaps = caps | ||
760 | , ehsMetadataSize = Nothing | ||
761 | , ehsPort = Nothing | ||
762 | , ehsQueueLength = Just defaultQueueLength | ||
763 | , ehsVersion = Just $ T.pack $ render $ pPrint libFingerprint | ||
764 | , ehsYourIp = Nothing | ||
765 | } | ||
766 | |||
767 | {----------------------------------------------------------------------- | ||
768 | -- Metadata exchange extension | ||
769 | -----------------------------------------------------------------------} | ||
770 | |||
771 | -- | A peer MUST verify that any piece it sends passes the info-hash | ||
772 | -- verification. i.e. until the peer has the entire metadata, it | ||
773 | -- cannot run SHA-1 to verify that it yields the same hash as the | ||
774 | -- info-hash. | ||
775 | -- | ||
776 | data ExtendedMetadata | ||
777 | -- | This message requests the a specified metadata piece. The | ||
778 | -- response to this message, from a peer supporting the extension, | ||
779 | -- is either a 'MetadataReject' or a 'MetadataData' message. | ||
780 | = MetadataRequest PieceIx | ||
781 | |||
782 | -- | If sender requested a valid 'PieceIx' and receiver have the | ||
783 | -- corresponding piece then receiver should respond with this | ||
784 | -- message. | ||
785 | | MetadataData | ||
786 | { -- | A piece of 'Data.Torrent.InfoDict'. | ||
787 | piece :: P.Piece BS.ByteString | ||
788 | |||
789 | -- | This key has the same semantics as the 'ehsMetadataSize' in | ||
790 | -- the 'ExtendedHandshake' — it is size of the torrent info | ||
791 | -- dict. | ||
792 | , totalSize :: Int | ||
793 | } | ||
794 | |||
795 | -- | Peers that do not have the entire metadata MUST respond with | ||
796 | -- a reject message to any metadata request. | ||
797 | -- | ||
798 | -- Clients MAY implement flood protection by rejecting request | ||
799 | -- messages after a certain number of them have been | ||
800 | -- served. Typically the number of pieces of metadata times a | ||
801 | -- factor. | ||
802 | | MetadataReject PieceIx | ||
803 | |||
804 | -- | Reserved. By specification we should ignore unknown metadata | ||
805 | -- messages. | ||
806 | | MetadataUnknown BValue | ||
807 | deriving (Show, Eq, Typeable) | ||
808 | |||
809 | -- | Extended metadata message id used in 'msg_type_key'. | ||
810 | type MetadataId = Int | ||
811 | |||
812 | msg_type_key, piece_key, total_size_key :: BKey | ||
813 | msg_type_key = "msg_type" | ||
814 | piece_key = "piece" | ||
815 | total_size_key = "total_size" | ||
816 | |||
817 | -- | BEP9 compatible encoding. | ||
818 | instance BEncode ExtendedMetadata where | ||
819 | toBEncode (MetadataRequest pix) = toDict $ | ||
820 | msg_type_key .=! (0 :: MetadataId) | ||
821 | .: piece_key .=! pix | ||
822 | .: endDict | ||
823 | toBEncode (MetadataData (P.Piece pix _) totalSize) = toDict $ | ||
824 | msg_type_key .=! (1 :: MetadataId) | ||
825 | .: piece_key .=! pix | ||
826 | .: total_size_key .=! totalSize | ||
827 | .: endDict | ||
828 | toBEncode (MetadataReject pix) = toDict $ | ||
829 | msg_type_key .=! (2 :: MetadataId) | ||
830 | .: piece_key .=! pix | ||
831 | .: endDict | ||
832 | toBEncode (MetadataUnknown bval) = bval | ||
833 | |||
834 | fromBEncode bval = (`fromDict` bval) $ do | ||
835 | mid <- field $ req msg_type_key | ||
836 | case mid :: MetadataId of | ||
837 | 0 -> MetadataRequest <$>! piece_key | ||
838 | 1 -> metadataData <$>! piece_key <*>! total_size_key | ||
839 | 2 -> MetadataReject <$>! piece_key | ||
840 | _ -> pure (MetadataUnknown bval) | ||
841 | where | ||
842 | metadataData pix s = MetadataData (P.Piece pix BS.empty) s | ||
843 | |||
844 | -- | Piece data bytes are omitted. | ||
845 | instance Pretty ExtendedMetadata where | ||
846 | pPrint (MetadataRequest pix ) = "Request" <+> PP.int pix | ||
847 | pPrint (MetadataData p t) = "Data" <+> pPrint p <+> PP.int t | ||
848 | pPrint (MetadataReject pix ) = "Reject" <+> PP.int pix | ||
849 | pPrint (MetadataUnknown bval ) = "Unknown" <+> ppBEncode bval | ||
850 | |||
851 | -- | NOTE: Approximated 'stats'. | ||
852 | instance PeerMessage ExtendedMetadata where | ||
853 | envelop c = envelop c . EMetadata (remoteMessageId ExtMetadata c) | ||
854 | {-# INLINE envelop #-} | ||
855 | |||
856 | requires _ = Just ExtExtended | ||
857 | {-# INLINE requires #-} | ||
858 | |||
859 | stats (MetadataRequest _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0 | ||
860 | stats (MetadataData p _) = ByteStats (4 + 1 + 1) {- ~ -} 41 $ | ||
861 | BS.length (P.pieceData p) | ||
862 | stats (MetadataReject _) = ByteStats (4 + 1 + 1) {- ~ -} 25 0 | ||
863 | stats (MetadataUnknown _) = ByteStats (4 + 1 + 1) {- ? -} 0 0 | ||
864 | |||
865 | -- | All 'Piece's in 'MetadataData' messages MUST have size equal to | ||
866 | -- this value. The last trailing piece can be shorter. | ||
867 | metadataPieceSize :: PieceSize | ||
868 | metadataPieceSize = 16 * 1024 | ||
869 | |||
870 | isLastPiece :: P.Piece a -> Int -> Bool | ||
871 | isLastPiece P.Piece {..} total = succ pieceIndex == pcnt | ||
872 | where | ||
873 | pcnt = q + if r > 0 then 1 else 0 | ||
874 | (q, r) = quotRem total metadataPieceSize | ||
875 | |||
876 | -- TODO we can check if the piece payload bytestring have appropriate | ||
877 | -- length; otherwise serialization MUST fail. | ||
878 | isValidPiece :: P.Piece BL.ByteString -> Int -> Bool | ||
879 | isValidPiece p @ P.Piece {..} total | ||
880 | | isLastPiece p total = pieceSize p <= metadataPieceSize | ||
881 | | otherwise = pieceSize p == metadataPieceSize | ||
882 | |||
883 | setMetadataPayload :: BS.ByteString -> ExtendedMetadata -> ExtendedMetadata | ||
884 | setMetadataPayload bs (MetadataData (P.Piece pix _) t) = | ||
885 | MetadataData (P.Piece pix bs) t | ||
886 | setMetadataPayload _ msg = msg | ||
887 | |||
888 | getMetadataPayload :: ExtendedMetadata -> Maybe BS.ByteString | ||
889 | getMetadataPayload (MetadataData (P.Piece _ bs) _) = Just bs | ||
890 | getMetadataPayload _ = Nothing | ||
891 | |||
892 | -- | Metadata BDict usually contain only 'msg_type_key', 'piece_key' | ||
893 | -- and 'total_size_key' fields so it normally should take less than | ||
894 | -- 100 bytes. This limit is two order of magnitude larger to be | ||
895 | -- permissive to 'MetadataUnknown' messages. | ||
896 | -- | ||
897 | -- See 'maxMessageSize' for further explanation. | ||
898 | -- | ||
899 | maxMetadataBDictSize :: Int | ||
900 | maxMetadataBDictSize = 16 * 1024 | ||
901 | |||
902 | maxMetadataSize :: Int | ||
903 | maxMetadataSize = maxMetadataBDictSize + metadataPieceSize | ||
904 | |||
905 | -- to make MetadataData constructor fields a little bit prettier we | ||
906 | -- cheat here: first we read empty 'pieceData' from bdict, but then we | ||
907 | -- fill that field with the actual piece data — trailing bytes of | ||
908 | -- the message | ||
909 | getMetadata :: Int -> S.Get ExtendedMetadata | ||
910 | getMetadata len | ||
911 | | len > maxMetadataSize = fail $ parseError "size exceeded limit" | ||
912 | | otherwise = do | ||
913 | bs <- getByteString len | ||
914 | parseRes $ BS.parse BE.parser bs | ||
915 | where | ||
916 | parseError reason = "unable to parse metadata message: " ++ reason | ||
917 | |||
918 | parseRes (BS.Fail _ _ m) = fail $ parseError $ "bdict: " ++ m | ||
919 | parseRes (BS.Partial _) = fail $ parseError "bdict: not enough bytes" | ||
920 | parseRes (BS.Done piece bvalueBS) | ||
921 | | BS.length piece > metadataPieceSize | ||
922 | = fail "infodict piece: size exceeded limit" | ||
923 | | otherwise = do | ||
924 | metadata <- either (fail . parseError) pure $ fromBEncode bvalueBS | ||
925 | return $ setMetadataPayload piece metadata | ||
926 | |||
927 | putMetadata :: ExtendedMetadata -> BL.ByteString | ||
928 | putMetadata msg | ||
929 | | Just bs <- getMetadataPayload msg = BE.encode msg <> BL.fromStrict bs | ||
930 | | otherwise = BE.encode msg | ||
931 | |||
932 | -- | Allows a requesting peer to send 2 'MetadataRequest's for the | ||
933 | -- each piece. | ||
934 | -- | ||
935 | -- See 'Network.BitTorrent.Wire.Options.metadataFactor' for | ||
936 | -- explanation why do we need this limit. | ||
937 | defaultMetadataFactor :: Int | ||
938 | defaultMetadataFactor = 2 | ||
939 | |||
940 | -- | Usually torrent size do not exceed 1MB. This value limit torrent | ||
941 | -- /content/ size to about 8TB. | ||
942 | -- | ||
943 | -- See 'Network.BitTorrent.Wire.Options.maxInfoDictSize' for | ||
944 | -- explanation why do we need this limit. | ||
945 | defaultMaxInfoDictSize :: Int | ||
946 | defaultMaxInfoDictSize = 10 * 1024 * 1024 | ||
947 | |||
948 | {----------------------------------------------------------------------- | ||
949 | -- Extension protocol messages | ||
950 | -----------------------------------------------------------------------} | ||
951 | |||
952 | -- | For more info see <http://www.bittorrent.org/beps/bep_0010.html> | ||
953 | data ExtendedMessage | ||
954 | = EHandshake ExtendedHandshake | ||
955 | | EMetadata ExtendedMessageId ExtendedMetadata | ||
956 | | EUnknown ExtendedMessageId BS.ByteString | ||
957 | deriving (Show, Eq, Typeable) | ||
958 | |||
959 | instance Pretty ExtendedMessage where | ||
960 | pPrint (EHandshake ehs) = pPrint ehs | ||
961 | pPrint (EMetadata _ msg) = "Metadata" <+> pPrint msg | ||
962 | pPrint (EUnknown mid _ ) = "Unknown" <+> PP.text (show mid) | ||
963 | |||
964 | instance PeerMessage ExtendedMessage where | ||
965 | envelop _ = Extended | ||
966 | {-# INLINE envelop #-} | ||
967 | |||
968 | requires _ = Just ExtExtended | ||
969 | {-# INLINE requires #-} | ||
970 | |||
971 | stats (EHandshake hs) = stats hs | ||
972 | stats (EMetadata _ msg) = stats msg | ||
973 | stats (EUnknown _ msg) = ByteStats (4 + 1 + 1) (BS.length msg) 0 | ||
974 | |||
975 | {----------------------------------------------------------------------- | ||
976 | -- The message datatype | ||
977 | -----------------------------------------------------------------------} | ||
978 | |||
979 | type MessageId = Word8 | ||
980 | |||
981 | -- | Messages used in communication between peers. | ||
982 | -- | ||
983 | -- Note: If some extensions are disabled (not present in extension | ||
984 | -- mask) and client receive message used by the disabled | ||
985 | -- extension then the client MUST close the connection. | ||
986 | -- | ||
987 | data Message | ||
988 | -- | Peers may close the TCP connection if they have not received | ||
989 | -- any messages for a given period of time, generally 2 | ||
990 | -- minutes. Thus, the KeepAlive message is sent to keep the | ||
991 | -- connection between two peers alive, if no /other/ message has | ||
992 | -- been sent in a given period of time. | ||
993 | = KeepAlive | ||
994 | | Status !StatusUpdate -- ^ Messages used to update peer status. | ||
995 | | Available !Available -- ^ Messages used to inform availability. | ||
996 | | Transfer !Transfer -- ^ Messages used to transfer 'Block's. | ||
997 | |||
998 | -- | Peer receiving a handshake indicating the remote peer | ||
999 | -- supports the 'ExtDHT' should send a 'Port' message. Peers that | ||
1000 | -- receive this message should attempt to ping the node on the | ||
1001 | -- received port and IP address of the remote peer. | ||
1002 | | Port !PortNumber | ||
1003 | | Fast !FastMessage | ||
1004 | | Extended !ExtendedMessage | ||
1005 | deriving (Show, Eq) | ||
1006 | |||
1007 | instance Default Message where | ||
1008 | def = KeepAlive | ||
1009 | {-# INLINE def #-} | ||
1010 | |||
1011 | -- | Payload bytes are omitted. | ||
1012 | instance Pretty Message where | ||
1013 | pPrint (KeepAlive ) = "Keep alive" | ||
1014 | pPrint (Status m) = "Status" <+> pPrint m | ||
1015 | pPrint (Available m) = pPrint m | ||
1016 | pPrint (Transfer m) = pPrint m | ||
1017 | pPrint (Port p) = "Port" <+> int (fromEnum p) | ||
1018 | pPrint (Fast m) = pPrint m | ||
1019 | pPrint (Extended m) = pPrint m | ||
1020 | |||
1021 | instance PeerMessage Message where | ||
1022 | envelop _ = id | ||
1023 | {-# INLINE envelop #-} | ||
1024 | |||
1025 | requires KeepAlive = Nothing | ||
1026 | requires (Status _) = Nothing | ||
1027 | requires (Available _) = Nothing | ||
1028 | requires (Transfer _) = Nothing | ||
1029 | requires (Port _) = Just ExtDHT | ||
1030 | requires (Fast _) = Just ExtFast | ||
1031 | requires (Extended _) = Just ExtExtended | ||
1032 | |||
1033 | stats KeepAlive = ByteStats 4 0 0 | ||
1034 | stats (Status m) = stats m | ||
1035 | stats (Available m) = stats m | ||
1036 | stats (Transfer m) = stats m | ||
1037 | stats (Port _) = ByteStats 5 2 0 | ||
1038 | stats (Fast m) = stats m | ||
1039 | stats (Extended m) = stats m | ||
1040 | |||
1041 | -- | PORT message. | ||
1042 | instance PeerMessage PortNumber where | ||
1043 | envelop _ = Port | ||
1044 | {-# INLINE envelop #-} | ||
1045 | |||
1046 | requires _ = Just ExtDHT | ||
1047 | {-# INLINE requires #-} | ||
1048 | |||
1049 | -- | How long /this/ peer should wait before dropping connection, in | ||
1050 | -- seconds. | ||
1051 | defaultKeepAliveTimeout :: Int | ||
1052 | defaultKeepAliveTimeout = 2 * 60 | ||
1053 | |||
1054 | -- | How often /this/ peer should send 'KeepAlive' messages, in | ||
1055 | -- seconds. | ||
1056 | defaultKeepAliveInterval :: Int | ||
1057 | defaultKeepAliveInterval = 60 | ||
1058 | |||
1059 | getInt :: S.Get Int | ||
1060 | getInt = fromIntegral <$> S.getWord32be | ||
1061 | {-# INLINE getInt #-} | ||
1062 | |||
1063 | putInt :: S.Putter Int | ||
1064 | putInt = S.putWord32be . fromIntegral | ||
1065 | {-# INLINE putInt #-} | ||
1066 | |||
1067 | -- | This limit should protect against "out-of-memory" attacks: if a | ||
1068 | -- malicious peer have sent a long varlength message then receiver can | ||
1069 | -- accumulate too long bytestring in the 'Get'. | ||
1070 | -- | ||
1071 | -- Normal messages should never exceed this limits. | ||
1072 | -- | ||
1073 | -- See also 'maxBitfieldSize', 'maxBlockSize' limits. | ||
1074 | -- | ||
1075 | maxMessageSize :: Int | ||
1076 | maxMessageSize = 20 + 1024 * 1024 | ||
1077 | |||
1078 | -- | This also limit max torrent size to: | ||
1079 | -- | ||
1080 | -- max_bitfield_size * piece_ix_per_byte * max_piece_size = | ||
1081 | -- 2 ^ 20 * 8 * 1MB = | ||
1082 | -- 8TB | ||
1083 | -- | ||
1084 | maxBitfieldSize :: Int | ||
1085 | maxBitfieldSize = 1024 * 1024 | ||
1086 | |||
1087 | getBitfield :: Int -> S.Get Bitfield | ||
1088 | getBitfield len | ||
1089 | | len > maxBitfieldSize = fail "BITFIELD message size exceeded limit" | ||
1090 | | otherwise = fromBitmap <$> getByteString len | ||
1091 | |||
1092 | maxBlockSize :: Int | ||
1093 | maxBlockSize = 4 * defaultTransferSize | ||
1094 | |||
1095 | getBlock :: Int -> S.Get (Block BL.ByteString) | ||
1096 | getBlock len | ||
1097 | | len > maxBlockSize = fail "BLOCK message size exceeded limit" | ||
1098 | | otherwise = Block <$> getInt <*> getInt | ||
1099 | <*> getLazyByteString (fromIntegral len) | ||
1100 | {-# INLINE getBlock #-} | ||
1101 | |||
1102 | instance Serialize Message where | ||
1103 | get = do | ||
1104 | len <- getInt | ||
1105 | |||
1106 | when (len > maxMessageSize) $ do | ||
1107 | fail "message body size exceeded the limit" | ||
1108 | |||
1109 | if len == 0 then return KeepAlive | ||
1110 | else do | ||
1111 | mid <- S.getWord8 | ||
1112 | case mid of | ||
1113 | 0x00 -> return $ Status (Choking True) | ||
1114 | 0x01 -> return $ Status (Choking False) | ||
1115 | 0x02 -> return $ Status (Interested True) | ||
1116 | 0x03 -> return $ Status (Interested False) | ||
1117 | 0x04 -> (Available . Have) <$> getInt | ||
1118 | 0x05 -> (Available . Bitfield) <$> getBitfield (pred len) | ||
1119 | 0x06 -> (Transfer . Request) <$> S.get | ||
1120 | 0x07 -> (Transfer . Piece) <$> getBlock (len - 9) | ||
1121 | 0x08 -> (Transfer . Cancel) <$> S.get | ||
1122 | 0x09 -> Port <$> S.get | ||
1123 | 0x0D -> (Fast . SuggestPiece) <$> getInt | ||
1124 | 0x0E -> return $ Fast HaveAll | ||
1125 | 0x0F -> return $ Fast HaveNone | ||
1126 | 0x10 -> (Fast . RejectRequest) <$> S.get | ||
1127 | 0x11 -> (Fast . AllowedFast) <$> getInt | ||
1128 | 0x14 -> Extended <$> getExtendedMessage (pred len) | ||
1129 | _ -> do | ||
1130 | rm <- S.remaining >>= S.getBytes | ||
1131 | fail $ "unknown message ID: " ++ show mid ++ "\n" | ||
1132 | ++ "remaining available bytes: " ++ show rm | ||
1133 | |||
1134 | put KeepAlive = putInt 0 | ||
1135 | put (Status msg) = putStatus msg | ||
1136 | put (Available msg) = putAvailable msg | ||
1137 | put (Transfer msg) = putTransfer msg | ||
1138 | put (Port p ) = putPort p | ||
1139 | put (Fast msg) = putFast msg | ||
1140 | put (Extended m ) = putExtendedMessage m | ||
1141 | |||
1142 | statusUpdateId :: StatusUpdate -> MessageId | ||
1143 | statusUpdateId (Choking choking) = fromIntegral (0 + fromEnum choking) | ||
1144 | statusUpdateId (Interested choking) = fromIntegral (2 + fromEnum choking) | ||
1145 | |||
1146 | putStatus :: Putter StatusUpdate | ||
1147 | putStatus su = do | ||
1148 | putInt 1 | ||
1149 | putWord8 (statusUpdateId su) | ||
1150 | |||
1151 | putAvailable :: Putter Available | ||
1152 | putAvailable (Have i) = do | ||
1153 | putInt 5 | ||
1154 | putWord8 0x04 | ||
1155 | putInt i | ||
1156 | putAvailable (Bitfield (toBitmap -> bs)) = do | ||
1157 | putInt $ 1 + fromIntegral (BL.length bs) | ||
1158 | putWord8 0x05 | ||
1159 | putLazyByteString bs | ||
1160 | |||
1161 | putBlock :: Putter (Block BL.ByteString) | ||
1162 | putBlock Block {..} = do | ||
1163 | putInt blkPiece | ||
1164 | putInt blkOffset | ||
1165 | putLazyByteString blkData | ||
1166 | |||
1167 | putTransfer :: Putter Transfer | ||
1168 | putTransfer (Request blk) = putInt 13 >> S.putWord8 0x06 >> S.put blk | ||
1169 | putTransfer (Piece blk) = do | ||
1170 | putInt (9 + blockSize blk) | ||
1171 | putWord8 0x07 | ||
1172 | putBlock blk | ||
1173 | putTransfer (Cancel blk) = putInt 13 >> S.putWord8 0x08 >> S.put blk | ||
1174 | |||
1175 | putPort :: Putter PortNumber | ||
1176 | putPort p = do | ||
1177 | putInt 3 | ||
1178 | putWord8 0x09 | ||
1179 | put p | ||
1180 | |||
1181 | putFast :: Putter FastMessage | ||
1182 | putFast HaveAll = putInt 1 >> putWord8 0x0E | ||
1183 | putFast HaveNone = putInt 1 >> putWord8 0x0F | ||
1184 | putFast (SuggestPiece pix) = putInt 5 >> putWord8 0x0D >> putInt pix | ||
1185 | putFast (RejectRequest i ) = putInt 13 >> putWord8 0x10 >> put i | ||
1186 | putFast (AllowedFast i ) = putInt 5 >> putWord8 0x11 >> putInt i | ||
1187 | |||
1188 | maxEHandshakeSize :: Int | ||
1189 | maxEHandshakeSize = 16 * 1024 | ||
1190 | |||
1191 | getExtendedHandshake :: Int -> S.Get ExtendedHandshake | ||
1192 | getExtendedHandshake messageSize | ||
1193 | | messageSize > maxEHandshakeSize | ||
1194 | = fail "extended handshake size exceeded limit" | ||
1195 | | otherwise = do | ||
1196 | bs <- getByteString messageSize | ||
1197 | either fail pure $ BE.decode bs | ||
1198 | |||
1199 | maxEUnknownSize :: Int | ||
1200 | maxEUnknownSize = 64 * 1024 | ||
1201 | |||
1202 | getExtendedUnknown :: Int -> S.Get BS.ByteString | ||
1203 | getExtendedUnknown len | ||
1204 | | len > maxEUnknownSize = fail "unknown extended message size exceeded limit" | ||
1205 | | otherwise = getByteString len | ||
1206 | |||
1207 | getExtendedMessage :: Int -> S.Get ExtendedMessage | ||
1208 | getExtendedMessage messageSize = do | ||
1209 | msgId <- getWord8 | ||
1210 | let msgBodySize = messageSize - 1 | ||
1211 | case msgId of | ||
1212 | 0 -> EHandshake <$> getExtendedHandshake msgBodySize | ||
1213 | 1 -> EMetadata msgId <$> getMetadata msgBodySize | ||
1214 | _ -> EUnknown msgId <$> getExtendedUnknown msgBodySize | ||
1215 | |||
1216 | -- | By spec. | ||
1217 | extendedMessageId :: MessageId | ||
1218 | extendedMessageId = 20 | ||
1219 | |||
1220 | putExt :: ExtendedMessageId -> BL.ByteString -> Put | ||
1221 | putExt mid lbs = do | ||
1222 | putWord32be $ fromIntegral (1 + 1 + BL.length lbs) | ||
1223 | putWord8 extendedMessageId | ||
1224 | putWord8 mid | ||
1225 | putLazyByteString lbs | ||
1226 | |||
1227 | -- NOTE: in contrast to getExtendedMessage this function put length | ||
1228 | -- and message id too! | ||
1229 | putExtendedMessage :: Putter ExtendedMessage | ||
1230 | putExtendedMessage (EHandshake hs) = putExt extHandshakeId $ BE.encode hs | ||
1231 | putExtendedMessage (EMetadata mid msg) = putExt mid $ putMetadata msg | ||
1232 | putExtendedMessage (EUnknown mid bs) = putExt mid $ BL.fromStrict bs | ||
diff --git a/bittorrent/src/Network/BitTorrent/Exchange/Session.hs b/bittorrent/src/Network/BitTorrent/Exchange/Session.hs new file mode 100644 index 00000000..38a3c3a6 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -0,0 +1,586 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE DeriveDataTypeable #-} | ||
3 | {-# LANGUAGE FlexibleInstances #-} | ||
4 | {-# LANGUAGE StandaloneDeriving #-} | ||
5 | {-# LANGUAGE TemplateHaskell #-} | ||
6 | {-# LANGUAGE TypeFamilies #-} | ||
7 | module Network.BitTorrent.Exchange.Session | ||
8 | ( -- * Session | ||
9 | Session | ||
10 | , Event (..) | ||
11 | , LogFun | ||
12 | , sessionLogger | ||
13 | |||
14 | -- * Construction | ||
15 | , newSession | ||
16 | , closeSession | ||
17 | , withSession | ||
18 | |||
19 | -- * Connection Set | ||
20 | , connect | ||
21 | , connectSink | ||
22 | , establish | ||
23 | |||
24 | -- * Query | ||
25 | , waitMetadata | ||
26 | , takeMetadata | ||
27 | ) where | ||
28 | |||
29 | import Control.Applicative | ||
30 | import Control.Concurrent | ||
31 | import Control.Concurrent.Chan.Split as CS | ||
32 | import Control.Concurrent.STM | ||
33 | import Control.Exception hiding (Handler) | ||
34 | import Control.Lens | ||
35 | import Control.Monad as M | ||
36 | import Control.Monad.Logger | ||
37 | import Control.Monad.Reader | ||
38 | import Data.ByteString as BS | ||
39 | import Data.ByteString.Lazy as BL | ||
40 | import Data.Conduit as C (Sink, awaitForever, (=$=), ($=)) | ||
41 | import qualified Data.Conduit as C | ||
42 | import Data.Conduit.List as C | ||
43 | import Data.Map as M | ||
44 | import Data.Monoid | ||
45 | import Data.Set as S | ||
46 | import Data.Text as T | ||
47 | import Data.Typeable | ||
48 | import Text.PrettyPrint hiding ((<>)) | ||
49 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
50 | import System.Log.FastLogger (LogStr, ToLogStr (..)) | ||
51 | |||
52 | import Data.BEncode as BE | ||
53 | import Data.Torrent as Torrent | ||
54 | import Network.BitTorrent.Internal.Types | ||
55 | import Network.Address | ||
56 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
57 | import Network.BitTorrent.Exchange.Block as Block | ||
58 | import Network.BitTorrent.Exchange.Connection | ||
59 | import Network.BitTorrent.Exchange.Download as D | ||
60 | import Network.BitTorrent.Exchange.Message as Message | ||
61 | import System.Torrent.Storage | ||
62 | |||
63 | #if !MIN_VERSION_iproute(1,2,12) | ||
64 | deriving instance Ord IP | ||
65 | #endif | ||
66 | |||
67 | {----------------------------------------------------------------------- | ||
68 | -- Exceptions | ||
69 | -----------------------------------------------------------------------} | ||
70 | |||
71 | data ExchangeError | ||
72 | = InvalidRequest BlockIx StorageFailure | ||
73 | | CorruptedPiece PieceIx | ||
74 | deriving (Show, Typeable) | ||
75 | |||
76 | instance Exception ExchangeError | ||
77 | |||
78 | packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a | ||
79 | packException f m = try m >>= either (throwIO . f) return | ||
80 | |||
81 | {----------------------------------------------------------------------- | ||
82 | -- Session state | ||
83 | -----------------------------------------------------------------------} | ||
84 | -- TODO unmap storage on zero connections | ||
85 | |||
86 | data Cached a = Cached | ||
87 | { cachedValue :: !a | ||
88 | , cachedData :: BL.ByteString -- keep lazy | ||
89 | } | ||
90 | |||
91 | cache :: BEncode a => a -> Cached a | ||
92 | cache s = Cached s (BE.encode s) | ||
93 | |||
94 | -- | Logger function. | ||
95 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
96 | |||
97 | --data SessionStatus = Seeder | Leecher | ||
98 | |||
99 | data SessionState | ||
100 | = WaitingMetadata | ||
101 | { metadataDownload :: MVar MetadataDownload | ||
102 | , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters | ||
103 | , contentRootPath :: FilePath | ||
104 | } | ||
105 | | HavingMetadata | ||
106 | { metadataCache :: Cached InfoDict | ||
107 | , contentDownload :: MVar ContentDownload | ||
108 | , contentStorage :: Storage | ||
109 | } | ||
110 | |||
111 | newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState | ||
112 | newSessionState rootPath (Left ih ) = do | ||
113 | WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath | ||
114 | newSessionState rootPath (Right dict) = do | ||
115 | storage <- openInfoDict ReadWriteEx rootPath dict | ||
116 | download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) | ||
117 | (piPieceLength (idPieceInfo dict)) | ||
118 | storage | ||
119 | return $ HavingMetadata (cache dict) download storage | ||
120 | |||
121 | closeSessionState :: SessionState -> IO () | ||
122 | closeSessionState WaitingMetadata {..} = return () | ||
123 | closeSessionState HavingMetadata {..} = close contentStorage | ||
124 | |||
125 | haveMetadata :: InfoDict -> SessionState -> IO SessionState | ||
126 | haveMetadata dict WaitingMetadata {..} = do | ||
127 | storage <- openInfoDict ReadWriteEx contentRootPath dict | ||
128 | download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) | ||
129 | (piPieceLength (idPieceInfo dict)) | ||
130 | storage | ||
131 | return HavingMetadata | ||
132 | { metadataCache = cache dict | ||
133 | , contentDownload = download | ||
134 | , contentStorage = storage | ||
135 | } | ||
136 | haveMetadata _ s = return s | ||
137 | |||
138 | {----------------------------------------------------------------------- | ||
139 | -- Session | ||
140 | -----------------------------------------------------------------------} | ||
141 | |||
142 | data Session = Session | ||
143 | { sessionPeerId :: !(PeerId) | ||
144 | , sessionTopic :: !(InfoHash) | ||
145 | , sessionLogger :: !(LogFun) | ||
146 | , sessionEvents :: !(SendPort (Event Session)) | ||
147 | |||
148 | , sessionState :: !(MVar SessionState) | ||
149 | |||
150 | ------------------------------------------------------------------------ | ||
151 | , connectionsPrefs :: !ConnectionPrefs | ||
152 | |||
153 | -- | Connections either waiting for TCP/uTP 'connect' or waiting | ||
154 | -- for BT handshake. | ||
155 | , connectionsPending :: !(TVar (Set (PeerAddr IP))) | ||
156 | |||
157 | -- | Connections successfully handshaked and data transfer can | ||
158 | -- take place. | ||
159 | , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) | ||
160 | |||
161 | -- | TODO implement choking mechanism | ||
162 | , connectionsUnchoked :: [PeerAddr IP] | ||
163 | |||
164 | -- | Messages written to this channel will be sent to the all | ||
165 | -- connections, including pending connections (but right after | ||
166 | -- handshake). | ||
167 | , connectionsBroadcast :: !(Chan Message) | ||
168 | } | ||
169 | |||
170 | instance EventSource Session where | ||
171 | data Event Session | ||
172 | = ConnectingTo (PeerAddr IP) | ||
173 | | ConnectionEstablished (PeerAddr IP) | ||
174 | | ConnectionAborted | ||
175 | | ConnectionClosed (PeerAddr IP) | ||
176 | | SessionClosed | ||
177 | deriving Show | ||
178 | |||
179 | listen Session {..} = CS.listen sessionEvents | ||
180 | |||
181 | newSession :: LogFun | ||
182 | -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | ||
183 | -> FilePath -- ^ root directory for content files; | ||
184 | -> Either InfoHash InfoDict -- ^ torrent info dictionary; | ||
185 | -> IO Session | ||
186 | newSession logFun addr rootPath source = do | ||
187 | let ih = either id idInfoHash source | ||
188 | pid <- maybe genPeerId return (peerId addr) | ||
189 | eventStream <- newSendPort | ||
190 | sState <- newSessionState rootPath source | ||
191 | sStateVar <- newMVar sState | ||
192 | pSetVar <- newTVarIO S.empty | ||
193 | eSetVar <- newTVarIO M.empty | ||
194 | chan <- newChan | ||
195 | return Session | ||
196 | { sessionPeerId = pid | ||
197 | , sessionTopic = ih | ||
198 | , sessionLogger = logFun | ||
199 | , sessionEvents = eventStream | ||
200 | , sessionState = sStateVar | ||
201 | , connectionsPrefs = def | ||
202 | , connectionsPending = pSetVar | ||
203 | , connectionsEstablished = eSetVar | ||
204 | , connectionsUnchoked = [] | ||
205 | , connectionsBroadcast = chan | ||
206 | } | ||
207 | |||
208 | closeSession :: Session -> IO () | ||
209 | closeSession Session {..} = do | ||
210 | s <- readMVar sessionState | ||
211 | closeSessionState s | ||
212 | {- | ||
213 | hSet <- atomically $ do | ||
214 | pSet <- swapTVar connectionsPending S.empty | ||
215 | eSet <- swapTVar connectionsEstablished S.empty | ||
216 | return pSet | ||
217 | mapM_ kill hSet | ||
218 | -} | ||
219 | |||
220 | withSession :: () | ||
221 | withSession = error "withSession" | ||
222 | |||
223 | {----------------------------------------------------------------------- | ||
224 | -- Logging | ||
225 | -----------------------------------------------------------------------} | ||
226 | |||
227 | instance MonadLogger (Connected Session) where | ||
228 | monadLoggerLog loc src lvl msg = do | ||
229 | conn <- ask | ||
230 | ses <- asks connSession | ||
231 | addr <- asks connRemoteAddr | ||
232 | let addrSrc = src <> " @ " <> T.pack (render (pPrint addr)) | ||
233 | liftIO $ sessionLogger ses loc addrSrc lvl (toLogStr msg) | ||
234 | |||
235 | logMessage :: MonadLogger m => Message -> m () | ||
236 | logMessage msg = logDebugN $ T.pack (render (pPrint msg)) | ||
237 | |||
238 | logEvent :: MonadLogger m => Text -> m () | ||
239 | logEvent = logInfoN | ||
240 | |||
241 | {----------------------------------------------------------------------- | ||
242 | -- Connection set | ||
243 | -----------------------------------------------------------------------} | ||
244 | --- Connection status transition: | ||
245 | --- | ||
246 | --- pending -> established -> finished -> closed | ||
247 | --- | \|/ /|\ | ||
248 | --- \-------------------------------------| | ||
249 | --- | ||
250 | --- Purpose of slots: | ||
251 | --- 1) to avoid duplicates | ||
252 | --- 2) connect concurrently | ||
253 | --- | ||
254 | |||
255 | -- | Add connection to the pending set. | ||
256 | pendingConnection :: PeerAddr IP -> Session -> STM Bool | ||
257 | pendingConnection addr Session {..} = do | ||
258 | pSet <- readTVar connectionsPending | ||
259 | eSet <- readTVar connectionsEstablished | ||
260 | if (addr `S.member` pSet) || (addr `M.member` eSet) | ||
261 | then return False | ||
262 | else do | ||
263 | modifyTVar' connectionsPending (S.insert addr) | ||
264 | return True | ||
265 | |||
266 | -- | Pending connection successfully established, add it to the | ||
267 | -- established set. | ||
268 | establishedConnection :: Connected Session () | ||
269 | establishedConnection = do | ||
270 | conn <- ask | ||
271 | addr <- asks connRemoteAddr | ||
272 | Session {..} <- asks connSession | ||
273 | liftIO $ atomically $ do | ||
274 | modifyTVar connectionsPending (S.delete addr) | ||
275 | modifyTVar connectionsEstablished (M.insert addr conn) | ||
276 | |||
277 | -- | Either this or remote peer decided to finish conversation | ||
278 | -- (conversation is alread /established/ connection), remote it from | ||
279 | -- the established set. | ||
280 | finishedConnection :: Connected Session () | ||
281 | finishedConnection = do | ||
282 | Session {..} <- asks connSession | ||
283 | addr <- asks connRemoteAddr | ||
284 | liftIO $ atomically $ do | ||
285 | modifyTVar connectionsEstablished $ M.delete addr | ||
286 | |||
287 | -- | There are no state for this connection, remove it from the all | ||
288 | -- sets. | ||
289 | closedConnection :: PeerAddr IP -> Session -> STM () | ||
290 | closedConnection addr Session {..} = do | ||
291 | modifyTVar connectionsPending $ S.delete addr | ||
292 | modifyTVar connectionsEstablished $ M.delete addr | ||
293 | |||
294 | getConnectionConfig :: Session -> IO (ConnectionConfig Session) | ||
295 | getConnectionConfig s @ Session {..} = do | ||
296 | chan <- dupChan connectionsBroadcast | ||
297 | let sessionLink = SessionLink { | ||
298 | linkTopic = sessionTopic | ||
299 | , linkPeerId = sessionPeerId | ||
300 | , linkMetadataSize = Nothing | ||
301 | , linkOutputChan = Just chan | ||
302 | , linkSession = s | ||
303 | } | ||
304 | return ConnectionConfig | ||
305 | { cfgPrefs = connectionsPrefs | ||
306 | , cfgSession = sessionLink | ||
307 | , cfgWire = mainWire | ||
308 | } | ||
309 | |||
310 | type Finalizer = IO () | ||
311 | type Runner = (ConnectionConfig Session -> IO ()) | ||
312 | |||
313 | runConnection :: Runner -> Finalizer -> PeerAddr IP -> Session -> IO () | ||
314 | runConnection runner finalize addr set @ Session {..} = do | ||
315 | _ <- forkIO (action `finally` cleanup) | ||
316 | return () | ||
317 | where | ||
318 | action = do | ||
319 | notExist <- atomically $ pendingConnection addr set | ||
320 | when notExist $ do | ||
321 | cfg <- getConnectionConfig set | ||
322 | runner cfg | ||
323 | |||
324 | cleanup = do | ||
325 | finalize | ||
326 | -- runStatusUpdates status (SS.resetPending addr) | ||
327 | -- TODO Metata.resetPending addr | ||
328 | atomically $ closedConnection addr set | ||
329 | |||
330 | -- | Establish connection from scratch. If this endpoint is already | ||
331 | -- connected, no new connections is created. This function do not block. | ||
332 | connect :: PeerAddr IP -> Session -> IO () | ||
333 | connect addr = runConnection (connectWire addr) (return ()) addr | ||
334 | |||
335 | -- | Establish connection with already pre-connected endpoint. If this | ||
336 | -- endpoint is already connected, no new connections is created. This | ||
337 | -- function do not block. | ||
338 | -- | ||
339 | -- 'PendingConnection' will be closed automatically, you do not need | ||
340 | -- to call 'closePending'. | ||
341 | establish :: PendingConnection -> Session -> IO () | ||
342 | establish conn = runConnection (acceptWire conn) (closePending conn) | ||
343 | (pendingPeer conn) | ||
344 | |||
345 | -- | Conduit version of 'connect'. | ||
346 | connectSink :: MonadIO m => Session -> Sink [PeerAddr IPv4] m () | ||
347 | connectSink s = C.mapM_ (liftIO . connectBatch) | ||
348 | where | ||
349 | connectBatch = M.mapM_ (\ addr -> connect (IPv4 <$> addr) s) | ||
350 | |||
351 | -- | Why do we need this message? | ||
352 | type BroadcastMessage = ExtendedCaps -> Message | ||
353 | |||
354 | broadcast :: BroadcastMessage -> Session -> IO () | ||
355 | broadcast = error "broadcast" | ||
356 | |||
357 | {----------------------------------------------------------------------- | ||
358 | -- Helpers | ||
359 | -----------------------------------------------------------------------} | ||
360 | |||
361 | waitMVar :: MVar a -> IO () | ||
362 | waitMVar m = withMVar m (const (return ())) | ||
363 | |||
364 | -- This function appear in new GHC "out of box". (moreover it is atomic) | ||
365 | tryReadMVar :: MVar a -> IO (Maybe a) | ||
366 | tryReadMVar m = do | ||
367 | ma <- tryTakeMVar m | ||
368 | maybe (return ()) (putMVar m) ma | ||
369 | return ma | ||
370 | |||
371 | readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) | ||
372 | readBlock bix @ BlockIx {..} s = do | ||
373 | p <- packException (InvalidRequest bix) $ do readPiece ixPiece s | ||
374 | let chunk = BL.take (fromIntegral ixLength) $ | ||
375 | BL.drop (fromIntegral ixOffset) (pieceData p) | ||
376 | if BL.length chunk == fromIntegral ixLength | ||
377 | then return $ Block ixPiece ixOffset chunk | ||
378 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | ||
379 | |||
380 | -- | | ||
381 | tryReadMetadataBlock :: PieceIx | ||
382 | -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) | ||
383 | tryReadMetadataBlock pix = do | ||
384 | Session {..} <- asks connSession | ||
385 | s <- liftIO (readMVar sessionState) | ||
386 | case s of | ||
387 | WaitingMetadata {..} -> error "tryReadMetadataBlock" | ||
388 | HavingMetadata {..} -> error "tryReadMetadataBlock" | ||
389 | |||
390 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () | ||
391 | sendBroadcast msg = do | ||
392 | Session {..} <- asks connSession | ||
393 | error "sendBroadcast" | ||
394 | -- liftIO $ msg `broadcast` sessionConnections | ||
395 | |||
396 | waitMetadata :: Session -> IO InfoDict | ||
397 | waitMetadata Session {..} = do | ||
398 | s <- readMVar sessionState | ||
399 | case s of | ||
400 | WaitingMetadata {..} -> readMVar metadataCompleted | ||
401 | HavingMetadata {..} -> return (cachedValue metadataCache) | ||
402 | |||
403 | takeMetadata :: Session -> IO (Maybe InfoDict) | ||
404 | takeMetadata Session {..} = do | ||
405 | s <- readMVar sessionState | ||
406 | case s of | ||
407 | WaitingMetadata {..} -> return Nothing | ||
408 | HavingMetadata {..} -> return (Just (cachedValue metadataCache)) | ||
409 | |||
410 | {----------------------------------------------------------------------- | ||
411 | -- Triggers | ||
412 | -----------------------------------------------------------------------} | ||
413 | |||
414 | -- | Trigger is the reaction of a handler at some event. | ||
415 | type Trigger = Wire Session () | ||
416 | |||
417 | interesting :: Trigger | ||
418 | interesting = do | ||
419 | addr <- asks connRemoteAddr | ||
420 | sendMessage (Interested True) | ||
421 | sendMessage (Choking False) | ||
422 | tryFillRequestQueue | ||
423 | |||
424 | fillRequestQueue :: Trigger | ||
425 | fillRequestQueue = do | ||
426 | maxN <- lift getMaxQueueLength | ||
427 | rbf <- use connBitfield | ||
428 | addr <- asks connRemoteAddr | ||
429 | -- blks <- withStatusUpdates $ do | ||
430 | -- n <- getRequestQueueLength addr | ||
431 | -- scheduleBlocks addr rbf (maxN - n) | ||
432 | -- mapM_ (sendMessage . Request) blks | ||
433 | return () | ||
434 | |||
435 | tryFillRequestQueue :: Trigger | ||
436 | tryFillRequestQueue = do | ||
437 | allowed <- canDownload <$> use connStatus | ||
438 | when allowed $ do | ||
439 | fillRequestQueue | ||
440 | |||
441 | {----------------------------------------------------------------------- | ||
442 | -- Incoming message handling | ||
443 | -----------------------------------------------------------------------} | ||
444 | |||
445 | type Handler msg = msg -> Wire Session () | ||
446 | |||
447 | handleStatus :: Handler StatusUpdate | ||
448 | handleStatus s = do | ||
449 | connStatus %= over remoteStatus (updateStatus s) | ||
450 | case s of | ||
451 | Interested _ -> return () | ||
452 | Choking True -> do | ||
453 | addr <- asks connRemoteAddr | ||
454 | -- withStatusUpdates (SS.resetPending addr) | ||
455 | return () | ||
456 | Choking False -> tryFillRequestQueue | ||
457 | |||
458 | handleAvailable :: Handler Available | ||
459 | handleAvailable msg = do | ||
460 | connBitfield %= case msg of | ||
461 | Have ix -> BF.insert ix | ||
462 | Bitfield bf -> const bf | ||
463 | |||
464 | --thisBf <- getThisBitfield | ||
465 | thisBf <- undefined | ||
466 | case msg of | ||
467 | Have ix | ||
468 | | ix `BF.member` thisBf -> return () | ||
469 | | otherwise -> interesting | ||
470 | Bitfield bf | ||
471 | | bf `BF.isSubsetOf` thisBf -> return () | ||
472 | | otherwise -> interesting | ||
473 | |||
474 | handleTransfer :: Handler Transfer | ||
475 | handleTransfer (Request bix) = do | ||
476 | Session {..} <- asks connSession | ||
477 | s <- liftIO $ readMVar sessionState | ||
478 | case s of | ||
479 | WaitingMetadata {..} -> return () | ||
480 | HavingMetadata {..} -> do | ||
481 | bitfield <- undefined -- getThisBitfield | ||
482 | upload <- canUpload <$> use connStatus | ||
483 | when (upload && ixPiece bix `BF.member` bitfield) $ do | ||
484 | blk <- liftIO $ readBlock bix contentStorage | ||
485 | sendMessage (Message.Piece blk) | ||
486 | |||
487 | handleTransfer (Message.Piece blk) = do | ||
488 | Session {..} <- asks connSession | ||
489 | s <- liftIO $ readMVar sessionState | ||
490 | case s of | ||
491 | WaitingMetadata {..} -> return () -- TODO (?) break connection | ||
492 | HavingMetadata {..} -> do | ||
493 | isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage) | ||
494 | case isSuccess of | ||
495 | Nothing -> liftIO $ throwIO $ userError "block is not requested" | ||
496 | Just isCompleted -> do | ||
497 | when isCompleted $ do | ||
498 | sendBroadcast (Have (blkPiece blk)) | ||
499 | -- maybe send not interested | ||
500 | tryFillRequestQueue | ||
501 | |||
502 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) | ||
503 | where | ||
504 | transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix | ||
505 | transferResponse _ _ = False | ||
506 | |||
507 | {----------------------------------------------------------------------- | ||
508 | -- Metadata exchange | ||
509 | -----------------------------------------------------------------------} | ||
510 | -- TODO introduce new metadata exchange specific exceptions | ||
511 | |||
512 | waitForMetadata :: Trigger | ||
513 | waitForMetadata = do | ||
514 | Session {..} <- asks connSession | ||
515 | needFetch <- undefined --liftIO (isEmptyMVar infodict) | ||
516 | when needFetch $ do | ||
517 | canFetch <- allowed ExtMetadata <$> use connExtCaps | ||
518 | if canFetch | ||
519 | then tryRequestMetadataBlock | ||
520 | else undefined -- liftIO (waitMVar infodict) | ||
521 | |||
522 | tryRequestMetadataBlock :: Trigger | ||
523 | tryRequestMetadataBlock = do | ||
524 | mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock | ||
525 | case mpix of | ||
526 | Nothing -> error "tryRequestMetadataBlock" | ||
527 | Just pix -> sendMessage (MetadataRequest pix) | ||
528 | |||
529 | handleMetadata :: Handler ExtendedMetadata | ||
530 | handleMetadata (MetadataRequest pix) = | ||
531 | lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse | ||
532 | where | ||
533 | mkResponse Nothing = MetadataReject pix | ||
534 | mkResponse (Just (piece, total)) = MetadataData piece total | ||
535 | |||
536 | handleMetadata (MetadataData {..}) = do | ||
537 | ih <- asks connTopic | ||
538 | mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih) | ||
539 | case mdict of | ||
540 | Nothing -> tryRequestMetadataBlock -- not completed, need all blocks | ||
541 | Just dict -> do -- complete, wake up payload fetch | ||
542 | Session {..} <- asks connSession | ||
543 | liftIO $ modifyMVar_ sessionState (haveMetadata dict) | ||
544 | |||
545 | handleMetadata (MetadataReject pix) = do | ||
546 | lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix) | ||
547 | |||
548 | handleMetadata (MetadataUnknown _ ) = do | ||
549 | logInfoN "Unknown metadata message" | ||
550 | |||
551 | {----------------------------------------------------------------------- | ||
552 | -- Main entry point | ||
553 | -----------------------------------------------------------------------} | ||
554 | |||
555 | acceptRehandshake :: ExtendedHandshake -> Trigger | ||
556 | acceptRehandshake ehs = error "acceptRehandshake" | ||
557 | |||
558 | handleExtended :: Handler ExtendedMessage | ||
559 | handleExtended (EHandshake ehs) = acceptRehandshake ehs | ||
560 | handleExtended (EMetadata _ msg) = handleMetadata msg | ||
561 | handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" | ||
562 | |||
563 | handleMessage :: Handler Message | ||
564 | handleMessage KeepAlive = return () | ||
565 | handleMessage (Status s) = handleStatus s | ||
566 | handleMessage (Available msg) = handleAvailable msg | ||
567 | handleMessage (Transfer msg) = handleTransfer msg | ||
568 | handleMessage (Port n) = error "handleMessage" | ||
569 | handleMessage (Fast _) = error "handleMessage" | ||
570 | handleMessage (Extended msg) = handleExtended msg | ||
571 | |||
572 | exchange :: Wire Session () | ||
573 | exchange = do | ||
574 | waitForMetadata | ||
575 | bf <- undefined --getThisBitfield | ||
576 | sendMessage (Bitfield bf) | ||
577 | awaitForever handleMessage | ||
578 | |||
579 | mainWire :: Wire Session () | ||
580 | mainWire = do | ||
581 | lift establishedConnection | ||
582 | Session {..} <- asks connSession | ||
583 | -- lift $ resizeBitfield (totalPieces storage) | ||
584 | logEvent "Connection established" | ||
585 | iterM logMessage =$= exchange =$= iterM logMessage | ||
586 | lift finishedConnection | ||