diff options
-rw-r--r-- | bittorrent.cabal | 4 | ||||
-rw-r--r-- | src/Network/BitTorrent.hs | 123 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 428 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 446 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions/Types.lhs | 316 |
5 files changed, 3 insertions, 1314 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal index b661cf72..d352cbd0 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal | |||
@@ -53,7 +53,7 @@ library | |||
53 | Data.Torrent.Piece | 53 | Data.Torrent.Piece |
54 | Data.Torrent.Progress | 54 | Data.Torrent.Progress |
55 | Data.Torrent.Tree | 55 | Data.Torrent.Tree |
56 | -- Network.BitTorrent | 56 | Network.BitTorrent |
57 | Network.BitTorrent.Client | 57 | Network.BitTorrent.Client |
58 | Network.BitTorrent.Client.Swarm | 58 | Network.BitTorrent.Client.Swarm |
59 | Network.BitTorrent.Core | 59 | Network.BitTorrent.Core |
@@ -81,8 +81,6 @@ library | |||
81 | Network.BitTorrent.Tracker.RPC.UDP | 81 | Network.BitTorrent.Tracker.RPC.UDP |
82 | Network.BitTorrent.Tracker.Wai | 82 | Network.BitTorrent.Tracker.Wai |
83 | -- Network.BitTorrent.Tracker.Session | 83 | -- Network.BitTorrent.Tracker.Session |
84 | -- Network.BitTorrent.Session | ||
85 | -- Network.BitTorrent.Session.Types | ||
86 | System.Torrent.FileMap | 84 | System.Torrent.FileMap |
87 | System.Torrent.Storage | 85 | System.Torrent.Storage |
88 | other-modules: Paths_bittorrent | 86 | other-modules: Paths_bittorrent |
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index d8888416..21528efd 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs | |||
@@ -7,126 +7,5 @@ | |||
7 | -- | 7 | -- |
8 | {-# LANGUAGE RecordWildCards #-} | 8 | {-# LANGUAGE RecordWildCards #-} |
9 | module Network.BitTorrent | 9 | module Network.BitTorrent |
10 | ( module Data.Torrent | 10 | ( |
11 | |||
12 | , TorrentLoc(..), TorrentMap, Progress(..) | ||
13 | , ThreadCount, SessionCount | ||
14 | |||
15 | , ClientSession( clientPeerId, allowedExtensions ) | ||
16 | , withDefaultClient, defaultThreadCount, defaultPorts | ||
17 | , addTorrent | ||
18 | , removeTorrent | ||
19 | |||
20 | , getCurrentProgress | ||
21 | , getPeerCount | ||
22 | , getSwarmCount | ||
23 | , getSessionCount | ||
24 | , getSwarm | ||
25 | , getStorage | ||
26 | , getTorrentInfo | ||
27 | , getTorrentInfoStr | ||
28 | |||
29 | -- * Torrent Groups | ||
30 | , ClientLoc (..), ppClientLoc | ||
31 | , concreteLoc, concretePath | ||
32 | , addTorrents | ||
33 | , removeTorrents | ||
34 | |||
35 | -- * Extensions | ||
36 | , Extension | ||
37 | , defaultExtensions | ||
38 | , ppExtension | ||
39 | ) where | 11 | ) where |
40 | |||
41 | import Control.Applicative | ||
42 | import Control.Exception | ||
43 | import Control.Monad | ||
44 | import Data.List as L | ||
45 | import Data.HashMap.Strict as HM | ||
46 | import Network | ||
47 | import Text.Read | ||
48 | import Text.PrettyPrint | ||
49 | import System.Directory | ||
50 | import System.FilePath | ||
51 | |||
52 | import Data.Torrent | ||
53 | import Network.BitTorrent.Sessions.Types | ||
54 | import Network.BitTorrent.Sessions | ||
55 | import Network.BitTorrent.Extension | ||
56 | import Network.BitTorrent.Tracker | ||
57 | |||
58 | |||
59 | -- TODO remove fork from Network.BitTorrent.Exchange | ||
60 | -- TODO make all forks in Internal. | ||
61 | |||
62 | -- | Client session with default parameters. Use it for testing only. | ||
63 | withDefaultClient :: PortNumber -> PortNumber -> (ClientSession -> IO ()) -> IO () | ||
64 | withDefaultClient listPort dhtPort action = do | ||
65 | withClientSession defaultThreadCount [] listPort dhtPort action | ||
66 | |||
67 | getTorrentInfoStr :: ClientSession -> String -> IO (Maybe Torrent) | ||
68 | getTorrentInfoStr cs str | ||
69 | | Just infohash <- readMaybe str = getTorrentInfo cs infohash | ||
70 | | otherwise = return Nothing | ||
71 | |||
72 | {----------------------------------------------------------------------- | ||
73 | Torrent management | ||
74 | -----------------------------------------------------------------------} | ||
75 | |||
76 | -- | Register torrent and start downloading. | ||
77 | addTorrent :: ClientSession -> TorrentLoc -> IO () | ||
78 | addTorrent cs loc @ TorrentLoc {..} = do | ||
79 | registerTorrent cs loc | ||
80 | openSwarmSession cs loc | ||
81 | return () | ||
82 | |||
83 | -- | Unregister torrent and stop all running sessions. | ||
84 | removeTorrent :: ClientSession -> InfoHash -> IO () | ||
85 | removeTorrent = unregisterTorrent | ||
86 | |||
87 | {- | ||
88 | -- | The same as 'removeTorrrent' torrent, but delete all torrent | ||
89 | -- content files. | ||
90 | deleteTorrent :: ClientSession -> TorrentLoc -> IO () | ||
91 | deleteTorrent ClientSession {..} TorrentLoc {..} = undefined | ||
92 | -} | ||
93 | |||
94 | {----------------------------------------------------------------------- | ||
95 | Torrent group management | ||
96 | -----------------------------------------------------------------------} | ||
97 | -- TODO better name | ||
98 | |||
99 | data ClientLoc = ClientLoc | ||
100 | { tdir :: FilePath -- ^ Path to directory with .torrent files. | ||
101 | , ddir :: FilePath -- ^ Path to directory to place content. | ||
102 | } deriving (Show, Eq) | ||
103 | |||
104 | ppClientLoc :: ClientLoc -> Doc | ||
105 | ppClientLoc ClientLoc {..} = | ||
106 | text "torrent directory" <+> text tdir $$ | ||
107 | text "data directory" <+> text ddir | ||
108 | |||
109 | concretePath :: ClientLoc -> FilePath -> FilePath | ||
110 | concretePath ClientLoc {..} relPath = tdir </> relPath | ||
111 | |||
112 | concreteLoc :: ClientLoc -> FilePath -> TorrentLoc | ||
113 | concreteLoc loc @ ClientLoc {..} relPath | ||
114 | = TorrentLoc (concretePath loc relPath) ddir | ||
115 | |||
116 | addTorrents :: ClientSession -> ClientLoc -> IO () | ||
117 | addTorrents ses loc @ ClientLoc {..} = do | ||
118 | paths <- L.filter isTorrentPath <$> getDirectoryContents tdir | ||
119 | forM_ paths $ handle handler . addTorrent ses . concreteLoc loc | ||
120 | where | ||
121 | handler :: SomeException -> IO () | ||
122 | handler = print | ||
123 | |||
124 | removeTorrents :: ClientSession -> IO () | ||
125 | removeTorrents cs = do | ||
126 | tm <- getRegistered cs | ||
127 | forM_ (keys tm) (removeTorrent cs) | ||
128 | |||
129 | {- | ||
130 | deleteTorrents :: ClientSession -> IO () | ||
131 | deleteTorrents = undefined | ||
132 | -} \ No newline at end of file | ||
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index c1377449..934c646d 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -1,16 +1,3 @@ | |||
1 | {- TODO turn awaitEvent and yieldEvent to sourcePeer and sinkPeer | ||
2 | |||
3 | sourceSocket sock $= | ||
4 | conduitGet S.get $= | ||
5 | sourcePeer $= | ||
6 | p2p $= | ||
7 | sinkPeer $= | ||
8 | conduitPut S.put $$ | ||
9 | sinkSocket sock | ||
10 | |||
11 | measure performance | ||
12 | -} | ||
13 | |||
14 | -- | | 1 | -- | |
15 | -- Copyright : (c) Sam Truzjan 2013 | 2 | -- Copyright : (c) Sam Truzjan 2013 |
16 | -- License : BSD3 | 3 | -- License : BSD3 |
@@ -18,419 +5,6 @@ | |||
18 | -- Stability : experimental | 5 | -- Stability : experimental |
19 | -- Portability : portable | 6 | -- Portability : portable |
20 | -- | 7 | -- |
21 | -- This module provides P2P communication and aims to hide the | ||
22 | -- following stuff under the hood: | ||
23 | -- | ||
24 | -- * TODO; | ||
25 | -- | ||
26 | -- * /keep alives/ -- ; | ||
27 | -- | ||
28 | -- * /choking mechanism/ -- is used ; | ||
29 | -- | ||
30 | -- * /message broadcasting/ -- ; | ||
31 | -- | ||
32 | -- * /message filtering/ -- due to network latency and concurrency | ||
33 | -- some arriving messages might not make sense in the current | ||
34 | -- session context; | ||
35 | -- | ||
36 | -- * /scatter\/gather pieces/ -- ; | ||
37 | -- | ||
38 | -- * /various P2P protocol extensions/ -- . | ||
39 | -- | ||
40 | -- Finally we get a simple event-based communication model. | ||
41 | -- | ||
42 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
43 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
44 | {-# LANGUAGE BangPatterns #-} | ||
45 | module Network.BitTorrent.Exchange | 8 | module Network.BitTorrent.Exchange |
46 | ( P2P | 9 | ( |
47 | , runP2P | ||
48 | |||
49 | -- * Query | ||
50 | , getHaveCount | ||
51 | , getWantCount | ||
52 | , getPieceCount | ||
53 | , peerOffer | ||
54 | |||
55 | -- * Events | ||
56 | , Event(..) | ||
57 | , awaitEvent | ||
58 | , yieldEvent | ||
59 | , handleEvent | ||
60 | , exchange | ||
61 | , p2p | ||
62 | |||
63 | -- * Exceptions | ||
64 | , disconnect | ||
65 | , protocolError | ||
66 | |||
67 | -- * Block | ||
68 | , Block(..), BlockIx(..) | ||
69 | |||
70 | -- * Status | ||
71 | , PeerStatus(..), SessionStatus(..) | ||
72 | , inverseStatus | ||
73 | , canDownload, canUpload | ||
74 | ) where | 10 | ) where |
75 | |||
76 | import Control.Applicative | ||
77 | import Control.Concurrent.STM | ||
78 | import Control.Exception | ||
79 | import Control.Lens | ||
80 | import Control.Monad.Reader | ||
81 | import Control.Monad.State | ||
82 | import Control.Monad.Trans.Resource | ||
83 | |||
84 | import Data.IORef | ||
85 | import Data.Conduit as C | ||
86 | import Data.Conduit.Cereal as S | ||
87 | import Data.Conduit.Network | ||
88 | import Data.Serialize as S | ||
89 | import Text.PrettyPrint as PP hiding (($$)) | ||
90 | |||
91 | import Network | ||
92 | |||
93 | import Data.Torrent.Block | ||
94 | import Data.Torrent.Bitfield as BF | ||
95 | import Network.BitTorrent.Extension | ||
96 | import Network.BitTorrent.Exchange.Protocol | ||
97 | import Network.BitTorrent.Sessions.Types | ||
98 | import System.Torrent.Storage | ||
99 | |||
100 | |||
101 | {----------------------------------------------------------------------- | ||
102 | Exceptions | ||
103 | -----------------------------------------------------------------------} | ||
104 | |||
105 | -- | Terminate the current 'P2P' session. | ||
106 | disconnect :: P2P a | ||
107 | disconnect = monadThrow PeerDisconnected | ||
108 | |||
109 | -- TODO handle all protocol details here so we can hide this from | ||
110 | -- public interface | | ||
111 | protocolError :: Doc -> P2P a | ||
112 | protocolError = monadThrow . ProtocolError | ||
113 | |||
114 | {----------------------------------------------------------------------- | ||
115 | Helpers | ||
116 | -----------------------------------------------------------------------} | ||
117 | |||
118 | getClientBF :: P2P Bitfield | ||
119 | getClientBF = asks swarmSession >>= liftIO . getClientBitfield | ||
120 | {-# INLINE getClientBF #-} | ||
121 | |||
122 | -- | Count of client /have/ pieces. | ||
123 | getHaveCount :: P2P PieceCount | ||
124 | getHaveCount = haveCount <$> getClientBF | ||
125 | {-# INLINE getHaveCount #-} | ||
126 | |||
127 | -- | Count of client do not /have/ pieces. | ||
128 | getWantCount :: P2P PieceCount | ||
129 | getWantCount = totalCount <$> getClientBF | ||
130 | {-# INLINE getWantCount #-} | ||
131 | |||
132 | -- | Count of both /have/ and /want/ pieces. | ||
133 | getPieceCount :: P2P PieceCount | ||
134 | getPieceCount = asks findPieceCount | ||
135 | {-# INLINE getPieceCount #-} | ||
136 | |||
137 | -- for internal use only | ||
138 | emptyBF :: P2P Bitfield | ||
139 | emptyBF = liftM haveNone getPieceCount | ||
140 | |||
141 | fullBF :: P2P Bitfield | ||
142 | fullBF = liftM haveAll getPieceCount | ||
143 | |||
144 | singletonBF :: PieceIx -> P2P Bitfield | ||
145 | singletonBF i = liftM (BF.singleton i) getPieceCount | ||
146 | |||
147 | adjustBF :: Bitfield -> P2P Bitfield | ||
148 | adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount | ||
149 | |||
150 | peerWant :: P2P Bitfield | ||
151 | peerWant = BF.difference <$> getClientBF <*> use bitfield | ||
152 | |||
153 | clientWant :: P2P Bitfield | ||
154 | clientWant = BF.difference <$> use bitfield <*> getClientBF | ||
155 | |||
156 | peerOffer :: P2P Bitfield | ||
157 | peerOffer = do | ||
158 | sessionStatus <- use status | ||
159 | if canDownload sessionStatus then clientWant else emptyBF | ||
160 | |||
161 | clientOffer :: P2P Bitfield | ||
162 | clientOffer = do | ||
163 | sessionStatus <- use status | ||
164 | if canUpload sessionStatus then peerWant else emptyBF | ||
165 | |||
166 | |||
167 | |||
168 | revise :: P2P Bitfield | ||
169 | revise = do | ||
170 | want <- clientWant | ||
171 | let peerInteresting = not (BF.null want) | ||
172 | clientInterested <- use (status.clientStatus.interested) | ||
173 | |||
174 | when (clientInterested /= peerInteresting) $ do | ||
175 | yieldMessage $ if peerInteresting then Interested else NotInterested | ||
176 | status.clientStatus.interested .= peerInteresting | ||
177 | |||
178 | return want | ||
179 | |||
180 | requireExtension :: Extension -> P2P () | ||
181 | requireExtension required = do | ||
182 | enabled <- asks enabledExtensions | ||
183 | unless (required `elem` enabled) $ | ||
184 | protocolError $ ppExtension required <+> "not enabled" | ||
185 | |||
186 | -- haveMessage bf = do | ||
187 | -- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession | ||
188 | -- if undefined -- ix `member` bf | ||
189 | -- then nextEvent se | ||
190 | -- else undefined -- return $ Available diff | ||
191 | |||
192 | {----------------------------------------------------------------------- | ||
193 | Exchange | ||
194 | -----------------------------------------------------------------------} | ||
195 | |||
196 | |||
197 | -- | The 'Event' occur when either client or a peer change their | ||
198 | -- state. 'Event' are similar to 'Message' but differ in. We could | ||
199 | -- both wait for an event or raise an event using the 'awaitEvent' and | ||
200 | -- 'yieldEvent' functions respectively. | ||
201 | -- | ||
202 | -- | ||
203 | -- 'awaitEvent'\/'yieldEvent' properties: | ||
204 | -- | ||
205 | -- * between any await or yield state of the (another)peer could not change. | ||
206 | -- | ||
207 | data Event | ||
208 | -- | Generalize 'Bitfield', 'Have', 'HaveAll', 'HaveNone', | ||
209 | -- 'SuggestPiece', 'AllowedFast' messages. | ||
210 | = Available Bitfield | ||
211 | |||
212 | -- | Generalize 'Request' and 'Interested' messages. | ||
213 | | Want BlockIx | ||
214 | |||
215 | -- | Generalize 'Piece' and 'Unchoke' messages. | ||
216 | | Fragment Block | ||
217 | deriving Show | ||
218 | |||
219 | -- INVARIANT: | ||
220 | -- | ||
221 | -- * Available Bitfield is never empty | ||
222 | -- | ||
223 | |||
224 | -- | You could think of 'awaitEvent' as wait until something interesting occur. | ||
225 | -- | ||
226 | -- The following table shows which events may occur: | ||
227 | -- | ||
228 | -- > +----------+---------+ | ||
229 | -- > | Leacher | Seeder | | ||
230 | -- > |----------+---------+ | ||
231 | -- > | Available| | | ||
232 | -- > | Want | Want | | ||
233 | -- > | Fragment | | | ||
234 | -- > +----------+---------+ | ||
235 | -- | ||
236 | -- The reason is that seeder is not interested in any piece, and | ||
237 | -- both available or fragment events doesn't make sense in this context. | ||
238 | -- | ||
239 | -- Some properties: | ||
240 | -- | ||
241 | -- forall (Fragment block). isPiece block == True | ||
242 | -- | ||
243 | awaitEvent :: P2P Event | ||
244 | awaitEvent = {-# SCC awaitEvent #-} do | ||
245 | flushPending | ||
246 | msg <- awaitMessage | ||
247 | go msg | ||
248 | where | ||
249 | go KeepAlive = awaitEvent | ||
250 | go Choke = do | ||
251 | status.peerStatus.choking .= True | ||
252 | awaitEvent | ||
253 | |||
254 | go Unchoke = do | ||
255 | status.peerStatus.choking .= False | ||
256 | offer <- peerOffer | ||
257 | if BF.null offer | ||
258 | then awaitEvent | ||
259 | else return (Available offer) | ||
260 | |||
261 | go Interested = do | ||
262 | status.peerStatus.interested .= True | ||
263 | awaitEvent | ||
264 | |||
265 | go NotInterested = do | ||
266 | status.peerStatus.interested .= False | ||
267 | awaitEvent | ||
268 | |||
269 | go (Have idx) = do | ||
270 | bitfield %= have idx | ||
271 | _ <- revise | ||
272 | |||
273 | offer <- peerOffer | ||
274 | if not (BF.null offer) | ||
275 | then return (Available offer) | ||
276 | else awaitEvent | ||
277 | |||
278 | go (Bitfield bf) = do | ||
279 | new <- adjustBF bf | ||
280 | bitfield .= new | ||
281 | _ <- revise | ||
282 | |||
283 | offer <- peerOffer | ||
284 | if not (BF.null offer) | ||
285 | then return (Available offer) | ||
286 | else awaitEvent | ||
287 | |||
288 | go (Request bix) = do | ||
289 | bf <- clientOffer | ||
290 | if ixPiece bix `BF.member` bf | ||
291 | then return (Want bix) | ||
292 | else do | ||
293 | -- check if extension is enabled | ||
294 | -- yieldMessage (RejectRequest bix) | ||
295 | awaitEvent | ||
296 | |||
297 | go (Piece blk) = do | ||
298 | -- this protect us from malicious peers and duplication | ||
299 | wanted <- clientWant | ||
300 | if blkPiece blk `BF.member` wanted | ||
301 | then return (Fragment blk) | ||
302 | else awaitEvent | ||
303 | |||
304 | go (Cancel _) = do | ||
305 | error "cancel message not implemented" | ||
306 | |||
307 | go (Port _) = do | ||
308 | requireExtension ExtDHT | ||
309 | error "port message not implemented" | ||
310 | |||
311 | go HaveAll = do | ||
312 | requireExtension ExtFast | ||
313 | bitfield <~ fullBF | ||
314 | _ <- revise | ||
315 | awaitEvent | ||
316 | |||
317 | go HaveNone = do | ||
318 | requireExtension ExtFast | ||
319 | bitfield <~ emptyBF | ||
320 | _ <- revise | ||
321 | awaitEvent | ||
322 | |||
323 | go (SuggestPiece idx) = do | ||
324 | requireExtension ExtFast | ||
325 | bf <- use bitfield | ||
326 | if idx `BF.notMember` bf | ||
327 | then Available <$> singletonBF idx | ||
328 | else awaitEvent | ||
329 | |||
330 | go (RejectRequest _) = do | ||
331 | requireExtension ExtFast | ||
332 | awaitEvent | ||
333 | |||
334 | go (AllowedFast _) = do | ||
335 | requireExtension ExtFast | ||
336 | awaitEvent | ||
337 | |||
338 | -- TODO minimize number of peerOffer calls | ||
339 | |||
340 | -- | Raise an events which may occur | ||
341 | -- | ||
342 | -- This table shows when a some specific events /makes sense/ to yield: | ||
343 | -- | ||
344 | -- @ | ||
345 | -- +----------+---------+ | ||
346 | -- | Leacher | Seeder | | ||
347 | -- |----------+---------+ | ||
348 | -- | Available| | | ||
349 | -- | Want |Fragment | | ||
350 | -- | Fragment | | | ||
351 | -- +----------+---------+ | ||
352 | -- @ | ||
353 | -- | ||
354 | -- Seeder should not yield: | ||
355 | -- | ||
356 | -- * Available -- seeder could not store anything new. | ||
357 | -- | ||
358 | -- * Want -- seeder alread have everything, no reason to want. | ||
359 | -- | ||
360 | -- Hovewer, it's okay to not obey the rules -- if we are yield some | ||
361 | -- event which doesn't /makes sense/ in the current context then it | ||
362 | -- most likely will be ignored without any network IO. | ||
363 | -- | ||
364 | yieldEvent :: Event -> P2P () | ||
365 | yieldEvent e = {-# SCC yieldEvent #-} do | ||
366 | go e | ||
367 | flushPending | ||
368 | where | ||
369 | go (Available ixs) = do | ||
370 | ses <- asks swarmSession | ||
371 | liftIO $ atomically $ available ixs ses | ||
372 | |||
373 | go (Want bix) = do | ||
374 | offer <- peerOffer | ||
375 | if ixPiece bix `BF.member` offer | ||
376 | then yieldMessage (Request bix) | ||
377 | else return () | ||
378 | |||
379 | go (Fragment blk) = do | ||
380 | offer <- clientOffer | ||
381 | if blkPiece blk `BF.member` offer | ||
382 | then yieldMessage (Piece blk) | ||
383 | else return () | ||
384 | |||
385 | |||
386 | handleEvent :: (Event -> P2P Event) -> P2P () | ||
387 | handleEvent action = awaitEvent >>= action >>= yieldEvent | ||
388 | |||
389 | -- Event translation table looks like: | ||
390 | -- | ||
391 | -- Available -> Want | ||
392 | -- Want -> Fragment | ||
393 | -- Fragment -> Available | ||
394 | -- | ||
395 | -- If we join the chain we get the event loop: | ||
396 | -- | ||
397 | -- Available -> Want -> Fragment --\ | ||
398 | -- /|\ | | ||
399 | -- \---------------------------/ | ||
400 | -- | ||
401 | |||
402 | |||
403 | -- | Default P2P action. | ||
404 | exchange :: Storage -> P2P () | ||
405 | exchange storage = {-# SCC exchange #-} awaitEvent >>= handler | ||
406 | where | ||
407 | handler (Available bf) = do | ||
408 | ixs <- selBlk (findMin bf) storage | ||
409 | mapM_ (yieldEvent . Want) ixs -- TODO yield vectored | ||
410 | |||
411 | handler (Want bix) = do | ||
412 | liftIO $ print bix | ||
413 | blk <- liftIO $ getBlk bix storage | ||
414 | yieldEvent (Fragment blk) | ||
415 | |||
416 | handler (Fragment blk @ Block {..}) = do | ||
417 | done <- liftIO $ putBlk blk storage | ||
418 | when done $ do | ||
419 | yieldEvent $ Available $ singleton blkPiece (succ blkPiece) | ||
420 | |||
421 | -- WARN this is not reliable: if peer do not return all piece | ||
422 | -- block we could slow don't until some other event occured | ||
423 | offer <- peerOffer | ||
424 | if BF.null offer | ||
425 | then return () | ||
426 | else handler (Available offer) | ||
427 | |||
428 | yieldInit :: P2P () | ||
429 | yieldInit = yieldMessage . Bitfield =<< getClientBF | ||
430 | |||
431 | p2p :: P2P () | ||
432 | p2p = do | ||
433 | yieldInit | ||
434 | storage <- asks (storage . swarmSession) | ||
435 | forever $ do | ||
436 | exchange storage \ No newline at end of file | ||
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs deleted file mode 100644 index d558438f..00000000 --- a/src/Network/BitTorrent/Sessions.hs +++ /dev/null | |||
@@ -1,446 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | module Network.BitTorrent.Sessions | ||
9 | ( -- * Progress | ||
10 | Progress(..), startProgress | ||
11 | , ClientService(..) | ||
12 | , startService | ||
13 | , withRunning | ||
14 | |||
15 | -- * Client | ||
16 | , ClientSession ( ClientSession | ||
17 | , clientPeerId, allowedExtensions | ||
18 | ) | ||
19 | , withClientSession | ||
20 | |||
21 | , ThreadCount | ||
22 | , defaultThreadCount | ||
23 | |||
24 | , TorrentLoc(..) | ||
25 | , registerTorrent | ||
26 | , unregisterTorrent | ||
27 | , getRegistered | ||
28 | |||
29 | , getCurrentProgress | ||
30 | , getSwarmCount | ||
31 | , getPeerCount | ||
32 | , getActiveSwarms | ||
33 | , getSwarm | ||
34 | , getStorage | ||
35 | , getTorrentInfo | ||
36 | , openSwarmSession | ||
37 | |||
38 | -- * Swarm | ||
39 | , SwarmSession( SwarmSession, torrentMeta | ||
40 | , clientSession, storage | ||
41 | ) | ||
42 | |||
43 | , SessionCount | ||
44 | , getSessionCount | ||
45 | , getClientBitfield | ||
46 | , getActivePeers | ||
47 | |||
48 | , discover | ||
49 | |||
50 | , PeerSession ( connectedPeerAddr, enabledExtensions ) | ||
51 | , getSessionState | ||
52 | |||
53 | , SessionState (..) | ||
54 | ) where | ||
55 | |||
56 | import Prelude hiding (mapM_, elem) | ||
57 | |||
58 | import Control.Applicative | ||
59 | import Control.Concurrent | ||
60 | import Control.Concurrent.STM | ||
61 | import Control.Concurrent.BoundedChan as BC | ||
62 | import Control.Concurrent.MSem as MSem | ||
63 | import Control.Monad (forever, (>=>)) | ||
64 | import Control.Exception | ||
65 | import Control.Monad.Trans | ||
66 | |||
67 | import Data.IORef | ||
68 | import Data.Map as M | ||
69 | import Data.HashMap.Strict as HM | ||
70 | import Data.Foldable as F | ||
71 | import Data.Set as S | ||
72 | |||
73 | import Network hiding (accept) | ||
74 | import Network.BSD | ||
75 | import Network.Socket | ||
76 | |||
77 | import Data.Torrent.Bitfield as BF | ||
78 | import Data.Torrent | ||
79 | import Network.BitTorrent.Extension | ||
80 | import Network.BitTorrent.Peer | ||
81 | import Network.BitTorrent.Sessions.Types | ||
82 | import Network.BitTorrent.Exchange.Protocol as BT | ||
83 | import Network.BitTorrent.Tracker.Protocol as BT | ||
84 | import Network.BitTorrent.Tracker as BT | ||
85 | import Network.BitTorrent.Exchange as BT | ||
86 | import System.Torrent.Storage | ||
87 | |||
88 | {----------------------------------------------------------------------- | ||
89 | Client Services | ||
90 | -----------------------------------------------------------------------} | ||
91 | |||
92 | startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO () | ||
93 | startService s port m = do | ||
94 | stopService s | ||
95 | putMVar s =<< spawn | ||
96 | where | ||
97 | spawn = ClientService port <$> forkIO (m port) | ||
98 | |||
99 | stopService :: MVar ClientService -> IO () | ||
100 | stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread) | ||
101 | |||
102 | -- | Service A might depend on service B. | ||
103 | withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO () | ||
104 | withRunning dep failure action = tryTakeMVar dep >>= maybe failure action | ||
105 | |||
106 | {----------------------------------------------------------------------- | ||
107 | Torrent presence | ||
108 | -----------------------------------------------------------------------} | ||
109 | |||
110 | data TorrentPresence = Active SwarmSession | ||
111 | | Registered TorrentLoc | ||
112 | | Unknown | ||
113 | |||
114 | torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence | ||
115 | torrentPresence ClientSession {..} ih = do | ||
116 | sws <- readTVarIO swarmSessions | ||
117 | case M.lookup ih sws of | ||
118 | Just ss -> return $ Active ss | ||
119 | Nothing -> do | ||
120 | tm <- readTVarIO torrentMap | ||
121 | return $ maybe Unknown Registered $ HM.lookup ih tm | ||
122 | |||
123 | {----------------------------------------------------------------------- | ||
124 | Client sessions | ||
125 | -----------------------------------------------------------------------} | ||
126 | |||
127 | startListener :: ClientSession -> PortNumber -> IO () | ||
128 | startListener cs @ ClientSession {..} port = | ||
129 | startService peerListener port $ listener cs $ \conn @ (_, PeerSession{..}) -> do | ||
130 | runP2P conn p2p | ||
131 | |||
132 | -- | Create a new client session. The data passed to this function are | ||
133 | -- usually loaded from configuration file. | ||
134 | openClientSession :: SessionCount -> [Extension] -> PortNumber -> PortNumber -> IO ClientSession | ||
135 | openClientSession n exts listenerPort _ = do | ||
136 | cs <- ClientSession | ||
137 | <$> genPeerId | ||
138 | <*> pure exts | ||
139 | <*> newEmptyMVar | ||
140 | <*> MSem.new n | ||
141 | <*> pure n | ||
142 | <*> newTVarIO M.empty | ||
143 | <*> newTVarIO (startProgress 0) | ||
144 | <*> newTVarIO HM.empty | ||
145 | |||
146 | startListener cs listenerPort | ||
147 | return cs | ||
148 | |||
149 | closeClientSession :: ClientSession -> IO () | ||
150 | closeClientSession ClientSession {..} = do | ||
151 | stopService peerListener | ||
152 | |||
153 | sws <- readTVarIO swarmSessions | ||
154 | forM_ sws closeSwarmSession | ||
155 | |||
156 | withClientSession :: SessionCount -> [Extension] | ||
157 | -> PortNumber -> PortNumber | ||
158 | -> (ClientSession -> IO ()) -> IO () | ||
159 | withClientSession c es l d = bracket (openClientSession c es l d) closeClientSession | ||
160 | |||
161 | -- | Get current global progress of the client. This value is usually | ||
162 | -- shown to a user. | ||
163 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
164 | getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
165 | |||
166 | -- | Get number of swarms client aware of. | ||
167 | getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
168 | getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions | ||
169 | |||
170 | -- | Get number of peers the client currently connected to. | ||
171 | getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
172 | getPeerCount ClientSession {..} = liftIO $ do | ||
173 | unused <- peekAvail activeThreads | ||
174 | return (maxActive - unused) | ||
175 | |||
176 | getActiveSwarms :: ClientSession -> IO [SwarmSession] | ||
177 | getActiveSwarms ClientSession {..} = M.elems <$> readTVarIO swarmSessions | ||
178 | |||
179 | getListenerPort :: ClientSession -> IO PortNumber | ||
180 | getListenerPort ClientSession {..} = servPort <$> readMVar peerListener | ||
181 | |||
182 | {----------------------------------------------------------------------- | ||
183 | Swarm session | ||
184 | -----------------------------------------------------------------------} | ||
185 | |||
186 | defSeederConns :: SessionCount | ||
187 | defSeederConns = defaultUnchokeSlots | ||
188 | |||
189 | defLeecherConns :: SessionCount | ||
190 | defLeecherConns = defaultNumWant | ||
191 | |||
192 | -- discovery should hide tracker and DHT communication under the hood | ||
193 | -- thus we can obtain an unified interface | ||
194 | |||
195 | discover :: SwarmSession -> IO () | ||
196 | discover swarm @ SwarmSession {..} = {-# SCC discover #-} do | ||
197 | port <- getListenerPort clientSession | ||
198 | |||
199 | let conn = TConnection { | ||
200 | tconnAnnounce = tAnnounce torrentMeta | ||
201 | , tconnInfoHash = tInfoHash torrentMeta | ||
202 | , tconnPeerId = clientPeerId clientSession | ||
203 | , tconnPort = port | ||
204 | } | ||
205 | |||
206 | let progress = currentProgress clientSession | ||
207 | ch <- newBoundedChan 100 -- TODO | ||
208 | tid <- forkIO $ tracker ch progress conn | ||
209 | forever $ do | ||
210 | addr <- BC.readChan ch | ||
211 | forkThrottle swarm $ do | ||
212 | initiatePeerSession swarm addr $ \pconn -> do | ||
213 | print addr | ||
214 | runP2P pconn p2p | ||
215 | |||
216 | registerSwarmSession :: SwarmSession -> STM () | ||
217 | registerSwarmSession ss @ SwarmSession {..} = | ||
218 | modifyTVar' (swarmSessions clientSession) $ | ||
219 | M.insert (tInfoHash torrentMeta) ss | ||
220 | |||
221 | unregisterSwarmSession :: SwarmSession -> STM () | ||
222 | unregisterSwarmSession SwarmSession {..} = | ||
223 | modifyTVar' (swarmSessions clientSession) $ | ||
224 | M.delete $ tInfoHash torrentMeta | ||
225 | |||
226 | openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession | ||
227 | openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do | ||
228 | t <- validateLocation loc | ||
229 | let bf = haveNone $ pieceCount $ tInfo t | ||
230 | |||
231 | ss <- SwarmSession t cs | ||
232 | <$> MSem.new defLeecherConns | ||
233 | <*> openStorage t dataDirPath bf | ||
234 | <*> newTVarIO S.empty | ||
235 | <*> newBroadcastTChanIO | ||
236 | |||
237 | atomically $ do | ||
238 | modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t | ||
239 | registerSwarmSession ss | ||
240 | |||
241 | _ <- forkIO $ discover ss | ||
242 | |||
243 | return ss | ||
244 | |||
245 | closeSwarmSession :: SwarmSession -> IO () | ||
246 | closeSwarmSession se @ SwarmSession {..} = do | ||
247 | atomically $ unregisterSwarmSession se | ||
248 | -- TODO stop discovery | ||
249 | -- TODO killall peer sessions | ||
250 | -- TODO the order is important! | ||
251 | closeStorage storage | ||
252 | |||
253 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
254 | getSwarm cs @ ClientSession {..} ih = do | ||
255 | tstatus <- torrentPresence cs ih | ||
256 | case tstatus of | ||
257 | Unknown -> throw $ UnknownTorrent ih | ||
258 | Active sw -> return sw | ||
259 | Registered loc -> openSwarmSession cs loc | ||
260 | |||
261 | -- TODO do not spawn session! | ||
262 | getStorage :: ClientSession -> InfoHash -> IO Storage | ||
263 | getStorage cs ih = storage <$> getSwarm cs ih | ||
264 | |||
265 | -- TODO keep sorted? | ||
266 | getActivePeers :: SwarmSession -> IO [PeerSession] | ||
267 | getActivePeers SwarmSession {..} = S.toList <$> readTVarIO connectedPeers | ||
268 | |||
269 | getTorrentInfo :: ClientSession -> InfoHash -> IO (Maybe Torrent) | ||
270 | getTorrentInfo cs ih = do | ||
271 | tstatus <- torrentPresence cs ih | ||
272 | case tstatus of | ||
273 | Unknown -> return Nothing | ||
274 | Active (SwarmSession {..}) -> return $ Just torrentMeta | ||
275 | Registered (TorrentLoc {..}) -> Just <$> fromFile metafilePath | ||
276 | |||
277 | -- | Get the number of connected peers in the given swarm. | ||
278 | getSessionCount :: SwarmSession -> IO SessionCount | ||
279 | getSessionCount SwarmSession {..} = do | ||
280 | S.size <$> readTVarIO connectedPeers | ||
281 | |||
282 | swarmHandshake :: SwarmSession -> Handshake | ||
283 | swarmHandshake SwarmSession {..} = Handshake { | ||
284 | hsProtocol = defaultBTProtocol | ||
285 | , hsReserved = encodeExts $ allowedExtensions $ clientSession | ||
286 | , hsInfoHash = tInfoHash torrentMeta | ||
287 | , hsPeerId = clientPeerId $ clientSession | ||
288 | } | ||
289 | |||
290 | {----------------------------------------------------------------------- | ||
291 | Peer sessions throttling | ||
292 | -----------------------------------------------------------------------} | ||
293 | |||
294 | -- | The number of threads suitable for a typical BT client. | ||
295 | defaultThreadCount :: ThreadCount | ||
296 | defaultThreadCount = 1000 | ||
297 | |||
298 | enterSwarm :: SwarmSession -> IO () | ||
299 | enterSwarm SwarmSession {..} = do | ||
300 | MSem.wait (activeThreads clientSession) | ||
301 | MSem.wait vacantPeers `onException` | ||
302 | MSem.signal (activeThreads clientSession) | ||
303 | |||
304 | leaveSwarm :: SwarmSession -> IO () | ||
305 | leaveSwarm SwarmSession {..} = mask_ $ do | ||
306 | MSem.signal vacantPeers | ||
307 | MSem.signal (activeThreads clientSession) | ||
308 | |||
309 | forkThrottle :: SwarmSession -> IO () -> IO ThreadId | ||
310 | forkThrottle se action = do | ||
311 | enterSwarm se | ||
312 | (forkIO $ do | ||
313 | action `finally` leaveSwarm se) | ||
314 | `onException` leaveSwarm se | ||
315 | |||
316 | -- TODO: check content files location; | ||
317 | validateLocation :: TorrentLoc -> IO Torrent | ||
318 | validateLocation = fromFile . metafilePath | ||
319 | |||
320 | registerTorrent :: ClientSession -> TorrentLoc -> IO () | ||
321 | registerTorrent ClientSession {..} loc @ TorrentLoc {..} = do | ||
322 | torrent <- fromFile metafilePath | ||
323 | atomically $ modifyTVar' torrentMap $ HM.insert (tInfoHash torrent) loc | ||
324 | |||
325 | -- TODO kill sessions | ||
326 | unregisterTorrent :: ClientSession -> InfoHash -> IO () | ||
327 | unregisterTorrent ClientSession {..} ih = do | ||
328 | atomically $ modifyTVar' torrentMap $ HM.delete ih | ||
329 | |||
330 | getRegistered :: ClientSession -> IO TorrentMap | ||
331 | getRegistered ClientSession {..} = readTVarIO torrentMap | ||
332 | |||
333 | {----------------------------------------------------------------------- | ||
334 | Peer session creation | ||
335 | ------------------------------------------------------------------------ | ||
336 | The peer session cycle looks like: | ||
337 | |||
338 | * acquire vacant session and vacant thread slot; | ||
339 | * (fork could be here, but not necessary) | ||
340 | * establish peer connection; | ||
341 | * register peer session; | ||
342 | * ... exchange process ... | ||
343 | * unregister peer session; | ||
344 | * close peer connection; | ||
345 | * release acquired session and thread slot. | ||
346 | |||
347 | TODO: explain why this order | ||
348 | TODO: thread throttling | ||
349 | TODO: check if it connected yet peer | ||
350 | TODO: utilize peer Id. | ||
351 | TODO: use STM semaphore | ||
352 | -----------------------------------------------------------------------} | ||
353 | |||
354 | registerPeerSession :: PeerSession -> IO () | ||
355 | registerPeerSession ps @ PeerSession {..} = | ||
356 | atomically $ modifyTVar' (connectedPeers swarmSession) (S.insert ps) | ||
357 | |||
358 | unregisterPeerSession :: PeerSession -> IO () | ||
359 | unregisterPeerSession ps @ PeerSession {..} = | ||
360 | atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps) | ||
361 | |||
362 | openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | ||
363 | openSession ss @ SwarmSession {..} addr Handshake {..} = do | ||
364 | let clientCaps = encodeExts $ allowedExtensions $ clientSession | ||
365 | let enabled = decodeExts (enabledCaps clientCaps hsReserved) | ||
366 | |||
367 | bf <- getClientBitfield ss | ||
368 | ps <- PeerSession addr ss enabled | ||
369 | <$> atomically (dupTChan broadcastMessages) | ||
370 | <*> newIORef (initialSessionState (totalCount bf)) | ||
371 | -- TODO we could implement more interesting throtling scheme | ||
372 | -- using connected peer information | ||
373 | registerPeerSession ps | ||
374 | return ps | ||
375 | |||
376 | -- TODO kill thread | ||
377 | closeSession :: PeerSession -> IO () | ||
378 | closeSession = unregisterPeerSession | ||
379 | |||
380 | type PeerConn = (Socket, PeerSession) | ||
381 | type Exchange = PeerConn -> IO () | ||
382 | |||
383 | -- | Exchange action depends on session and socket, whereas session depends | ||
384 | -- on socket: | ||
385 | -- | ||
386 | -- socket------>-----exchange | ||
387 | -- | | | ||
388 | -- \-->--session-->--/ | ||
389 | -- | ||
390 | -- To handle exceptions properly we double bracket socket and session | ||
391 | -- then joining the resources and also ignoring session local exceptions. | ||
392 | -- | ||
393 | runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO () | ||
394 | runSession connector opener action = | ||
395 | handle isSessionException $ | ||
396 | bracket connector close $ \sock -> | ||
397 | bracket (opener sock) closeSession $ \ses -> | ||
398 | action (sock, ses) | ||
399 | |||
400 | -- | Used then the client want to connect to a peer. | ||
401 | initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | ||
402 | initiatePeerSession ss @ SwarmSession {..} addr | ||
403 | = runSession (connectToPeer addr) initiated | ||
404 | where | ||
405 | initiated sock = do | ||
406 | phs <- handshake sock (swarmHandshake ss) | ||
407 | ps <- openSession ss addr phs | ||
408 | return ps | ||
409 | |||
410 | -- | Used the a peer want to connect to the client. | ||
411 | acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO () | ||
412 | acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted | ||
413 | where | ||
414 | accepted sock = do | ||
415 | phs <- recvHandshake sock | ||
416 | swarm <- getSwarm cs $ hsInfoHash phs | ||
417 | ps <- openSession swarm addr phs | ||
418 | sendHandshake sock $ Handshake { | ||
419 | hsProtocol = defaultBTProtocol | ||
420 | , hsReserved = encodeExts $ enabledExtensions ps | ||
421 | , hsInfoHash = hsInfoHash phs | ||
422 | , hsPeerId = clientPeerId | ||
423 | } | ||
424 | return ps | ||
425 | |||
426 | listener :: ClientSession -> Exchange -> PortNumber -> IO () | ||
427 | listener cs action serverPort = bracket openListener close loop | ||
428 | where | ||
429 | loop sock = forever $ handle isIOError $ do | ||
430 | (conn, addr) <- accept sock | ||
431 | putStrLn "accepted" | ||
432 | case addr of | ||
433 | SockAddrInet port host -> do | ||
434 | _ <- forkIO $ do | ||
435 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
436 | return () | ||
437 | _ -> return () | ||
438 | |||
439 | isIOError :: IOError -> IO () | ||
440 | isIOError _ = return () | ||
441 | |||
442 | openListener = do | ||
443 | sock <- socket AF_INET Stream =<< getProtocolNumber "tcp" | ||
444 | bindSocket sock (SockAddrInet serverPort iNADDR_ANY) | ||
445 | listen sock maxListenQueue | ||
446 | return sock | ||
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs deleted file mode 100644 index dea47405..00000000 --- a/src/Network/BitTorrent/Sessions/Types.lhs +++ /dev/null | |||
@@ -1,316 +0,0 @@ | |||
1 | > -- | | ||
2 | > -- Copyright : (c) Sam Truzjan 2013 | ||
3 | > -- License : BSD3 | ||
4 | > -- Maintainer : pxqr.sta@gmail.com | ||
5 | > -- Stability : experimental | ||
6 | > -- Portability : portable | ||
7 | > -- | ||
8 | > | ||
9 | > {-# LANGUAGE RecordWildCards #-} | ||
10 | > {-# LANGUAGE ViewPatterns #-} | ||
11 | > {-# LANGUAGE TemplateHaskell #-} | ||
12 | > {-# LANGUAGE DeriveDataTypeable #-} | ||
13 | > | ||
14 | > module Network.BitTorrent.Sessions.Types | ||
15 | > ( ClientService(..) | ||
16 | > , ThreadCount | ||
17 | > , TorrentLoc (..) | ||
18 | > , TorrentMap | ||
19 | > | ||
20 | > , ClientSession (..) | ||
21 | > | ||
22 | > , SwarmSession (..) | ||
23 | > , getClientBitfield | ||
24 | > , getPending, available | ||
25 | > | ||
26 | > , PeerSession (..) | ||
27 | > , SessionCount | ||
28 | > , findPieceCount | ||
29 | > | ||
30 | > , SessionState (..) | ||
31 | > , status, bitfield | ||
32 | > , initialSessionState | ||
33 | > , getSessionState | ||
34 | > | ||
35 | > , SessionException (..) | ||
36 | > , isSessionException, putSessionException | ||
37 | > ) where | ||
38 | |||
39 | > import Control.Applicative | ||
40 | > import Control.Concurrent | ||
41 | > import Control.Concurrent.STM | ||
42 | > import Control.Concurrent.MSem as MSem | ||
43 | > import Control.Lens | ||
44 | > import Control.Exception | ||
45 | |||
46 | > import Data.IORef | ||
47 | > import Data.Default | ||
48 | > import Data.Function | ||
49 | > import Data.Map as M | ||
50 | > import Data.HashMap.Strict as HM | ||
51 | > import Data.Ord | ||
52 | > import Data.Set as S | ||
53 | > import Data.Typeable | ||
54 | > import Text.PrettyPrint | ||
55 | |||
56 | > import Network | ||
57 | |||
58 | > import Data.Torrent.Bitfield as BF | ||
59 | > import Network.BitTorrent.Extension | ||
60 | > import Network.BitTorrent.Peer | ||
61 | > import Network.BitTorrent.Exchange.Protocol as BT | ||
62 | > import System.Torrent.Storage | ||
63 | |||
64 | Thread layout | ||
65 | ------------------------------------------------------------------------ | ||
66 | |||
67 | When client session created 2 new threads appear: | ||
68 | |||
69 | * Peer listener - accept new P2P connection initiated by other | ||
70 | peers; | ||
71 | |||
72 | * Tracker announcer - announce that the peer have this torrent. | ||
73 | |||
74 | * OPTIONAL: DHT listener - replies to DHT requests; | ||
75 | |||
76 | When swarn session created 3 new threads appear: | ||
77 | |||
78 | * DHT request loop asks for new peers; | ||
79 | |||
80 | * Tracker request loop asks for new peers; | ||
81 | |||
82 | * controller which fork new avaand manage running P2P sessions. | ||
83 | |||
84 | Peer session is one always forked thread. | ||
85 | |||
86 | When client\/swarm\/peer session gets closed kill the corresponding | ||
87 | threads, but flush data to disc. (for e.g. storage block map) | ||
88 | |||
89 | So for e.g., in order to obtain our first block we need to spawn at | ||
90 | least 7 threads: main thread, 2 client session threads, 3 swarm session | ||
91 | threads and PeerSession thread. | ||
92 | |||
93 | Thread throttling | ||
94 | ------------------------------------------------------------------------ | ||
95 | |||
96 | If we will not restrict number of threads we could end up | ||
97 | with thousands of connected swarm and make no particular progress. | ||
98 | |||
99 | Note also we do not bound number of swarms! This is not optimal | ||
100 | strategy because each swarm might have say 1 thread and we could end | ||
101 | up bounded by the meaningless limit. Bounding global number of p2p | ||
102 | sessions should work better, and simpler. | ||
103 | |||
104 | **TODO:** priority based throttling: leecher thread have more priority | ||
105 | than seeder threads. | ||
106 | |||
107 | > -- | Each client might have a limited number of threads. | ||
108 | > type ThreadCount = Int | ||
109 | |||
110 | Client Services | ||
111 | ------------------------------------------------------------------------ | ||
112 | |||
113 | There are two servers started as client start: | ||
114 | |||
115 | * DHT node listener - needed by other peers to discover | ||
116 | * Peer listener - need by other peers to join this client. | ||
117 | |||
118 | Thus any client (assuming DHT is enabled) provides at least 2 services | ||
119 | so we can abstract out into ClientService: | ||
120 | |||
121 | > data ClientService = ClientService { | ||
122 | > servPort :: !PortNumber | ||
123 | > , servThread :: !ThreadId | ||
124 | > } deriving Show | ||
125 | |||
126 | Torrent Map | ||
127 | ------------------------------------------------------------------------ | ||
128 | |||
129 | TODO: keep track global peer have piece set. | ||
130 | |||
131 | Keeping all seeding torrent metafiles in memory is a _bad_ idea: for | ||
132 | 1TB of data we need at least 100MB of metadata. (using 256KB piece | ||
133 | size). This solution do not scale further. | ||
134 | |||
135 | To avoid this we keep just *metainfo* about *metainfo*: | ||
136 | |||
137 | > -- | Local info about torrent location. | ||
138 | > data TorrentLoc = TorrentLoc { | ||
139 | > -- | Full path to .torrent metafile. | ||
140 | > metafilePath :: FilePath | ||
141 | > -- | Full path to directory contating content files associated | ||
142 | > -- with the metafile. | ||
143 | > , dataDirPath :: FilePath | ||
144 | > } deriving Show | ||
145 | |||
146 | TorrentMap is used to keep track all known torrents for the | ||
147 | client. When some peer trying to connect to us it's necessary to | ||
148 | dispatch appropriate 'SwarmSession' (or start new one if there are | ||
149 | none) in the listener loop: we only know 'InfoHash' from 'Handshake' | ||
150 | but nothing more. So to accept new 'PeerSession' we need to lookup | ||
151 | torrent metainfo and content files (if there are some) by the | ||
152 | 'InfoHash' and only after that enter exchange loop. | ||
153 | |||
154 | Solution with TorrentLoc is much better and takes much more less | ||
155 | space, moreover it depends on count of torrents but not on count of | ||
156 | data itself. To scale further, in future we might add something like | ||
157 | database (for e.g. sqlite) for this kind of things. | ||
158 | |||
159 | > -- | Used to find torrent info and data in order to accept connection. | ||
160 | > type TorrentMap = HashMap InfoHash TorrentLoc | ||
161 | |||
162 | While *registering* torrent we need to check if torrent metafile is | ||
163 | correct, all the files are present in the filesystem and so | ||
164 | forth. However content validation using hashes will take a long time, | ||
165 | so we need to do this on demand: if a peer asks for a block, we | ||
166 | validate corresponding piece and only after read and send the block | ||
167 | back. | ||
168 | |||
169 | Client Sessions | ||
170 | ------------------------------------------------------------------------ | ||
171 | |||
172 | Basically, client session should contain options which user | ||
173 | application store in configuration files and related to the | ||
174 | protocol. Moreover it should contain the all client identification | ||
175 | info, for e.g. DHT. | ||
176 | |||
177 | Client session is the basic unit of bittorrent network, it has: | ||
178 | |||
179 | * The /peer ID/ used as unique identifier of the client in | ||
180 | network. Obviously, this value is not changed during client session. | ||
181 | |||
182 | * The number of /protocol extensions/ it might use. This value is | ||
183 | static as well, but if you want to dynamically reconfigure the client | ||
184 | you might kill the end the current session and create a new with the | ||
185 | fresh required extensions. | ||
186 | |||
187 | * The number of /swarms/ to join, each swarm described by the | ||
188 | 'SwarmSession'. | ||
189 | |||
190 | Normally, you would have one client session, however, if we needed, in | ||
191 | one application we could have many clients with different peer ID's | ||
192 | and different enabled extensions at the same time. | ||
193 | |||
194 | > -- | | ||
195 | > data ClientSession = ClientSession { | ||
196 | > -- | Used in handshakes and discovery mechanism. | ||
197 | > clientPeerId :: !PeerId | ||
198 | |||
199 | > -- | Extensions we should try to use. Hovewer some particular peer | ||
200 | > -- might not support some extension, so we keep enabledExtension in | ||
201 | > -- 'PeerSession'. | ||
202 | > , allowedExtensions :: [Extension] | ||
203 | |||
204 | > , peerListener :: !(MVar ClientService) | ||
205 | |||
206 | > -- | Semaphor used to bound number of active P2P sessions. | ||
207 | > , activeThreads :: !(MSem ThreadCount) | ||
208 | |||
209 | > -- | Max number of active connections. | ||
210 | > , maxActive :: !ThreadCount | ||
211 | |||
212 | > -- | Used to traverse the swarm session. | ||
213 | > , swarmSessions :: !(TVar (Map InfoHash SwarmSession)) | ||
214 | |||
215 | > -- | Used to keep track global client progress. | ||
216 | > , currentProgress :: !(TVar Progress) | ||
217 | |||
218 | > -- | Used to keep track available torrents. | ||
219 | > , torrentMap :: !(TVar TorrentMap) | ||
220 | > } | ||
221 | |||
222 | NOTE: currentProgress field is reduntant: progress depends on the all swarm | ||
223 | bitfields maybe we can remove the 'currentProgress' and compute it on | ||
224 | demand? | ||
225 | |||
226 | > instance Eq ClientSession where | ||
227 | > (==) = (==) `on` clientPeerId | ||
228 | |||
229 | > instance Ord ClientSession where | ||
230 | > compare = comparing clientPeerId | ||
231 | |||
232 | Swarm sessions | ||
233 | ------------------------------------------------------------------------ | ||
234 | |||
235 | NOTE: If client is a leecher then there is NO particular reason to | ||
236 | set max sessions count more than the_number_of_unchoke_slots * k: | ||
237 | |||
238 | * thread slot(activeThread semaphore) | ||
239 | * will take but no | ||
240 | |||
241 | So if client is a leecher then max sessions count depends on the | ||
242 | number of unchoke slots. | ||
243 | |||
244 | > -- | Used to bound the number of simultaneous connections and, which | ||
245 | > -- is the same, P2P sessions within the swarm session. | ||
246 | > type SessionCount = Int | ||
247 | |||
248 | However if client is a seeder then the value depends on . | ||
249 | |||
250 | > -- | Swarm session is | ||
251 | > data SwarmSession = SwarmSession { | ||
252 | > torrentMeta :: !Torrent | ||
253 | |||
254 | > , clientSession :: !ClientSession | ||
255 | |||
256 | TODO: lower "vacantPeers" when client becomes seeder according to | ||
257 | throttling policy. | ||
258 | |||
259 | Represent count of peers we _currently_ can connect to in the | ||
260 | swarm. Used to bound number of concurrent threads. See also *Thread | ||
261 | Throttling* section. | ||
262 | |||
263 | > , vacantPeers :: !(MSem SessionCount) | ||
264 | |||
265 | Client bitfield is used to keep track "the client have" piece set. | ||
266 | Modify this carefully always updating global progress. | ||
267 | |||
268 | > , storage :: !Storage | ||
269 | |||
270 | We keep set of the all connected peers for the each particular torrent | ||
271 | to prevent duplicated and therefore reduntant TCP connections. For | ||
272 | example consider the following very simle and realistic scenario: | ||
273 | |||
274 | * Peer A lookup tracker for peers. | ||
275 | |||
276 | * Peer B lookup tracker for peers. | ||
277 | |||
278 | * Finally, Peer A connect to B and Peer B connect to peer A | ||
279 | simultaneously. | ||
280 | |||
281 | There some other situation the problem may occur: duplicates in | ||
282 | successive tracker responses, tracker and DHT returns. So without any | ||
283 | protection we end up with two session between the same peers. That's | ||
284 | bad because this could lead: | ||
285 | |||
286 | * Reduced throughput - multiple sessions between the same peers will | ||
287 | mutiply control overhead (control messages, session state). | ||
288 | |||
289 | * Thread occupation - duplicated sessions will eat thread slots and | ||
290 | discourage other, possible more useful, peers to establish connection. | ||
291 | |||
292 | To avoid this we could check, into the one transaction, if a peer is | ||
293 | already connected and add a connection only if it is not. | ||
294 | |||
295 | > , connectedPeers :: !(TVar (Set PeerSession)) | ||
296 | |||
297 | TODO: use bounded broadcast chan with priority queue and drop old entries. | ||
298 | |||
299 | Channel used for replicate messages across all peers in swarm. For | ||
300 | exsample if we get some piece we should sent to all connected (and | ||
301 | interested in) peers HAVE message. | ||
302 | |||
303 | > , broadcastMessages :: !(TChan Message) | ||
304 | > } | ||
305 | |||
306 | INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers | ||
307 | |||
308 | > instance Eq SwarmSession where | ||
309 | > (==) = (==) `on` (tInfoHash . torrentMeta) | ||
310 | |||
311 | > instance Ord SwarmSession where | ||
312 | > compare = comparing (tInfoHash . torrentMeta) | ||
313 | |||
314 | > getClientBitfield :: SwarmSession -> IO Bitfield | ||
315 | > getClientBitfield SwarmSession {..} = atomically $ getCompleteBitfield storage | ||
316 | |||