author    Sam T <pxqr.sta@gmail.com>  2013-07-07 03:21:49 +0400
committer Sam T <pxqr.sta@gmail.com>  2013-07-07 03:21:49 +0400
commit    c4b95aaa5709a326167a9301a5a5b25500e299c4 (patch)
tree      33996b884741c4ed81903febf148fbff418eb7d0 /src
parent    0fe7e4ebd2f7ffd9f71738d6683427a110e58497 (diff)
Literate Internal module a bit
Diffstat (limited to 'src')
-rw-r--r--  src/Network/BitTorrent/Internal.hs   686
-rw-r--r--  src/Network/BitTorrent/Internal.lhs  686
2 files changed, 686 insertions, 686 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
deleted file mode 100644
index dd5f3bb7..00000000
--- a/src/Network/BitTorrent/Internal.hs
+++ /dev/null
@@ -1,686 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam T. 2013 | ||
3 | -- License : MIT | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module implements opaque broadcast message passing. It | ||
9 | -- provides the sessions needed by the Network.BitTorrent and | ||
10 | -- Network.BitTorrent.Exchange modules. To hide some internals | ||
11 | -- of this module we keep it detached from Exchange. | ||
12 | -- | ||
13 | -- Thread layout | ||
14 | -- | ||
15 | -- When a client session is created, 2 new threads appear: | ||
16 | -- | ||
17 | -- * DHT listener - replies to DHT requests; | ||
18 | -- | ||
19 | -- * Peer listener - accepts new P2P connections initiated by other | ||
20 | -- peers. | ||
21 | -- | ||
22 | -- When a swarm session is created, 3 new threads appear: | ||
23 | -- | ||
24 | -- * DHT request loop asks for new peers; | ||
25 | -- | ||
26 | -- * Tracker request loop asks for new peers; | ||
27 | -- | ||
28 | -- * a controller which forks new P2P sessions and manages the running ones. | ||
29 | -- | ||
30 | -- A peer session is always one forked thread. | ||
31 | -- | ||
32 | -- When a client\/swarm\/peer session gets closed we kill the | ||
33 | -- corresponding threads, but flush data to disk (e.g. the storage | ||
34 | -- block map). | ||
35 | -- | ||
36 | -- So, for example, in order to obtain our first block we need to run at | ||
37 | -- least 7 threads: the main thread, 2 client session threads, 3 swarm | ||
38 | -- session threads and a PeerSession thread. | ||
39 | -- | ||
40 | -- | ||
41 | -- NOTE: expose only static data in data field lists, all dynamic | ||
42 | -- data should be modified through standalone functions. | ||
43 | -- | ||
44 | {-# LANGUAGE OverloadedStrings #-} | ||
45 | {-# LANGUAGE RecordWildCards #-} | ||
46 | {-# LANGUAGE ViewPatterns #-} | ||
47 | {-# LANGUAGE TemplateHaskell #-} | ||
48 | {-# LANGUAGE DeriveDataTypeable #-} | ||
49 | module Network.BitTorrent.Internal | ||
50 | ( Progress(..), startProgress | ||
51 | |||
52 | -- * Client | ||
53 | , ClientSession (clientPeerId, allowedExtensions, listenerPort) | ||
54 | |||
55 | , ThreadCount | ||
56 | , defaultThreadCount | ||
57 | |||
58 | , TorrentLoc(..) | ||
59 | , registerTorrent | ||
60 | , unregisterTorrent | ||
61 | |||
62 | , newClient | ||
63 | |||
64 | , getCurrentProgress | ||
65 | , getSwarmCount | ||
66 | , getPeerCount | ||
67 | |||
68 | |||
69 | -- * Swarm | ||
70 | , SwarmSession( SwarmSession, torrentMeta, clientSession ) | ||
71 | |||
72 | , SessionCount | ||
73 | , getSessionCount | ||
74 | |||
75 | , newLeecher | ||
76 | , newSeeder | ||
77 | , getClientBitfield | ||
78 | |||
79 | , enterSwarm | ||
80 | , leaveSwarm | ||
81 | , waitVacancy | ||
82 | |||
83 | , pieceLength | ||
84 | |||
85 | -- * Peer | ||
86 | , PeerSession( PeerSession, connectedPeerAddr | ||
87 | , swarmSession, enabledExtensions | ||
88 | , sessionState | ||
89 | ) | ||
90 | , SessionState | ||
91 | , withPeerSession | ||
92 | |||
93 | -- ** Broadcasting | ||
94 | , available | ||
95 | , getPending | ||
96 | |||
97 | -- ** Exceptions | ||
98 | , SessionException(..) | ||
99 | , isSessionException | ||
100 | , putSessionException | ||
101 | |||
102 | -- ** Properties | ||
103 | , bitfield, status | ||
104 | , findPieceCount | ||
105 | |||
106 | -- * Timeouts | ||
107 | , updateIncoming, updateOutcoming | ||
108 | ) where | ||
109 | |||
110 | import Prelude hiding (mapM_) | ||
111 | |||
112 | import Control.Applicative | ||
113 | import Control.Concurrent | ||
114 | import Control.Concurrent.STM | ||
115 | import Control.Concurrent.MSem as MSem | ||
116 | import Control.Lens | ||
117 | import Control.Exception | ||
118 | import Control.Monad.Trans | ||
119 | |||
120 | import Data.IORef | ||
121 | import Data.Default | ||
122 | import Data.Function | ||
123 | import Data.Foldable (mapM_) | ||
124 | import Data.HashMap.Strict as HM | ||
125 | import Data.Ord | ||
126 | import Data.Set as S | ||
127 | import Data.Typeable | ||
128 | |||
129 | import Data.Serialize hiding (get) | ||
130 | import Text.PrettyPrint | ||
131 | |||
132 | import Network | ||
133 | import Network.Socket | ||
134 | import Network.Socket.ByteString | ||
135 | |||
136 | import GHC.Event as Ev | ||
137 | |||
138 | import Data.Bitfield as BF | ||
139 | import Data.Torrent | ||
140 | import Network.BitTorrent.Extension | ||
141 | import Network.BitTorrent.Peer | ||
142 | import Network.BitTorrent.Exchange.Protocol as BT | ||
143 | import Network.BitTorrent.Tracker.Protocol as BT | ||
144 | |||
145 | {----------------------------------------------------------------------- | ||
146 | Progress | ||
147 | -----------------------------------------------------------------------} | ||
148 | |||
149 | -- | 'Progress' contains upload/download/left stats about the | ||
150 | -- current client state and is used to notify the tracker. | ||
151 | -- | ||
152 | -- This data is considered dynamic within one client | ||
153 | -- session. This data should also be shared across client | ||
154 | -- application sessions (e.g. saved in a file); otherwise use 'startProgress' | ||
155 | -- to get an initial 'Progress'. | ||
156 | -- | ||
157 | data Progress = Progress { | ||
158 | _uploaded :: !Integer -- ^ Total amount of bytes uploaded. | ||
159 | , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. | ||
160 | , _left :: !Integer -- ^ Total amount of bytes left. | ||
161 | } deriving (Show, Read, Eq) | ||
162 | |||
163 | -- TODO use atomic bits and Word64 | ||
164 | |||
165 | $(makeLenses ''Progress) | ||
166 | |||
167 | -- | Initial progress is used when there were no sessions before. | ||
168 | -- | ||
169 | -- Please note that the tracker might penalize the client in some way if it | ||
170 | -- does not accumulate progress. If possible, save 'Progress' between | ||
171 | -- client sessions to avoid that. | ||
172 | -- | ||
173 | startProgress :: Integer -> Progress | ||
174 | startProgress = Progress 0 0 | ||
175 | |||
176 | -- | Used when the client downloads some data from /any/ peer. | ||
177 | downloadedProgress :: Int -> Progress -> Progress | ||
178 | downloadedProgress (fromIntegral -> amount) | ||
179 | = (left -~ amount) | ||
180 | . (downloaded +~ amount) | ||
181 | {-# INLINE downloadedProgress #-} | ||
182 | |||
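As an illustration of the lens arithmetic above, a GHCi-style sketch (assuming the derived 'Show' instance): downloading 256 bytes moves them from the left counter to the downloaded counter.

    >>> downloadedProgress 256 (startProgress 1000)
    Progress {_uploaded = 0, _downloaded = 256, _left = 744}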
183 | -- | Used when the client uploads some data to /any/ peer. | ||
184 | uploadedProgress :: Int -> Progress -> Progress | ||
185 | uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
186 | {-# INLINE uploadedProgress #-} | ||
187 | |||
188 | -- | Used when a leecher joins the client session. | ||
189 | enqueuedProgress :: Integer -> Progress -> Progress | ||
190 | enqueuedProgress amount = left +~ amount | ||
191 | {-# INLINE enqueuedProgress #-} | ||
192 | |||
193 | -- | Used when a leecher leaves the client session | ||
194 | -- (e.g. the user deletes an incomplete torrent). | ||
195 | dequeuedProgress :: Integer -> Progress -> Progress | ||
196 | dequeuedProgress amount = left -~ amount | ||
197 | {-# INLINE dequeuedProgress #-} | ||
198 | |||
199 | {----------------------------------------------------------------------- | ||
200 | Client session | ||
201 | -----------------------------------------------------------------------} | ||
202 | |||
203 | {- NOTE: If we do not restrict the number of threads we could end up | ||
204 | with thousands of connected swarms and make no particular progress. | ||
205 | |||
206 | Note also that we do not bound the number of swarms! This is not an optimal | ||
207 | strategy because each swarm might have, say, 1 thread and we could end | ||
208 | up bounded by a meaningless limit. Bounding the global number of p2p | ||
209 | sessions should work better, and is simpler.-} | ||
210 | |||
211 | -- | Each client might have a limited number of threads. | ||
212 | type ThreadCount = Int | ||
213 | |||
214 | -- | The number of threads suitable for a typical BT client. | ||
215 | defaultThreadCount :: ThreadCount | ||
216 | defaultThreadCount = 1000 | ||
217 | |||
218 | {- PERFORMANCE NOTE: keeping torrent metafiles in memory is a _bad_ | ||
219 | idea: for 1TB of data we need at least 100MB of metadata (using a 256KB | ||
220 | piece size). That approach does not scale. The solution with | ||
221 | TorrentLoc is much better and takes far less space; moreover it | ||
222 | depends on the number of torrents, not on the amount of data itself. To | ||
223 | scale further, in the future we might add something like a database | ||
224 | (e.g. sqlite) for this kind of thing.-} | ||
225 | |||
226 | -- | Identifies the location of a torrent's metafile and content data. | ||
227 | data TorrentLoc = TorrentLoc { | ||
228 | metafilePath :: FilePath | ||
229 | , dataPath :: FilePath | ||
230 | } | ||
231 | |||
232 | validateTorrent :: TorrentLoc -> IO () | ||
233 | validateTorrent = error "validateTorrent: not implemented" | ||
234 | |||
235 | -- | TorrentMap is used to keep track of all torrents known to the | ||
236 | -- client. When some peer tries to connect to us it's necessary to | ||
237 | -- dispatch to the appropriate 'SwarmSession' (or start a new one if there | ||
238 | -- is none) in the listener loop: we only know the 'InfoHash' from the | ||
239 | -- 'Handshake' and nothing more. So to accept a new 'PeerSession' we | ||
240 | -- need to look up the torrent metainfo and content files (if there are | ||
241 | -- any) by the 'InfoHash' and only after that enter the exchange loop. | ||
242 | -- | ||
243 | type TorrentMap = HashMap InfoHash TorrentLoc | ||
244 | |||
245 | {- NOTE: basically, the client session should contain the protocol-related | ||
246 | options which the user app stores in configuration files. Moreover it | ||
247 | should contain all the client identification info (e.g. for DHT). -} | ||
248 | |||
249 | -- | The client session is the basic unit of the bittorrent network; it has: | ||
250 | -- | ||
251 | -- * The /peer ID/ used as a unique identifier of the client in the | ||
252 | -- network. Obviously, this value does not change during a client | ||
253 | -- session. | ||
254 | -- | ||
255 | -- * The set of /protocol extensions/ it might use. This value | ||
256 | -- is static as well, but if you want to dynamically reconfigure | ||
257 | -- the client you might end the current session and | ||
258 | -- create a new one with the required extensions. | ||
259 | -- | ||
260 | -- * The set of /swarms/ to join, each swarm described by a | ||
261 | -- 'SwarmSession'. | ||
262 | -- | ||
263 | -- Normally you would have one client session; however, if needed, | ||
264 | -- one application could have many clients with different peer | ||
265 | -- IDs and different enabled extensions at the same time. | ||
266 | -- | ||
267 | data ClientSession = ClientSession { | ||
268 | -- | Used in handshakes and discovery mechanism. | ||
269 | clientPeerId :: !PeerId | ||
270 | |||
271 | -- | Extensions we should try to use. However, a particular peer | ||
272 | -- might not support some extensions, so we keep enabledExtensions in | ||
273 | -- 'PeerSession'. | ||
274 | , allowedExtensions :: [Extension] | ||
275 | |||
276 | -- | Port where the client listens for other peers. | ||
277 | , listenerPort :: PortNumber | ||
278 | -- TODO restart listener if it fails | ||
279 | |||
280 | -- | Semaphore used to bound the number of active P2P sessions. | ||
281 | , activeThreads :: !(MSem ThreadCount) | ||
282 | |||
283 | -- | Max number of active connections. | ||
284 | , maxActive :: !ThreadCount | ||
285 | |||
286 | -- | Used to traverse the swarm session. | ||
287 | , swarmSessions :: !(TVar (Set SwarmSession)) | ||
288 | |||
289 | , eventManager :: !EventManager | ||
290 | |||
291 | -- | Used to keep track global client progress. | ||
292 | , currentProgress :: !(TVar Progress) | ||
293 | |||
294 | -- | Used to keep track available torrents. | ||
295 | , torrentMap :: !(TVar TorrentMap) | ||
296 | } | ||
297 | |||
298 | -- currentProgress field is redundant: progress depends on all the swarm bitfields | ||
299 | -- maybe we can remove the 'currentProgress' and compute it on demand? | ||
300 | |||
301 | |||
302 | instance Eq ClientSession where | ||
303 | (==) = (==) `on` clientPeerId | ||
304 | |||
305 | instance Ord ClientSession where | ||
306 | compare = comparing clientPeerId | ||
307 | |||
308 | -- | Get current global progress of the client. This value is usually | ||
309 | -- shown to a user. | ||
310 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
311 | getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
312 | |||
313 | -- | Get the number of swarms the client is aware of. | ||
314 | getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
315 | getSwarmCount ClientSession {..} = liftIO $ | ||
316 | S.size <$> readTVarIO swarmSessions | ||
317 | |||
318 | -- | Get the number of peers the client is currently connected to. | ||
319 | getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
320 | getPeerCount ClientSession {..} = liftIO $ do | ||
321 | unused <- peekAvail activeThreads | ||
322 | return (maxActive - unused) | ||
323 | |||
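Taken together, these getters compose into a simple status report. A minimal sketch (the report format here is illustrative, not part of the module):

    printStats :: ClientSession -> IO ()
    printStats cs = do
      swarms <- getSwarmCount cs
      peers  <- getPeerCount cs
      pr     <- getCurrentProgress cs
      putStrLn $ "swarms: " ++ show swarms ++ ", peers: " ++ show peers
      putStrLn $ "progress: " ++ show pr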
324 | -- | Create a new client session. The data passed to this function is | ||
325 | -- usually loaded from a configuration file. | ||
326 | newClient :: SessionCount -- ^ Maximum count of active P2P Sessions. | ||
327 | -> [Extension] -- ^ Extensions allowed to use. | ||
328 | -> IO ClientSession -- ^ Client with unique peer ID. | ||
329 | |||
330 | newClient n exts = do | ||
331 | mgr <- Ev.new | ||
332 | -- TODO kill this thread when leave client | ||
333 | _ <- forkIO $ loop mgr | ||
334 | |||
335 | ClientSession | ||
336 | <$> newPeerId | ||
337 | <*> pure exts | ||
338 | <*> forkListener (error "listener") | ||
339 | <*> MSem.new n | ||
340 | <*> pure n | ||
341 | <*> newTVarIO S.empty | ||
342 | <*> pure mgr | ||
343 | <*> newTVarIO (startProgress 0) | ||
344 | <*> newTVarIO HM.empty | ||
345 | |||
346 | registerTorrent :: ClientSession -> InfoHash -> TorrentLoc -> STM () | ||
347 | registerTorrent ClientSession {..} ih tl = do | ||
348 | modifyTVar' torrentMap $ HM.insert ih tl | ||
349 | |||
350 | unregisterTorrent :: ClientSession -> InfoHash -> STM () | ||
351 | unregisterTorrent ClientSession {..} ih = do | ||
352 | modifyTVar' torrentMap $ HM.delete ih | ||
353 | |||
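A usage sketch: registering a torrent before peers start connecting. The paths and the info hash 'ih' are hypothetical placeholders.

    exampleRegister :: ClientSession -> InfoHash -> IO ()
    exampleRegister client ih =
      atomically $ registerTorrent client ih TorrentLoc
        { metafilePath = "/path/to/example.torrent"  -- hypothetical metafile
        , dataPath     = "/path/to/downloads"        -- hypothetical content dir
        }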
354 | {----------------------------------------------------------------------- | ||
355 | Swarm session | ||
356 | -----------------------------------------------------------------------} | ||
357 | |||
358 | {- NOTE: If client is a leecher then there is NO particular reason to | ||
359 | set max sessions count more than the_number_of_unchoke_slots * k: | ||
360 | |||
361 | * thread slot(activeThread semaphore) | ||
362 | * will take but no | ||
363 | |||
364 | So if client is a leecher then max sessions count depends on the | ||
365 | number of unchoke slots. | ||
366 | |||
367 | However if client is a seeder then the value depends on . | ||
368 | -} | ||
369 | |||
370 | -- | Used to bound the number of simultaneous connections and, which | ||
371 | -- is the same, P2P sessions within the swarm session. | ||
372 | type SessionCount = Int | ||
373 | |||
374 | defSeederConns :: SessionCount | ||
375 | defSeederConns = defaultUnchokeSlots | ||
376 | |||
377 | defLeacherConns :: SessionCount | ||
378 | defLeacherConns = defaultNumWant | ||
379 | |||
380 | -- | Swarm session is the per-torrent part of a client session. | ||
381 | data SwarmSession = SwarmSession { | ||
382 | torrentMeta :: !Torrent | ||
383 | |||
384 | -- | | ||
385 | , clientSession :: !ClientSession | ||
386 | |||
387 | -- | Represents the number of peers we can _currently_ connect to in the | ||
388 | -- swarm. Used to bound the number of concurrent threads. | ||
389 | , vacantPeers :: !(MSem SessionCount) | ||
390 | |||
391 | -- | Modify this carefully updating global progress. | ||
392 | , clientBitfield :: !(TVar Bitfield) | ||
393 | |||
394 | , connectedPeers :: !(TVar (Set PeerSession)) | ||
395 | |||
396 | -- TODO use bounded broadcast chan with priority queue and drop old entries | ||
397 | -- | Channel used to replicate messages across all peers in the | ||
398 | -- swarm. For example, if we get some piece we should send a HAVE | ||
399 | -- message to all connected (and interested) peers. | ||
400 | -- | ||
401 | , broadcastMessages :: !(TChan Message) | ||
402 | } | ||
403 | |||
404 | -- INVARIANT: | ||
405 | -- max_sessions_count - sizeof connectedPeers = value vacantPeers | ||
406 | |||
407 | instance Eq SwarmSession where | ||
408 | (==) = (==) `on` (tInfoHash . torrentMeta) | ||
409 | |||
410 | instance Ord SwarmSession where | ||
411 | compare = comparing (tInfoHash . torrentMeta) | ||
412 | |||
413 | newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | ||
414 | -> IO SwarmSession | ||
415 | newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
416 | = SwarmSession <$> pure t | ||
417 | <*> pure cs | ||
418 | <*> MSem.new n | ||
419 | <*> newTVarIO bf | ||
420 | <*> newTVarIO S.empty | ||
421 | <*> newBroadcastTChanIO | ||
422 | |||
423 | -- | New swarm session in which the client is allowed to upload only. | ||
424 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
425 | newSeeder cs t @ Torrent {..} | ||
426 | = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
427 | |||
428 | -- | New swarm session in which the client is allowed to both download and upload. | ||
429 | newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
430 | newLeecher cs t @ Torrent {..} = do | ||
431 | se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
432 | atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
433 | return se | ||
434 | |||
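A sketch of how these constructors fit together. 'fromFile' here stands for whatever .torrent loader the application uses; it is an assumption, not necessarily this library's API.

    startSeeding :: FilePath -> IO SwarmSession
    startSeeding torrentFile = do
      client  <- newClient defaultThreadCount []   -- no extra extensions
      torrent <- fromFile torrentFile              -- assumed .torrent loader
      newSeeder client torrent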
435 | --isLeacher :: SwarmSession -> IO Bool | ||
436 | --isLeacher = undefined | ||
437 | |||
438 | -- | Get the number of connected peers in the given swarm. | ||
439 | getSessionCount :: SwarmSession -> IO SessionCount | ||
440 | getSessionCount SwarmSession {..} = do | ||
441 | S.size <$> readTVarIO connectedPeers | ||
442 | |||
443 | getClientBitfield :: SwarmSession -> IO Bitfield | ||
444 | getClientBitfield = readTVarIO . clientBitfield | ||
445 | |||
446 | {- | ||
447 | haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () | ||
448 | haveDone ix = | ||
449 | liftIO $ atomically $ do | ||
450 | bf <- readTVar clientBitfield | ||
451 | writeTVar (have ix bf) | ||
452 | currentProgress | ||
453 | -} | ||
454 | |||
455 | -- acquire/release mechanism: for internal use only | ||
456 | |||
457 | enterSwarm :: SwarmSession -> IO () | ||
458 | enterSwarm SwarmSession {..} = do | ||
459 | MSem.wait (activeThreads clientSession) | ||
460 | MSem.wait vacantPeers | ||
461 | |||
462 | leaveSwarm :: SwarmSession -> IO () | ||
463 | leaveSwarm SwarmSession {..} = do | ||
464 | MSem.signal vacantPeers | ||
465 | MSem.signal (activeThreads clientSession) | ||
466 | |||
467 | waitVacancy :: SwarmSession -> IO () -> IO () | ||
468 | waitVacancy se = | ||
469 | bracket (enterSwarm se) (const (leaveSwarm se)) | ||
470 | . const | ||
471 | |||
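For example, a connection driver might combine 'waitVacancy' with 'withPeerSession' so both semaphores are always released, even if the exchange loop throws. The loop body below is a placeholder, not the real implementation.

    connectAndRun :: SwarmSession -> PeerAddr -> IO ()
    connectAndRun se addr =
      waitVacancy se $
        withPeerSession se addr $ \(_sock, _ps) ->
          return ()   -- placeholder for the real exchange loop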
472 | pieceLength :: SwarmSession -> Int | ||
473 | pieceLength = ciPieceLength . tInfo . torrentMeta | ||
474 | {-# INLINE pieceLength #-} | ||
475 | |||
476 | {----------------------------------------------------------------------- | ||
477 | Peer session | ||
478 | -----------------------------------------------------------------------} | ||
479 | |||
480 | -- | A peer session contains all data necessary for peer-to-peer | ||
481 | -- communication. | ||
482 | data PeerSession = PeerSession { | ||
483 | -- | Used as unique 'PeerSession' identifier within one | ||
484 | -- 'SwarmSession'. | ||
485 | connectedPeerAddr :: !PeerAddr | ||
486 | |||
487 | -- | The swarm to which both end points belong to. | ||
488 | , swarmSession :: !SwarmSession | ||
489 | |||
490 | -- | Extensions that both the peer and the client support. | ||
491 | , enabledExtensions :: [Extension] | ||
492 | |||
493 | -- | To disconnect from dead peers appropriately we should check | ||
494 | -- whether a peer has sent a KA message within the given interval. If | ||
495 | -- not, we should throw an exception in 'TimeoutCallback' and | ||
496 | -- close the session between the peers. | ||
497 | -- | ||
498 | -- We should update the timeout whenever we /receive/ any message within | ||
499 | -- the timeout interval, to keep the connection up. | ||
500 | , incomingTimeout :: !TimeoutKey | ||
501 | |||
502 | -- | To send KA messages appropriately we should know when we last | ||
503 | -- sent a message to a peer. To do that we keep a registered | ||
504 | -- timeout in the event manager, and if we have not sent any message to | ||
505 | -- the peer within the given interval then we send a KA message in | ||
506 | -- 'TimeoutCallback'. | ||
507 | -- | ||
508 | -- We should update the timeout whenever we /send/ any message within the | ||
509 | -- timeout interval, to avoid redundant KA messages. | ||
510 | -- | ||
511 | , outcomingTimeout :: !TimeoutKey | ||
512 | |||
513 | -- TODO use dupChan for broadcasting | ||
514 | -- | Broadcast messages waiting to be sent to peer. | ||
515 | , pendingMessages :: !(TChan Message) | ||
516 | |||
517 | -- | Dynamic P2P data. | ||
518 | , sessionState :: !(IORef SessionState) | ||
519 | } | ||
520 | |||
521 | -- TODO unpack some fields | ||
522 | |||
523 | data SessionState = SessionState { | ||
524 | _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | ||
525 | , _status :: !SessionStatus -- ^ Status of both peers. | ||
526 | } deriving (Show, Eq) | ||
527 | |||
528 | $(makeLenses ''SessionState) | ||
529 | |||
530 | instance Eq PeerSession where | ||
531 | (==) = (==) `on` connectedPeerAddr | ||
532 | |||
533 | instance Ord PeerSession where | ||
534 | compare = comparing connectedPeerAddr | ||
535 | |||
536 | -- | Exceptions used to interrupt the current P2P session. These | ||
537 | -- exceptions will NOT affect other P2P sessions, DHT, peer <-> | ||
538 | -- tracker, or any other session. | ||
539 | -- | ||
540 | data SessionException = PeerDisconnected | ||
541 | | ProtocolError Doc | ||
542 | deriving (Show, Typeable) | ||
543 | |||
544 | instance Exception SessionException | ||
545 | |||
546 | |||
547 | -- | Do nothing with exception, used with 'handle' or 'try'. | ||
548 | isSessionException :: Monad m => SessionException -> m () | ||
549 | isSessionException _ = return () | ||
550 | |||
551 | -- | The same as 'isSessionException' but prints the caught exception | ||
552 | -- to stdout, for debugging purposes only. | ||
553 | putSessionException :: SessionException -> IO () | ||
554 | putSessionException = print | ||
555 | |||
556 | -- TODO modify such that we can use this in listener loop | ||
557 | -- TODO check if it connected yet peer | ||
558 | withPeerSession :: SwarmSession -> PeerAddr | ||
559 | -> ((Socket, PeerSession) -> IO ()) | ||
560 | -> IO () | ||
561 | |||
562 | withPeerSession ss @ SwarmSession {..} addr | ||
563 | = handle isSessionException . bracket openSession closeSession | ||
564 | where | ||
565 | openSession = do | ||
566 | let caps = encodeExts $ allowedExtensions $ clientSession | ||
567 | let ihash = tInfoHash torrentMeta | ||
568 | let pid = clientPeerId $ clientSession | ||
569 | let chs = Handshake defaultBTProtocol caps ihash pid | ||
570 | |||
571 | sock <- connectToPeer addr | ||
572 | phs <- handshake sock chs `onException` close sock | ||
573 | |||
574 | cbf <- readTVarIO clientBitfield | ||
575 | sendAll sock (encode (Bitfield cbf)) | ||
576 | |||
577 | let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) | ||
578 | ps <- PeerSession addr ss enabled | ||
579 | <$> registerTimeout (eventManager clientSession) | ||
580 | maxIncomingTime (return ()) | ||
581 | <*> registerTimeout (eventManager clientSession) | ||
582 | maxOutcomingTime (sendKA sock) | ||
583 | <*> atomically (dupTChan broadcastMessages) | ||
584 | <*> do { | ||
585 | ; tc <- totalCount <$> readTVarIO clientBitfield | ||
586 | ; newIORef (SessionState (haveNone tc) def) | ||
587 | } | ||
588 | |||
589 | atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
590 | |||
591 | return (sock, ps) | ||
592 | |||
593 | closeSession (sock, ps) = do | ||
594 | atomically $ modifyTVar' connectedPeers (S.delete ps) | ||
595 | close sock | ||
596 | |||
597 | findPieceCount :: PeerSession -> PieceCount | ||
598 | findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | ||
599 | |||
600 | {----------------------------------------------------------------------- | ||
601 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece | ||
602 | -----------------------------------------------------------------------} | ||
603 | |||
604 | -- here we should enqueue broadcast messages and keep in mind that: | ||
605 | -- | ||
606 | -- * We should enqueue broadcast events as they appear. | ||
607 | -- * We should yield broadcast messages as fast as we get them. | ||
608 | -- | ||
609 | -- these 2 phases might differ in time significantly | ||
610 | |||
611 | -- TODO do this; but only when it'll be clear which other broadcast | ||
612 | -- messages & events we should send | ||
613 | |||
614 | -- 1. Update client have bitfield --\____ in one transaction; | ||
615 | -- 2. Update downloaded stats --/ | ||
616 | -- 3. Signal all the other peers about this. | ||
617 | |||
618 | available :: Bitfield -> SwarmSession -> IO () | ||
619 | available bf se @ SwarmSession {..} = {-# SCC available #-} do | ||
620 | mark >> atomically broadcast | ||
621 | where | ||
622 | mark = do | ||
623 | let bytes = pieceLength se * BF.haveCount bf | ||
624 | atomically $ do | ||
625 | modifyTVar' clientBitfield (BF.union bf) | ||
626 | modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
627 | |||
628 | broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
629 | |||
630 | |||
631 | -- TODO compute the size of messages: if it's faster to send a Bitfield | ||
632 | -- instead of many Have messages, do that | ||
633 | -- | ||
634 | -- also, if there is a single Have message in the queue then the | ||
635 | -- corresponding piece is likely still in memory or the disk cache, | ||
636 | -- in which case we can send SuggestPiece | ||
637 | |||
638 | -- | Get the queue of pending messages that have appeared as a result of | ||
639 | -- asynchronously changed client state. The resulting queue should be sent | ||
640 | -- to the peer immediately. | ||
641 | getPending :: PeerSession -> IO [Message] | ||
642 | getPending PeerSession {..} = {-# SCC getPending #-} do | ||
643 | atomically (readAvail pendingMessages) | ||
644 | |||
645 | readAvail :: TChan a -> STM [a] | ||
646 | readAvail chan = do | ||
647 | m <- tryReadTChan chan | ||
648 | case m of | ||
649 | Just a -> (:) <$> pure a <*> readAvail chan | ||
650 | Nothing -> return [] | ||
651 | |||
652 | {----------------------------------------------------------------------- | ||
653 | Timeouts | ||
654 | -----------------------------------------------------------------------} | ||
655 | |||
656 | -- for internal use only | ||
657 | |||
658 | sec :: Int | ||
659 | sec = 1000 * 1000 | ||
660 | |||
661 | maxIncomingTime :: Int | ||
662 | maxIncomingTime = 120 * sec | ||
663 | |||
664 | maxOutcomingTime :: Int | ||
665 | maxOutcomingTime = 1 * sec | ||
666 | |||
667 | -- | Should be called after we have received any message from a peer. | ||
668 | updateIncoming :: PeerSession -> IO () | ||
669 | updateIncoming PeerSession {..} = do | ||
670 | updateTimeout (eventManager (clientSession swarmSession)) | ||
671 | incomingTimeout maxIncomingTime | ||
672 | |||
673 | -- | Should be called before we send any message to a peer. | ||
674 | updateOutcoming :: PeerSession -> IO () | ||
675 | updateOutcoming PeerSession {..} = | ||
676 | updateTimeout (eventManager (clientSession swarmSession)) | ||
677 | outcomingTimeout maxOutcomingTime | ||
678 | |||
679 | sendKA :: Socket -> IO () | ||
680 | sendKA sock {- SwarmSession {..} -} = do | ||
681 | return () | ||
682 | -- print "I'm sending keep alive." | ||
683 | -- sendAll sock (encode BT.KeepAlive) | ||
684 | -- let mgr = eventManager clientSession | ||
685 | -- updateTimeout mgr | ||
686 | -- print "Done.." | ||
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
new file mode 100644
index 00000000..3f8f0371
--- /dev/null
+++ b/src/Network/BitTorrent/Internal.lhs
@@ -0,0 +1,686 @@ | |||
1 | > -- | | ||
2 | > -- Copyright : (c) Sam T. 2013 | ||
3 | > -- License : MIT | ||
4 | > -- Maintainer : pxqr.sta@gmail.com | ||
5 | > -- Stability : experimental | ||
6 | > -- Portability : portable | ||
7 | > -- | ||
8 | > -- This module implements opaque broadcast message passing. It | ||
9 | > -- provides the sessions needed by the Network.BitTorrent and | ||
10 | > -- Network.BitTorrent.Exchange modules. To hide some internals | ||
11 | > -- of this module we keep it detached from Exchange. | ||
12 | > -- | ||
13 | > -- | ||
14 | > -- NOTE: expose only static data in data field lists, all dynamic | ||
15 | > -- data should be modified through standalone functions. | ||
16 | > -- | ||
17 | > {-# LANGUAGE OverloadedStrings #-} | ||
18 | > {-# LANGUAGE RecordWildCards #-} | ||
19 | > {-# LANGUAGE ViewPatterns #-} | ||
20 | > {-# LANGUAGE TemplateHaskell #-} | ||
21 | > {-# LANGUAGE DeriveDataTypeable #-} | ||
22 | > module Network.BitTorrent.Internal | ||
23 | > ( Progress(..), startProgress | ||
24 | > -- * Client | ||
25 | > , ClientSession (clientPeerId, allowedExtensions, listenerPort) | ||
26 | > | ||
27 | > , ThreadCount | ||
28 | > , defaultThreadCount | ||
29 | > | ||
30 | > , TorrentLoc(..) | ||
31 | > , registerTorrent | ||
32 | > , unregisterTorrent | ||
33 | > , newClient | ||
34 | |||
35 | > , getCurrentProgress | ||
36 | > , getSwarmCount | ||
37 | > , getPeerCount | ||
38 | |||
39 | |||
40 | |||
41 | > -- * Swarm | ||
42 | > , SwarmSession( SwarmSession, torrentMeta, clientSession ) | ||
43 | |||
44 | > , SessionCount | ||
45 | > , getSessionCount | ||
46 | |||
47 | > , newLeecher | ||
48 | > , newSeeder | ||
49 | > , getClientBitfield | ||
50 | |||
51 | > , enterSwarm | ||
52 | > , leaveSwarm | ||
53 | > , waitVacancy | ||
54 | |||
55 | > , pieceLength | ||
56 | |||
57 | > -- * Peer | ||
58 | > , PeerSession( PeerSession, connectedPeerAddr | ||
59 | > , swarmSession, enabledExtensions | ||
60 | > , sessionState | ||
61 | > ) | ||
62 | > , SessionState | ||
63 | > , withPeerSession | ||
64 | |||
65 | > -- ** Broadcasting | ||
66 | > , available | ||
67 | > , getPending | ||
68 | |||
69 | > -- ** Exceptions | ||
70 | > , SessionException(..) | ||
71 | > , isSessionException | ||
72 | > , putSessionException | ||
73 | |||
74 | > -- ** Properties | ||
75 | > , bitfield, status | ||
76 | > , findPieceCount | ||
77 | |||
78 | > -- * Timeouts | ||
79 | > , updateIncoming, updateOutcoming | ||
80 | > ) where | ||
81 | |||
82 | > import Prelude hiding (mapM_) | ||
83 | |||
84 | > import Control.Applicative | ||
85 | > import Control.Concurrent | ||
86 | > import Control.Concurrent.STM | ||
87 | > import Control.Concurrent.MSem as MSem | ||
88 | > import Control.Lens | ||
89 | > import Control.Exception | ||
90 | > import Control.Monad.Trans | ||
91 | |||
92 | > import Data.IORef | ||
93 | > import Data.Default | ||
94 | > import Data.Function | ||
95 | > import Data.Foldable (mapM_) | ||
96 | > import Data.HashMap.Strict as HM | ||
97 | > import Data.Ord | ||
98 | > import Data.Set as S | ||
99 | > import Data.Typeable | ||
100 | |||
101 | > import Data.Serialize hiding (get) | ||
102 | > import Text.PrettyPrint | ||
103 | |||
104 | > import Network | ||
105 | > import Network.Socket | ||
106 | > import Network.Socket.ByteString | ||
107 | |||
108 | > import GHC.Event as Ev | ||
109 | |||
110 | > import Data.Bitfield as BF | ||
111 | > import Data.Torrent | ||
112 | > import Network.BitTorrent.Extension | ||
113 | > import Network.BitTorrent.Peer | ||
114 | > import Network.BitTorrent.Exchange.Protocol as BT | ||
115 | > import Network.BitTorrent.Tracker.Protocol as BT | ||
116 | |||
117 | > {----------------------------------------------------------------------- | ||
118 | > Progress | ||
119 | > -----------------------------------------------------------------------} | ||
120 | |||
121 | > -- | 'Progress' contains upload/download/left stats about the | ||
122 | > -- current client state and is used to notify the tracker. | ||
123 | > -- | ||
124 | > -- This data is considered dynamic within one client | ||
125 | > -- session. This data should also be shared across client | ||
126 | > -- application sessions (e.g. saved in a file); otherwise use 'startProgress' | ||
127 | > -- to get an initial 'Progress'. | ||
128 | > -- | ||
129 | > data Progress = Progress { | ||
130 | > _uploaded :: !Integer -- ^ Total amount of bytes uploaded. | ||
131 | > , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. | ||
132 | > , _left :: !Integer -- ^ Total amount of bytes left. | ||
133 | > } deriving (Show, Read, Eq) | ||
134 | |||
135 | > -- TODO use atomic bits and Word64 | ||
136 | |||
137 | > $(makeLenses ''Progress) | ||
138 | |||
139 | > -- | Initial progress is used when there were no sessions before. | ||
140 | > -- | ||
141 | > -- Please note that the tracker might penalize the client in some way if it | ||
142 | > -- does not accumulate progress. If possible, save 'Progress' between | ||
143 | > -- client sessions to avoid that. | ||
144 | > -- | ||
145 | > startProgress :: Integer -> Progress | ||
146 | > startProgress = Progress 0 0 | ||
147 | |||
148 | > -- | Used when the client downloads some data from /any/ peer. | ||
149 | > downloadedProgress :: Int -> Progress -> Progress | ||
150 | > downloadedProgress (fromIntegral -> amount) | ||
151 | > = (left -~ amount) | ||
152 | > . (downloaded +~ amount) | ||
153 | > {-# INLINE downloadedProgress #-} | ||
154 | |||
155 | > -- | Used when the client uploads some data to /any/ peer. | ||
156 | > uploadedProgress :: Int -> Progress -> Progress | ||
157 | > uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
158 | > {-# INLINE uploadedProgress #-} | ||
159 | |||
160 | > -- | Used when a leecher joins the client session. | ||
161 | > enqueuedProgress :: Integer -> Progress -> Progress | ||
162 | > enqueuedProgress amount = left +~ amount | ||
163 | > {-# INLINE enqueuedProgress #-} | ||
164 | |||
165 | > -- | Used when a leecher leaves the client session | ||
166 | > -- (e.g. the user deletes an incomplete torrent). | ||
167 | > dequeuedProgress :: Integer -> Progress -> Progress | ||
168 | > dequeuedProgress amount = left -~ amount | ||
169 | > {-# INLINE dequeuedProgress #-} | ||
170 | |||
171 | |||
172 | Thread layout | ||
173 | ------------- | ||
174 | |||
175 | When a client session is created, 2 new threads appear: | ||
176 | |||
177 | * DHT listener - replies to DHT requests; | ||
178 | |||
179 | * Peer listener - accepts new P2P connections initiated by other | ||
180 | peers. | ||
181 | |||
182 | When a swarm session is created, 3 new threads appear: | ||
183 | |||
184 | * DHT request loop asks for new peers; | ||
185 | |||
186 | * Tracker request loop asks for new peers; | ||
187 | |||
188 | * a controller which forks new P2P sessions and manages the running ones. | ||
189 | |||
190 | A peer session is always one forked thread. | ||
191 | |||
192 | When a client/swarm/peer session gets closed we kill the corresponding | ||
193 | threads, but flush data to disk (e.g. the storage block map). | ||
194 | |||
195 | So, for example, in order to obtain our first block we need to run at | ||
196 | least 7 threads: the main thread, 2 client session threads, 3 swarm session | ||
197 | threads and a PeerSession thread. | ||
198 | |||
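A very rough sketch of that client-level layout in terms of forkIO; the listener bodies are placeholders, not the real implementations.

    spawnClientThreads :: IO ()
    spawnClientThreads = do
      _dht  <- forkIO dhtListener    -- replies to DHT requests
      _peer <- forkIO peerListener   -- accepts incoming P2P connections
      return ()
      where
        dhtListener  = return ()     -- placeholder
        peerListener = return ()     -- placeholder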
199 | > {----------------------------------------------------------------------- | ||
200 | > Client session | ||
201 | > -----------------------------------------------------------------------} | ||
202 | |||
203 | > {- NOTE: If we do not restrict the number of threads we could end up | ||
204 | > with thousands of connected swarms and make no particular progress. | ||
205 | > | ||
206 | > Note also that we do not bound the number of swarms! This is not an optimal | ||
207 | > strategy because each swarm might have, say, 1 thread and we could end | ||
208 | > up bounded by a meaningless limit. Bounding the global number of p2p | ||
209 | > sessions should work better, and is simpler.-} | ||
210 | |||
211 | > -- | Each client might have a limited number of threads. | ||
212 | > type ThreadCount = Int | ||
213 | |||
214 | > -- | The number of threads suitable for a typical BT client. | ||
215 | > defaultThreadCount :: ThreadCount | ||
216 | > defaultThreadCount = 1000 | ||
217 | |||
218 | > {- PERFORMANCE NOTE: keeping torrent metafiles in memory is a _bad_ | ||
219 | > idea: for 1TB of data we need at least 100MB of metadata (using a 256KB | ||
220 | > piece size). That approach does not scale. The solution with | ||
221 | > TorrentLoc is much better and takes far less space; moreover it | ||
222 | > depends on the number of torrents, not on the amount of data itself. To | ||
223 | > scale further, in the future we might add something like a database | ||
224 | > (e.g. sqlite) for this kind of thing.-} | ||
225 | |||
226 | > -- | Identifies the location of a torrent's metafile and content data. | ||
227 | > data TorrentLoc = TorrentLoc { | ||
228 | > metafilePath :: FilePath | ||
229 | > , dataPath :: FilePath | ||
230 | > } | ||
231 | |||
232 | > validateTorrent :: TorrentLoc -> IO () | ||
233 | > validateTorrent = error "validateTorrent: not implemented" | ||
234 | |||
235 | > -- | TorrentMap is used to keep track of all torrents known to the | ||
236 | > -- client. When some peer tries to connect to us it's necessary to | ||
237 | > -- dispatch to the appropriate 'SwarmSession' (or start a new one if there | ||
238 | > -- is none) in the listener loop: we only know the 'InfoHash' from the | ||
239 | > -- 'Handshake' and nothing more. So to accept a new 'PeerSession' we | ||
240 | > -- need to look up the torrent metainfo and content files (if there are | ||
241 | > -- any) by the 'InfoHash' and only after that enter the exchange loop. | ||
242 | > -- | ||
243 | > type TorrentMap = HashMap InfoHash TorrentLoc | ||
244 | |||
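A sketch of the lookup the listener loop would perform when a 'Handshake' carrying info hash ih arrives; the function name is hypothetical.

    dispatchHandshake :: ClientSession -> InfoHash -> IO (Maybe TorrentLoc)
    dispatchHandshake ClientSession {..} ih =
      HM.lookup ih <$> readTVarIO torrentMap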
245 | > {- NOTE: basically, the client session should contain the protocol-related | ||
246 | > options which the user app stores in configuration files. Moreover it | ||
247 | > should contain all the client identification info (e.g. for DHT). -} | ||
248 | |||
249 | > -- | The client session is the basic unit of the bittorrent network; it has: | ||
250 | > -- | ||
251 | > -- * The /peer ID/ used as a unique identifier of the client in the | ||
252 | > -- network. Obviously, this value does not change during a client | ||
253 | > -- session. | ||
254 | > -- | ||
255 | > -- * The set of /protocol extensions/ it might use. This value | ||
256 | > -- is static as well, but if you want to dynamically reconfigure | ||
257 | > -- the client you might end the current session and | ||
258 | > -- create a new one with the required extensions. | ||
259 | > -- | ||
260 | > -- * The set of /swarms/ to join, each swarm described by a | ||
261 | > -- 'SwarmSession'. | ||
262 | > -- | ||
263 | > -- Normally you would have one client session; however, if needed, | ||
264 | > -- one application could have many clients with different peer | ||
265 | > -- IDs and different enabled extensions at the same time. | ||
266 | > -- | ||
267 | > data ClientSession = ClientSession { | ||
268 | > -- | Used in handshakes and discovery mechanism. | ||
269 | > clientPeerId :: !PeerId | ||
270 | |||
271 | > -- | Extensions we should try to use. However, a particular peer | ||
272 | > -- might not support some extensions, so we keep enabledExtensions in | ||
273 | > -- 'PeerSession'. | ||
274 | > , allowedExtensions :: [Extension] | ||
275 | |||
276 | > -- | Port where the client listens for other peers. | ||
277 | > , listenerPort :: PortNumber | ||
278 | > -- TODO restart listener if it fails | ||
279 | |||
280 | > -- | Semaphore used to bound the number of active P2P sessions. | ||
281 | > , activeThreads :: !(MSem ThreadCount) | ||
282 | |||
283 | > -- | Max number of active connections. | ||
284 | > , maxActive :: !ThreadCount | ||
285 | |||
286 | > -- | Used to traverse the swarm session. | ||
287 | > , swarmSessions :: !(TVar (Set SwarmSession)) | ||
288 | |||
289 | > , eventManager :: !EventManager | ||
290 | |||
291 | > -- | Used to keep track global client progress. | ||
292 | > , currentProgress :: !(TVar Progress) | ||
293 | |||
294 | > -- | Used to keep track available torrents. | ||
295 | > , torrentMap :: !(TVar TorrentMap) | ||
296 | > } | ||
297 | |||
298 | > -- currentProgress field is redundant: progress depends on all the swarm bitfields | ||
299 | > -- maybe we can remove the 'currentProgress' and compute it on demand? | ||
300 | |||
301 | |||
302 | > instance Eq ClientSession where | ||
303 | > (==) = (==) `on` clientPeerId | ||
304 | |||
305 | > instance Ord ClientSession where | ||
306 | > compare = comparing clientPeerId | ||
307 | |||
308 | > -- | Get current global progress of the client. This value is usually | ||
309 | > -- shown to a user. | ||
310 | > getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
311 | > getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
312 | |||
313 | > -- | Get the number of swarms the client is aware of. | ||
314 | > getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
315 | > getSwarmCount ClientSession {..} = liftIO $ | ||
316 | > S.size <$> readTVarIO swarmSessions | ||
317 | |||
318 | > -- | Get the number of peers the client is currently connected to. | ||
319 | > getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
320 | > getPeerCount ClientSession {..} = liftIO $ do | ||
321 | > unused <- peekAvail activeThreads | ||
322 | > return (maxActive - unused) | ||
323 | |||
324 | > -- | Create a new client session. The data passed to this function is | ||
325 | > -- usually loaded from a configuration file. | ||
326 | > newClient :: SessionCount -- ^ Maximum count of active P2P Sessions. | ||
327 | > -> [Extension] -- ^ Extensions allowed to use. | ||
328 | > -> IO ClientSession -- ^ Client with unique peer ID. | ||
329 | |||
330 | > newClient n exts = do | ||
331 | > mgr <- Ev.new | ||
332 | > -- TODO kill this thread when leave client | ||
333 | > _ <- forkIO $ loop mgr | ||
334 | |||
335 | > ClientSession | ||
336 | > <$> newPeerId | ||
337 | > <*> pure exts | ||
338 | > <*> forkListener (error "listener") | ||
339 | > <*> MSem.new n | ||
340 | > <*> pure n | ||
341 | > <*> newTVarIO S.empty | ||
342 | > <*> pure mgr | ||
343 | > <*> newTVarIO (startProgress 0) | ||
344 | > <*> newTVarIO HM.empty | ||
345 | |||
346 | > registerTorrent :: ClientSession -> InfoHash -> TorrentLoc -> STM () | ||
347 | > registerTorrent ClientSession {..} ih tl = do | ||
348 | > modifyTVar' torrentMap $ HM.insert ih tl | ||
349 | |||
350 | > unregisterTorrent :: ClientSession -> InfoHash -> STM () | ||
351 | > unregisterTorrent ClientSession {..} ih = do | ||
352 | > modifyTVar' torrentMap $ HM.delete ih | ||
353 | |||
354 | > {----------------------------------------------------------------------- | ||
355 | > Swarm session | ||
356 | > -----------------------------------------------------------------------} | ||
357 | |||
358 | > {- NOTE: If client is a leecher then there is NO particular reason to | ||
359 | > set max sessions count more than the_number_of_unchoke_slots * k: | ||
360 | |||
361 | > * thread slot(activeThread semaphore) | ||
362 | > * will take but no | ||
363 | |||
364 | > So if client is a leecher then max sessions count depends on the | ||
365 | > number of unchoke slots. | ||
366 | |||
367 | > However if client is a seeder then the value depends on . | ||
368 | > -} | ||
369 | |||
370 | > -- | Used to bound the number of simultaneous connections and, which | ||
371 | > -- is the same, P2P sessions within the swarm session. | ||
372 | > type SessionCount = Int | ||
373 | |||
374 | > defSeederConns :: SessionCount | ||
375 | > defSeederConns = defaultUnchokeSlots | ||
376 | |||
377 | > defLeacherConns :: SessionCount | ||
378 | > defLeacherConns = defaultNumWant | ||
379 | |||
380 | > -- | Swarm session is the per-torrent part of a client session. | ||
381 | > data SwarmSession = SwarmSession { | ||
382 | > torrentMeta :: !Torrent | ||
383 | |||
384 | > -- | | ||
385 | > , clientSession :: !ClientSession | ||
386 | |||
387 | > -- | Represents the number of peers we can _currently_ connect to in the | ||
388 | > -- swarm. Used to bound the number of concurrent threads. | ||
389 | > , vacantPeers :: !(MSem SessionCount) | ||
390 | |||
391 | > -- | Modify this carefully updating global progress. | ||
392 | > , clientBitfield :: !(TVar Bitfield) | ||
393 | |||
394 | > , connectedPeers :: !(TVar (Set PeerSession)) | ||
395 | |||
396 | > -- TODO use bounded broadcast chan with priority queue and drop old entries | ||
397 | > -- | Channel used to replicate messages across all peers in the | ||
398 | > -- swarm. For example, if we get some piece we should send a HAVE | ||
399 | > -- message to all connected (and interested) peers. | ||
400 | > -- | ||
401 | > , broadcastMessages :: !(TChan Message) | ||
402 | > } | ||
403 | |||
404 | > -- INVARIANT: | ||
405 | > -- max_sessions_count - sizeof connectedPeers = value vacantPeers | ||
406 | |||
407 | > instance Eq SwarmSession where | ||
408 | > (==) = (==) `on` (tInfoHash . torrentMeta) | ||
409 | |||
410 | > instance Ord SwarmSession where | ||
411 | > compare = comparing (tInfoHash . torrentMeta) | ||
412 | |||
413 | > newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | ||
414 | > -> IO SwarmSession | ||
415 | > newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
416 | > = SwarmSession <$> pure t | ||
417 | > <*> pure cs | ||
418 | > <*> MSem.new n | ||
419 | > <*> newTVarIO bf | ||
420 | > <*> newTVarIO S.empty | ||
421 | > <*> newBroadcastTChanIO | ||
422 | |||
423 | > -- | New swarm session in which the client is allowed to upload only. | ||
424 | > newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
425 | > newSeeder cs t @ Torrent {..} | ||
426 | > = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
427 | |||
428 | > -- | New swarm session in which the client is allowed to both download and upload. | ||
429 | > newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
430 | > newLeecher cs t @ Torrent {..} = do | ||
431 | > se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
432 | > atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
433 | > return se | ||
434 | |||
435 | > --isLeacher :: SwarmSession -> IO Bool | ||
436 | > --isLeacher = undefined | ||
437 | |||
438 | > -- | Get the number of connected peers in the given swarm. | ||
439 | > getSessionCount :: SwarmSession -> IO SessionCount | ||
440 | > getSessionCount SwarmSession {..} = do | ||
441 | > S.size <$> readTVarIO connectedPeers | ||
442 | |||
443 | > getClientBitfield :: SwarmSession -> IO Bitfield | ||
444 | > getClientBitfield = readTVarIO . clientBitfield | ||
445 | |||
446 | > {- | ||
447 | > haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () | ||
448 | > haveDone ix = | ||
449 | > liftIO $ atomically $ do | ||
450 | > bf <- readTVar clientBitfield | ||
451 | > writeTVar (have ix bf) | ||
452 | > currentProgress | ||
453 | > -} | ||
454 | |||
455 | > -- acquire/release mechanism: for internal use only | ||
456 | |||
457 | > enterSwarm :: SwarmSession -> IO () | ||
458 | > enterSwarm SwarmSession {..} = do | ||
459 | > MSem.wait (activeThreads clientSession) | ||
460 | > MSem.wait vacantPeers | ||
461 | |||
462 | > leaveSwarm :: SwarmSession -> IO () | ||
463 | > leaveSwarm SwarmSession {..} = do | ||
464 | > MSem.signal vacantPeers | ||
465 | > MSem.signal (activeThreads clientSession) | ||
466 | |||
467 | > waitVacancy :: SwarmSession -> IO () -> IO () | ||
468 | > waitVacancy se = | ||
469 | > bracket (enterSwarm se) (const (leaveSwarm se)) | ||
470 | > . const | ||
471 | |||
472 | > pieceLength :: SwarmSession -> Int | ||
473 | > pieceLength = ciPieceLength . tInfo . torrentMeta | ||
474 | > {-# INLINE pieceLength #-} | ||
475 | |||
476 | > {----------------------------------------------------------------------- | ||
477 | > Peer session | ||
478 | > -----------------------------------------------------------------------} | ||
479 | |||
480 | > -- | A peer session contains all data necessary for peer-to-peer | ||
481 | > -- communication. | ||
482 | > data PeerSession = PeerSession { | ||
483 | > -- | Used as unique 'PeerSession' identifier within one | ||
484 | > -- 'SwarmSession'. | ||
485 | > connectedPeerAddr :: !PeerAddr | ||
486 | |||
487 | > -- | The swarm to which both end points belong to. | ||
488 | > , swarmSession :: !SwarmSession | ||
489 | |||
490 | > -- | Extensions that both the peer and the client support. | ||
491 | > , enabledExtensions :: [Extension] | ||
492 | |||
493 | > -- | To disconnect from dead peers appropriately we should check | ||
494 | > -- whether a peer has sent a KA message within the given interval. If | ||
495 | > -- not, we should throw an exception in 'TimeoutCallback' and | ||
496 | > -- close the session between the peers. | ||
497 | > -- | ||
498 | > -- We should update the timeout whenever we /receive/ any message within | ||
499 | > -- the timeout interval, to keep the connection up. | ||
500 | > , incomingTimeout :: !TimeoutKey | ||
501 | |||
502 | > -- | To send KA messages appropriately we should know when we last | ||
503 | > -- sent a message to a peer. To do that we keep a registered | ||
504 | > -- timeout in the event manager, and if we have not sent any message to | ||
505 | > -- the peer within the given interval then we send a KA message in | ||
506 | > -- 'TimeoutCallback'. | ||
507 | > -- | ||
508 | > -- We should update the timeout whenever we /send/ any message within the | ||
509 | > -- timeout interval, to avoid redundant KA messages. | ||
510 | > -- | ||
511 | > , outcomingTimeout :: !TimeoutKey | ||
512 | |||
513 | > -- TODO use dupChan for broadcasting | ||
514 | > -- | Broadcast messages waiting to be sent to peer. | ||
515 | > , pendingMessages :: !(TChan Message) | ||
516 | |||
517 | > -- | Dynamic P2P data. | ||
518 | > , sessionState :: !(IORef SessionState) | ||
519 | > } | ||
520 | |||
521 | > -- TODO unpack some fields | ||
522 | |||
523 | > data SessionState = SessionState { | ||
524 | > _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | ||
525 | > , _status :: !SessionStatus -- ^ Status of both peers. | ||
526 | > } deriving (Show, Eq) | ||
527 | |||
528 | > $(makeLenses ''SessionState) | ||
529 | |||
530 | > instance Eq PeerSession where | ||
531 | > (==) = (==) `on` connectedPeerAddr | ||
532 | |||
533 | > instance Ord PeerSession where | ||
534 | > compare = comparing connectedPeerAddr | ||
535 | |||
536 | > -- | Exceptions used to interrupt the current P2P session. These | ||
537 | > -- exceptions will NOT affect other P2P sessions, DHT, peer <-> | ||
538 | > -- tracker, or any other session. | ||
539 | > -- | ||
540 | > data SessionException = PeerDisconnected | ||
541 | > | ProtocolError Doc | ||
542 | > deriving (Show, Typeable) | ||
543 | |||
544 | > instance Exception SessionException | ||
545 | |||
546 | |||
547 | > -- | Do nothing with exception, used with 'handle' or 'try'. | ||
548 | > isSessionException :: Monad m => SessionException -> m () | ||
549 | > isSessionException _ = return () | ||
550 | |||
551 | > -- | The same as 'isSessionException' but prints the caught exception | ||
552 | > -- to stdout, for debugging purposes only. | ||
553 | > putSessionException :: SessionException -> IO () | ||
554 | > putSessionException = print | ||
555 | |||
556 | > -- TODO modify such that we can use this in listener loop | ||
557 | > -- TODO check if it connected yet peer | ||
558 | > withPeerSession :: SwarmSession -> PeerAddr | ||
559 | > -> ((Socket, PeerSession) -> IO ()) | ||
560 | > -> IO () | ||
561 | |||
562 | > withPeerSession ss @ SwarmSession {..} addr | ||
563 | > = handle isSessionException . bracket openSession closeSession | ||
564 | > where | ||
565 | > openSession = do | ||
566 | > let caps = encodeExts $ allowedExtensions $ clientSession | ||
567 | > let ihash = tInfoHash torrentMeta | ||
568 | > let pid = clientPeerId $ clientSession | ||
569 | > let chs = Handshake defaultBTProtocol caps ihash pid | ||
570 | |||
571 | > sock <- connectToPeer addr | ||
572 | > phs <- handshake sock chs `onException` close sock | ||
573 | |||
574 | > cbf <- readTVarIO clientBitfield | ||
575 | > sendAll sock (encode (Bitfield cbf)) | ||
576 | |||
577 | > let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) | ||
578 | > ps <- PeerSession addr ss enabled | ||
579 | > <$> registerTimeout (eventManager clientSession) | ||
580 | > maxIncomingTime (return ()) | ||
581 | > <*> registerTimeout (eventManager clientSession) | ||
582 | > maxOutcomingTime (sendKA sock) | ||
583 | > <*> atomically (dupTChan broadcastMessages) | ||
584 | > <*> do { | ||
585 | > ; tc <- totalCount <$> readTVarIO clientBitfield | ||
586 | > ; newIORef (SessionState (haveNone tc) def) | ||
587 | > } | ||
588 | |||
589 | > atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
590 | |||
591 | > return (sock, ps) | ||
592 | |||
593 | > closeSession (sock, ps) = do | ||
594 | > atomically $ modifyTVar' connectedPeers (S.delete ps) | ||
595 | > close sock | ||
596 | |||
597 | > findPieceCount :: PeerSession -> PieceCount | ||
598 | > findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | ||
599 | |||
600 | > {----------------------------------------------------------------------- | ||
601 | > Broadcasting: Have, Cancel, Bitfield, SuggestPiece | ||
602 | > -----------------------------------------------------------------------} | ||
603 | |||
604 | > -- here we should enqueue broadcast messages and keep in mind that: | ||
605 | > -- | ||
606 | > -- * We should enqueue broadcast events as they appear. | ||
607 | > -- * We should yield broadcast messages as fast as we get them. | ||
608 | > -- | ||
609 | > -- these 2 phases might differ in time significantly | ||
610 | |||
611 | > -- TODO do this; but only when it'll be clear which other broadcast | ||
612 | > -- messages & events we should send | ||
613 | |||
614 | > -- 1. Update client have bitfield --\____ in one transaction; | ||
615 | > -- 2. Update downloaded stats --/ | ||
616 | > -- 3. Signal all the other peers about this. | ||
617 | |||
618 | > available :: Bitfield -> SwarmSession -> IO () | ||
619 | > available bf se @ SwarmSession {..} = {-# SCC available #-} do | ||
620 | > mark >> atomically broadcast | ||
621 | > where | ||
622 | > mark = do | ||
623 | > let bytes = pieceLength se * BF.haveCount bf | ||
624 | > atomically $ do | ||
625 | > modifyTVar' clientBitfield (BF.union bf) | ||
626 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
627 | |||
628 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
629 | |||
630 | |||
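For example, after verifying a freshly downloaded piece, a caller could mark just that piece as available. This assumes Data.Bitfield exports 'have' and 'totalCount' as used elsewhere in this module, and that 'PieceIx' is its piece-index type.

    markPiece :: PieceIx -> SwarmSession -> IO ()
    markPiece pix se = do
      tc <- totalCount <$> getClientBitfield se   -- total pieces in the torrent
      available (have pix (haveNone tc)) se       -- bitfield with only this piece set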
631 | > -- TODO compute the size of messages: if it's faster to send a Bitfield | ||
632 | > -- instead of many Have messages, do that | ||
633 | > -- | ||
634 | > -- also, if there is a single Have message in the queue then the | ||
635 | > -- corresponding piece is likely still in memory or the disk cache, | ||
636 | > -- in which case we can send SuggestPiece | ||
637 | |||
638 | > -- | Get the queue of pending messages that have appeared as a result of | ||
639 | > -- asynchronously changed client state. The resulting queue should be sent | ||
640 | > -- to the peer immediately. | ||
641 | > getPending :: PeerSession -> IO [Message] | ||
642 | > getPending PeerSession {..} = {-# SCC getPending #-} do | ||
643 | > atomically (readAvail pendingMessages) | ||
644 | |||
645 | > readAvail :: TChan a -> STM [a] | ||
646 | > readAvail chan = do | ||
647 | > m <- tryReadTChan chan | ||
648 | > case m of | ||
649 | > Just a -> (:) <$> pure a <*> readAvail chan | ||
650 | > Nothing -> return [] | ||
651 | |||
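A sketch of how an exchange loop might flush that queue to the wire, assuming the same Serialize-based 'encode' the module itself uses for sending messages:

    flushPending :: Socket -> PeerSession -> IO ()
    flushPending sock ps = do
      msgs <- getPending ps
      updateOutcoming ps                    -- we are about to send something
      mapM_ (sendAll sock . encode) msgs    -- Data.Foldable.mapM_ over the list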
652 | > {----------------------------------------------------------------------- | ||
653 | > Timeouts | ||
654 | > -----------------------------------------------------------------------} | ||
655 | |||
656 | > -- for internal use only | ||
657 | |||
658 | > sec :: Int | ||
659 | > sec = 1000 * 1000 | ||
660 | |||
661 | > maxIncomingTime :: Int | ||
662 | > maxIncomingTime = 120 * sec | ||
663 | |||
664 | > maxOutcomingTime :: Int | ||
665 | > maxOutcomingTime = 1 * sec | ||
666 | |||
667 | > -- | Should be called after we have received any message from a peer. | ||
668 | > updateIncoming :: PeerSession -> IO () | ||
669 | > updateIncoming PeerSession {..} = do | ||
670 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
671 | > incomingTimeout maxIncomingTime | ||
672 | |||
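A sketch of the receive side keeping this watchdog fresh; the buffer size and loop structure are illustrative only.

    recvLoop :: Socket -> PeerSession -> IO ()
    recvLoop sock ps = do
      _chunk <- recv sock 4096   -- Network.Socket.ByteString.recv
      updateIncoming ps          -- refresh the 2-minute incoming timeout
      recvLoop sock ps           -- decoding/dispatch elided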
673 | > -- | Should be called before we send any message to a peer. | ||
674 | > updateOutcoming :: PeerSession -> IO () | ||
675 | > updateOutcoming PeerSession {..} = | ||
676 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
677 | > outcomingTimeout maxOutcomingTime | ||
678 | |||
679 | > sendKA :: Socket -> IO () | ||
680 | > sendKA sock {- SwarmSession {..} -} = do | ||
681 | > return () | ||
682 | > -- print "I'm sending keep alive." | ||
683 | > -- sendAll sock (encode BT.KeepAlive) | ||
684 | > -- let mgr = eventManager clientSession | ||
685 | > -- updateTimeout mgr | ||
686 | > -- print "Done.." | ||