diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 428 |
1 files changed, 1 insertions, 427 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index c1377449..934c646d 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -1,16 +1,3 @@ | |||
1 | {- TODO turn awaitEvent and yieldEvent to sourcePeer and sinkPeer | ||
2 | |||
3 | sourceSocket sock $= | ||
4 | conduitGet S.get $= | ||
5 | sourcePeer $= | ||
6 | p2p $= | ||
7 | sinkPeer $= | ||
8 | conduitPut S.put $$ | ||
9 | sinkSocket sock | ||
10 | |||
11 | measure performance | ||
12 | -} | ||
13 | |||
14 | -- | | 1 | -- | |
15 | -- Copyright : (c) Sam Truzjan 2013 | 2 | -- Copyright : (c) Sam Truzjan 2013 |
16 | -- License : BSD3 | 3 | -- License : BSD3 |
@@ -18,419 +5,6 @@ | |||
18 | -- Stability : experimental | 5 | -- Stability : experimental |
19 | -- Portability : portable | 6 | -- Portability : portable |
20 | -- | 7 | -- |
21 | -- This module provides P2P communication and aims to hide the | ||
22 | -- following stuff under the hood: | ||
23 | -- | ||
24 | -- * TODO; | ||
25 | -- | ||
26 | -- * /keep alives/ -- ; | ||
27 | -- | ||
28 | -- * /choking mechanism/ -- is used ; | ||
29 | -- | ||
30 | -- * /message broadcasting/ -- ; | ||
31 | -- | ||
32 | -- * /message filtering/ -- due to network latency and concurrency | ||
33 | -- some arriving messages might not make sense in the current | ||
34 | -- session context; | ||
35 | -- | ||
36 | -- * /scatter\/gather pieces/ -- ; | ||
37 | -- | ||
38 | -- * /various P2P protocol extensions/ -- . | ||
39 | -- | ||
40 | -- Finally we get a simple event-based communication model. | ||
41 | -- | ||
42 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
43 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
44 | {-# LANGUAGE BangPatterns #-} | ||
45 | module Network.BitTorrent.Exchange | 8 | module Network.BitTorrent.Exchange |
46 | ( P2P | 9 | ( |
47 | , runP2P | ||
48 | |||
49 | -- * Query | ||
50 | , getHaveCount | ||
51 | , getWantCount | ||
52 | , getPieceCount | ||
53 | , peerOffer | ||
54 | |||
55 | -- * Events | ||
56 | , Event(..) | ||
57 | , awaitEvent | ||
58 | , yieldEvent | ||
59 | , handleEvent | ||
60 | , exchange | ||
61 | , p2p | ||
62 | |||
63 | -- * Exceptions | ||
64 | , disconnect | ||
65 | , protocolError | ||
66 | |||
67 | -- * Block | ||
68 | , Block(..), BlockIx(..) | ||
69 | |||
70 | -- * Status | ||
71 | , PeerStatus(..), SessionStatus(..) | ||
72 | , inverseStatus | ||
73 | , canDownload, canUpload | ||
74 | ) where | 10 | ) where |
75 | |||
76 | import Control.Applicative | ||
77 | import Control.Concurrent.STM | ||
78 | import Control.Exception | ||
79 | import Control.Lens | ||
80 | import Control.Monad.Reader | ||
81 | import Control.Monad.State | ||
82 | import Control.Monad.Trans.Resource | ||
83 | |||
84 | import Data.IORef | ||
85 | import Data.Conduit as C | ||
86 | import Data.Conduit.Cereal as S | ||
87 | import Data.Conduit.Network | ||
88 | import Data.Serialize as S | ||
89 | import Text.PrettyPrint as PP hiding (($$)) | ||
90 | |||
91 | import Network | ||
92 | |||
93 | import Data.Torrent.Block | ||
94 | import Data.Torrent.Bitfield as BF | ||
95 | import Network.BitTorrent.Extension | ||
96 | import Network.BitTorrent.Exchange.Protocol | ||
97 | import Network.BitTorrent.Sessions.Types | ||
98 | import System.Torrent.Storage | ||
99 | |||
100 | |||
101 | {----------------------------------------------------------------------- | ||
102 | Exceptions | ||
103 | -----------------------------------------------------------------------} | ||
104 | |||
105 | -- | Terminate the current 'P2P' session. | ||
106 | disconnect :: P2P a | ||
107 | disconnect = monadThrow PeerDisconnected | ||
108 | |||
109 | -- TODO handle all protocol details here so we can hide this from | ||
110 | -- public interface | | ||
111 | protocolError :: Doc -> P2P a | ||
112 | protocolError = monadThrow . ProtocolError | ||
113 | |||
114 | {----------------------------------------------------------------------- | ||
115 | Helpers | ||
116 | -----------------------------------------------------------------------} | ||
117 | |||
118 | getClientBF :: P2P Bitfield | ||
119 | getClientBF = asks swarmSession >>= liftIO . getClientBitfield | ||
120 | {-# INLINE getClientBF #-} | ||
121 | |||
122 | -- | Count of client /have/ pieces. | ||
123 | getHaveCount :: P2P PieceCount | ||
124 | getHaveCount = haveCount <$> getClientBF | ||
125 | {-# INLINE getHaveCount #-} | ||
126 | |||
127 | -- | Count of client do not /have/ pieces. | ||
128 | getWantCount :: P2P PieceCount | ||
129 | getWantCount = totalCount <$> getClientBF | ||
130 | {-# INLINE getWantCount #-} | ||
131 | |||
132 | -- | Count of both /have/ and /want/ pieces. | ||
133 | getPieceCount :: P2P PieceCount | ||
134 | getPieceCount = asks findPieceCount | ||
135 | {-# INLINE getPieceCount #-} | ||
136 | |||
137 | -- for internal use only | ||
138 | emptyBF :: P2P Bitfield | ||
139 | emptyBF = liftM haveNone getPieceCount | ||
140 | |||
141 | fullBF :: P2P Bitfield | ||
142 | fullBF = liftM haveAll getPieceCount | ||
143 | |||
144 | singletonBF :: PieceIx -> P2P Bitfield | ||
145 | singletonBF i = liftM (BF.singleton i) getPieceCount | ||
146 | |||
147 | adjustBF :: Bitfield -> P2P Bitfield | ||
148 | adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount | ||
149 | |||
150 | peerWant :: P2P Bitfield | ||
151 | peerWant = BF.difference <$> getClientBF <*> use bitfield | ||
152 | |||
153 | clientWant :: P2P Bitfield | ||
154 | clientWant = BF.difference <$> use bitfield <*> getClientBF | ||
155 | |||
156 | peerOffer :: P2P Bitfield | ||
157 | peerOffer = do | ||
158 | sessionStatus <- use status | ||
159 | if canDownload sessionStatus then clientWant else emptyBF | ||
160 | |||
161 | clientOffer :: P2P Bitfield | ||
162 | clientOffer = do | ||
163 | sessionStatus <- use status | ||
164 | if canUpload sessionStatus then peerWant else emptyBF | ||
165 | |||
166 | |||
167 | |||
168 | revise :: P2P Bitfield | ||
169 | revise = do | ||
170 | want <- clientWant | ||
171 | let peerInteresting = not (BF.null want) | ||
172 | clientInterested <- use (status.clientStatus.interested) | ||
173 | |||
174 | when (clientInterested /= peerInteresting) $ do | ||
175 | yieldMessage $ if peerInteresting then Interested else NotInterested | ||
176 | status.clientStatus.interested .= peerInteresting | ||
177 | |||
178 | return want | ||
179 | |||
180 | requireExtension :: Extension -> P2P () | ||
181 | requireExtension required = do | ||
182 | enabled <- asks enabledExtensions | ||
183 | unless (required `elem` enabled) $ | ||
184 | protocolError $ ppExtension required <+> "not enabled" | ||
185 | |||
186 | -- haveMessage bf = do | ||
187 | -- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession | ||
188 | -- if undefined -- ix `member` bf | ||
189 | -- then nextEvent se | ||
190 | -- else undefined -- return $ Available diff | ||
191 | |||
192 | {----------------------------------------------------------------------- | ||
193 | Exchange | ||
194 | -----------------------------------------------------------------------} | ||
195 | |||
196 | |||
197 | -- | The 'Event' occur when either client or a peer change their | ||
198 | -- state. 'Event' are similar to 'Message' but differ in. We could | ||
199 | -- both wait for an event or raise an event using the 'awaitEvent' and | ||
200 | -- 'yieldEvent' functions respectively. | ||
201 | -- | ||
202 | -- | ||
203 | -- 'awaitEvent'\/'yieldEvent' properties: | ||
204 | -- | ||
205 | -- * between any await or yield state of the (another)peer could not change. | ||
206 | -- | ||
207 | data Event | ||
208 | -- | Generalize 'Bitfield', 'Have', 'HaveAll', 'HaveNone', | ||
209 | -- 'SuggestPiece', 'AllowedFast' messages. | ||
210 | = Available Bitfield | ||
211 | |||
212 | -- | Generalize 'Request' and 'Interested' messages. | ||
213 | | Want BlockIx | ||
214 | |||
215 | -- | Generalize 'Piece' and 'Unchoke' messages. | ||
216 | | Fragment Block | ||
217 | deriving Show | ||
218 | |||
219 | -- INVARIANT: | ||
220 | -- | ||
221 | -- * Available Bitfield is never empty | ||
222 | -- | ||
223 | |||
224 | -- | You could think of 'awaitEvent' as wait until something interesting occur. | ||
225 | -- | ||
226 | -- The following table shows which events may occur: | ||
227 | -- | ||
228 | -- > +----------+---------+ | ||
229 | -- > | Leacher | Seeder | | ||
230 | -- > |----------+---------+ | ||
231 | -- > | Available| | | ||
232 | -- > | Want | Want | | ||
233 | -- > | Fragment | | | ||
234 | -- > +----------+---------+ | ||
235 | -- | ||
236 | -- The reason is that seeder is not interested in any piece, and | ||
237 | -- both available or fragment events doesn't make sense in this context. | ||
238 | -- | ||
239 | -- Some properties: | ||
240 | -- | ||
241 | -- forall (Fragment block). isPiece block == True | ||
242 | -- | ||
243 | awaitEvent :: P2P Event | ||
244 | awaitEvent = {-# SCC awaitEvent #-} do | ||
245 | flushPending | ||
246 | msg <- awaitMessage | ||
247 | go msg | ||
248 | where | ||
249 | go KeepAlive = awaitEvent | ||
250 | go Choke = do | ||
251 | status.peerStatus.choking .= True | ||
252 | awaitEvent | ||
253 | |||
254 | go Unchoke = do | ||
255 | status.peerStatus.choking .= False | ||
256 | offer <- peerOffer | ||
257 | if BF.null offer | ||
258 | then awaitEvent | ||
259 | else return (Available offer) | ||
260 | |||
261 | go Interested = do | ||
262 | status.peerStatus.interested .= True | ||
263 | awaitEvent | ||
264 | |||
265 | go NotInterested = do | ||
266 | status.peerStatus.interested .= False | ||
267 | awaitEvent | ||
268 | |||
269 | go (Have idx) = do | ||
270 | bitfield %= have idx | ||
271 | _ <- revise | ||
272 | |||
273 | offer <- peerOffer | ||
274 | if not (BF.null offer) | ||
275 | then return (Available offer) | ||
276 | else awaitEvent | ||
277 | |||
278 | go (Bitfield bf) = do | ||
279 | new <- adjustBF bf | ||
280 | bitfield .= new | ||
281 | _ <- revise | ||
282 | |||
283 | offer <- peerOffer | ||
284 | if not (BF.null offer) | ||
285 | then return (Available offer) | ||
286 | else awaitEvent | ||
287 | |||
288 | go (Request bix) = do | ||
289 | bf <- clientOffer | ||
290 | if ixPiece bix `BF.member` bf | ||
291 | then return (Want bix) | ||
292 | else do | ||
293 | -- check if extension is enabled | ||
294 | -- yieldMessage (RejectRequest bix) | ||
295 | awaitEvent | ||
296 | |||
297 | go (Piece blk) = do | ||
298 | -- this protect us from malicious peers and duplication | ||
299 | wanted <- clientWant | ||
300 | if blkPiece blk `BF.member` wanted | ||
301 | then return (Fragment blk) | ||
302 | else awaitEvent | ||
303 | |||
304 | go (Cancel _) = do | ||
305 | error "cancel message not implemented" | ||
306 | |||
307 | go (Port _) = do | ||
308 | requireExtension ExtDHT | ||
309 | error "port message not implemented" | ||
310 | |||
311 | go HaveAll = do | ||
312 | requireExtension ExtFast | ||
313 | bitfield <~ fullBF | ||
314 | _ <- revise | ||
315 | awaitEvent | ||
316 | |||
317 | go HaveNone = do | ||
318 | requireExtension ExtFast | ||
319 | bitfield <~ emptyBF | ||
320 | _ <- revise | ||
321 | awaitEvent | ||
322 | |||
323 | go (SuggestPiece idx) = do | ||
324 | requireExtension ExtFast | ||
325 | bf <- use bitfield | ||
326 | if idx `BF.notMember` bf | ||
327 | then Available <$> singletonBF idx | ||
328 | else awaitEvent | ||
329 | |||
330 | go (RejectRequest _) = do | ||
331 | requireExtension ExtFast | ||
332 | awaitEvent | ||
333 | |||
334 | go (AllowedFast _) = do | ||
335 | requireExtension ExtFast | ||
336 | awaitEvent | ||
337 | |||
338 | -- TODO minimize number of peerOffer calls | ||
339 | |||
340 | -- | Raise an events which may occur | ||
341 | -- | ||
342 | -- This table shows when a some specific events /makes sense/ to yield: | ||
343 | -- | ||
344 | -- @ | ||
345 | -- +----------+---------+ | ||
346 | -- | Leacher | Seeder | | ||
347 | -- |----------+---------+ | ||
348 | -- | Available| | | ||
349 | -- | Want |Fragment | | ||
350 | -- | Fragment | | | ||
351 | -- +----------+---------+ | ||
352 | -- @ | ||
353 | -- | ||
354 | -- Seeder should not yield: | ||
355 | -- | ||
356 | -- * Available -- seeder could not store anything new. | ||
357 | -- | ||
358 | -- * Want -- seeder alread have everything, no reason to want. | ||
359 | -- | ||
360 | -- Hovewer, it's okay to not obey the rules -- if we are yield some | ||
361 | -- event which doesn't /makes sense/ in the current context then it | ||
362 | -- most likely will be ignored without any network IO. | ||
363 | -- | ||
364 | yieldEvent :: Event -> P2P () | ||
365 | yieldEvent e = {-# SCC yieldEvent #-} do | ||
366 | go e | ||
367 | flushPending | ||
368 | where | ||
369 | go (Available ixs) = do | ||
370 | ses <- asks swarmSession | ||
371 | liftIO $ atomically $ available ixs ses | ||
372 | |||
373 | go (Want bix) = do | ||
374 | offer <- peerOffer | ||
375 | if ixPiece bix `BF.member` offer | ||
376 | then yieldMessage (Request bix) | ||
377 | else return () | ||
378 | |||
379 | go (Fragment blk) = do | ||
380 | offer <- clientOffer | ||
381 | if blkPiece blk `BF.member` offer | ||
382 | then yieldMessage (Piece blk) | ||
383 | else return () | ||
384 | |||
385 | |||
386 | handleEvent :: (Event -> P2P Event) -> P2P () | ||
387 | handleEvent action = awaitEvent >>= action >>= yieldEvent | ||
388 | |||
389 | -- Event translation table looks like: | ||
390 | -- | ||
391 | -- Available -> Want | ||
392 | -- Want -> Fragment | ||
393 | -- Fragment -> Available | ||
394 | -- | ||
395 | -- If we join the chain we get the event loop: | ||
396 | -- | ||
397 | -- Available -> Want -> Fragment --\ | ||
398 | -- /|\ | | ||
399 | -- \---------------------------/ | ||
400 | -- | ||
401 | |||
402 | |||
403 | -- | Default P2P action. | ||
404 | exchange :: Storage -> P2P () | ||
405 | exchange storage = {-# SCC exchange #-} awaitEvent >>= handler | ||
406 | where | ||
407 | handler (Available bf) = do | ||
408 | ixs <- selBlk (findMin bf) storage | ||
409 | mapM_ (yieldEvent . Want) ixs -- TODO yield vectored | ||
410 | |||
411 | handler (Want bix) = do | ||
412 | liftIO $ print bix | ||
413 | blk <- liftIO $ getBlk bix storage | ||
414 | yieldEvent (Fragment blk) | ||
415 | |||
416 | handler (Fragment blk @ Block {..}) = do | ||
417 | done <- liftIO $ putBlk blk storage | ||
418 | when done $ do | ||
419 | yieldEvent $ Available $ singleton blkPiece (succ blkPiece) | ||
420 | |||
421 | -- WARN this is not reliable: if peer do not return all piece | ||
422 | -- block we could slow don't until some other event occured | ||
423 | offer <- peerOffer | ||
424 | if BF.null offer | ||
425 | then return () | ||
426 | else handler (Available offer) | ||
427 | |||
428 | yieldInit :: P2P () | ||
429 | yieldInit = yieldMessage . Bitfield =<< getClientBF | ||
430 | |||
431 | p2p :: P2P () | ||
432 | p2p = do | ||
433 | yieldInit | ||
434 | storage <- asks (storage . swarmSession) | ||
435 | forever $ do | ||
436 | exchange storage \ No newline at end of file | ||