diff options
Diffstat (limited to 'src/Network/BitTorrent/Internal.lhs')
-rw-r--r-- | src/Network/BitTorrent/Internal.lhs | 686 |
1 files changed, 686 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs new file mode 100644 index 00000000..3f8f0371 --- /dev/null +++ b/src/Network/BitTorrent/Internal.lhs | |||
@@ -0,0 +1,686 @@ | |||
1 | > -- | | ||
2 | > -- Copyright : (c) Sam T. 2013 | ||
3 | > -- License : MIT | ||
4 | > -- Maintainer : pxqr.sta@gmail.com | ||
5 | > -- Stability : experimental | ||
6 | > -- Portability : portable | ||
7 | > -- | ||
8 | > -- This module implement opaque broadcast message passing. It | ||
9 | > -- provides sessions needed by Network.BitTorrent and | ||
10 | > -- Network.BitTorrent.Exchange and modules. To hide some internals | ||
11 | > -- of this module we detach it from Exchange. | ||
12 | > -- | ||
13 | > -- | ||
14 | > -- NOTE: expose only static data in data field lists, all dynamic | ||
15 | > -- data should be modified through standalone functions. | ||
16 | > -- | ||
17 | > {-# LANGUAGE OverloadedStrings #-} | ||
18 | > {-# LANGUAGE RecordWildCards #-} | ||
19 | > {-# LANGUAGE ViewPatterns #-} | ||
20 | > {-# LANGUAGE TemplateHaskell #-} | ||
21 | > {-# LANGUAGE DeriveDataTypeable #-} | ||
22 | > module Network.BitTorrent.Internal | ||
23 | > ( Progress(..), startProgress | ||
24 | > -- * Client | ||
25 | > , ClientSession (clientPeerId, allowedExtensions, listenerPort) | ||
26 | > | ||
27 | > , ThreadCount | ||
28 | > , defaultThreadCount | ||
29 | > | ||
30 | > , TorrentLoc(..) | ||
31 | > , registerTorrent | ||
32 | > , unregisterTorrent | ||
33 | > , newClient | ||
34 | |||
35 | > , getCurrentProgress | ||
36 | > , getSwarmCount | ||
37 | > , getPeerCount | ||
38 | |||
39 | |||
40 | |||
41 | > -- * Swarm | ||
42 | > , SwarmSession( SwarmSession, torrentMeta, clientSession ) | ||
43 | |||
44 | > , SessionCount | ||
45 | > , getSessionCount | ||
46 | |||
47 | > , newLeecher | ||
48 | > , newSeeder | ||
49 | > , getClientBitfield | ||
50 | |||
51 | > , enterSwarm | ||
52 | > , leaveSwarm | ||
53 | > , waitVacancy | ||
54 | |||
55 | > , pieceLength | ||
56 | |||
57 | > -- * Peer | ||
58 | > , PeerSession( PeerSession, connectedPeerAddr | ||
59 | > , swarmSession, enabledExtensions | ||
60 | > , sessionState | ||
61 | > ) | ||
62 | > , SessionState | ||
63 | > , withPeerSession | ||
64 | |||
65 | > -- ** Broadcasting | ||
66 | > , available | ||
67 | > , getPending | ||
68 | |||
69 | > -- ** Exceptions | ||
70 | > , SessionException(..) | ||
71 | > , isSessionException | ||
72 | > , putSessionException | ||
73 | |||
74 | > -- ** Properties | ||
75 | > , bitfield, status | ||
76 | > , findPieceCount | ||
77 | |||
78 | > -- * Timeouts | ||
79 | > , updateIncoming, updateOutcoming | ||
80 | > ) where | ||
81 | |||
82 | > import Prelude hiding (mapM_) | ||
83 | |||
84 | > import Control.Applicative | ||
85 | > import Control.Concurrent | ||
86 | > import Control.Concurrent.STM | ||
87 | > import Control.Concurrent.MSem as MSem | ||
88 | > import Control.Lens | ||
89 | > import Control.Exception | ||
90 | > import Control.Monad.Trans | ||
91 | |||
92 | > import Data.IORef | ||
93 | > import Data.Default | ||
94 | > import Data.Function | ||
95 | > import Data.Foldable (mapM_) | ||
96 | > import Data.HashMap.Strict as HM | ||
97 | > import Data.Ord | ||
98 | > import Data.Set as S | ||
99 | > import Data.Typeable | ||
100 | |||
101 | > import Data.Serialize hiding (get) | ||
102 | > import Text.PrettyPrint | ||
103 | |||
104 | > import Network | ||
105 | > import Network.Socket | ||
106 | > import Network.Socket.ByteString | ||
107 | |||
108 | > import GHC.Event as Ev | ||
109 | |||
110 | > import Data.Bitfield as BF | ||
111 | > import Data.Torrent | ||
112 | > import Network.BitTorrent.Extension | ||
113 | > import Network.BitTorrent.Peer | ||
114 | > import Network.BitTorrent.Exchange.Protocol as BT | ||
115 | > import Network.BitTorrent.Tracker.Protocol as BT | ||
116 | |||
117 | > {----------------------------------------------------------------------- | ||
118 | > Progress | ||
119 | > -----------------------------------------------------------------------} | ||
120 | |||
121 | > -- | 'Progress' contains upload/download/left stats about | ||
122 | > -- current client state and used to notify the tracker. | ||
123 | > -- | ||
124 | > -- This data is considered as dynamic within one client | ||
125 | > -- session. This data also should be shared across client | ||
126 | > -- application sessions (e.g. files), otherwise use 'startProgress' | ||
127 | > -- to get initial 'Progress'. | ||
128 | > -- | ||
129 | > data Progress = Progress { | ||
130 | > _uploaded :: !Integer -- ^ Total amount of bytes uploaded. | ||
131 | > , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. | ||
132 | > , _left :: !Integer -- ^ Total amount of bytes left. | ||
133 | > } deriving (Show, Read, Eq) | ||
134 | |||
135 | > -- TODO use atomic bits and Word64 | ||
136 | |||
137 | > $(makeLenses ''Progress) | ||
138 | |||
139 | > -- | Initial progress is used when there are no session before. | ||
140 | > -- | ||
141 | > -- Please note that tracker might penalize client some way if the do | ||
142 | > -- not accumulate progress. If possible and save 'Progress' between | ||
143 | > -- client sessions to avoid that. | ||
144 | > -- | ||
145 | > startProgress :: Integer -> Progress | ||
146 | > startProgress = Progress 0 0 | ||
147 | |||
148 | > -- | Used when the client download some data from /any/ peer. | ||
149 | > downloadedProgress :: Int -> Progress -> Progress | ||
150 | > downloadedProgress (fromIntegral -> amount) | ||
151 | > = (left -~ amount) | ||
152 | > . (downloaded +~ amount) | ||
153 | > {-# INLINE downloadedProgress #-} | ||
154 | |||
155 | > -- | Used when the client upload some data to /any/ peer. | ||
156 | > uploadedProgress :: Int -> Progress -> Progress | ||
157 | > uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
158 | > {-# INLINE uploadedProgress #-} | ||
159 | |||
160 | > -- | Used when leecher join client session. | ||
161 | > enqueuedProgress :: Integer -> Progress -> Progress | ||
162 | > enqueuedProgress amount = left +~ amount | ||
163 | > {-# INLINE enqueuedProgress #-} | ||
164 | |||
165 | > -- | Used when leecher leave client session. | ||
166 | > -- (e.g. user deletes not completed torrent) | ||
167 | > dequeuedProgress :: Integer -> Progress -> Progress | ||
168 | > dequeuedProgress amount = left -~ amount | ||
169 | > {-# INLINE dequeuedProgress #-} | ||
170 | |||
171 | |||
172 | Thread layout | ||
173 | ------------- | ||
174 | |||
175 | When client session created 2 new threads appear: | ||
176 | |||
177 | * DHT listener - replies to DHT requests; | ||
178 | |||
179 | * Peer listener - accept new P2P connection initiated by other | ||
180 | peers. | ||
181 | |||
182 | When swarn session created 3 new threads appear: | ||
183 | |||
184 | * DHT request loop asks for new peers; | ||
185 | |||
186 | * Tracker request loop asks for new peers; | ||
187 | |||
188 | * controller which fork new avaand manage running P2P sessions. | ||
189 | |||
190 | Peer session is one always forked thread. | ||
191 | |||
192 | When client\/swarm\/peer session gets closed kill the corresponding | ||
193 | threads, but flush data to disc. (for e.g. storage block map) | ||
194 | |||
195 | So for e.g., in order to obtain our first block we need to run at | ||
196 | least 7 threads: main thread, 2 client session thread, 3 swarm session | ||
197 | threads and PeerSession thread. | ||
198 | |||
199 | > {----------------------------------------------------------------------- | ||
200 | > Client session | ||
201 | > -----------------------------------------------------------------------} | ||
202 | |||
203 | > {- NOTE: If we will not restrict number of threads we could end up | ||
204 | > with thousands of connected swarm and make no particular progress. | ||
205 | > | ||
206 | > Note also we do not bound number of swarms! This is not optimal | ||
207 | > strategy because each swarm might have say 1 thread and we could end | ||
208 | > up bounded by the meaningless limit. Bounding global number of p2p | ||
209 | > sessions should work better, and simpler.-} | ||
210 | |||
211 | > -- | Each client might have a limited number of threads. | ||
212 | > type ThreadCount = Int | ||
213 | |||
214 | > -- | The number of threads suitable for a typical BT client. | ||
215 | > defaultThreadCount :: ThreadCount | ||
216 | > defaultThreadCount = 1000 | ||
217 | |||
218 | > {- PERFORMANCE NOTE: keeping torrent metafiles in memory is a _bad_ | ||
219 | > idea: for 1TB of data we need at least 100MB of metadata. (using 256KB | ||
220 | > piece size). This solution do not scale further. Solution with | ||
221 | > TorrentLoc is much better and takes much more less space, moreover it | ||
222 | > depends on count of torrents but not on count of data itself. To scale | ||
223 | > further, in future we might add something like database (for | ||
224 | > e.g. sqlite) for this kind of things.-} | ||
225 | |||
226 | > -- | Identifies location of | ||
227 | > data TorrentLoc = TorrentLoc { | ||
228 | > metafilePath :: FilePath | ||
229 | > , dataPath :: FilePath | ||
230 | > } | ||
231 | |||
232 | > validateTorrent :: TorrentLoc -> IO () | ||
233 | > validateTorrent = error "validateTorrent: not implemented" | ||
234 | |||
235 | > -- | TorrentMap is used to keep track all known torrents for the | ||
236 | > -- client. When some peer trying to connect to us it's necessary to | ||
237 | > -- dispatch appropriate 'SwarmSession' (or start new one if there are | ||
238 | > -- none) in the listener loop: we only know 'InfoHash' from | ||
239 | > -- 'Handshake' but nothing more. So to accept new 'PeerSession' we | ||
240 | > -- need to lookup torrent metainfo and content files (if there are | ||
241 | > -- some) by the 'InfoHash' and only after that enter exchange loop. | ||
242 | > -- | ||
243 | > type TorrentMap = HashMap InfoHash TorrentLoc | ||
244 | |||
245 | > {- NOTE: basically, client session should contain options which user | ||
246 | > app store in configuration files. (related to the protocol) Moreover | ||
247 | > it should contain the all client identification info. (e.g. DHT) -} | ||
248 | |||
249 | > -- | Client session is the basic unit of bittorrent network, it has: | ||
250 | > -- | ||
251 | > -- * The /peer ID/ used as unique identifier of the client in | ||
252 | > -- network. Obviously, this value is not changed during client | ||
253 | > -- session. | ||
254 | > -- | ||
255 | > -- * The number of /protocol extensions/ it might use. This value | ||
256 | > -- is static as well, but if you want to dynamically reconfigure | ||
257 | > -- the client you might kill the end the current session and | ||
258 | > -- create a new with the fresh required extensions. | ||
259 | > -- | ||
260 | > -- * The number of /swarms/ to join, each swarm described by the | ||
261 | > -- 'SwarmSession'. | ||
262 | > -- | ||
263 | > -- Normally, you would have one client session, however, if we need, | ||
264 | > -- in one application we could have many clients with different peer | ||
265 | > -- ID's and different enabled extensions at the same time. | ||
266 | > -- | ||
267 | > data ClientSession = ClientSession { | ||
268 | > -- | Used in handshakes and discovery mechanism. | ||
269 | > clientPeerId :: !PeerId | ||
270 | |||
271 | > -- | Extensions we should try to use. Hovewer some particular peer | ||
272 | > -- might not support some extension, so we keep enabledExtension in | ||
273 | > -- 'PeerSession'. | ||
274 | > , allowedExtensions :: [Extension] | ||
275 | |||
276 | > -- | Port where client listen for other peers | ||
277 | > , listenerPort :: PortNumber | ||
278 | > -- TODO restart listener if it fail | ||
279 | |||
280 | > -- | Semaphor used to bound number of active P2P sessions. | ||
281 | > , activeThreads :: !(MSem ThreadCount) | ||
282 | |||
283 | > -- | Max number of active connections. | ||
284 | > , maxActive :: !ThreadCount | ||
285 | |||
286 | > -- | Used to traverse the swarm session. | ||
287 | > , swarmSessions :: !(TVar (Set SwarmSession)) | ||
288 | |||
289 | > , eventManager :: !EventManager | ||
290 | |||
291 | > -- | Used to keep track global client progress. | ||
292 | > , currentProgress :: !(TVar Progress) | ||
293 | |||
294 | > -- | Used to keep track available torrents. | ||
295 | > , torrentMap :: !(TVar TorrentMap) | ||
296 | > } | ||
297 | |||
298 | > -- currentProgress field is reduntant: progress depends on the all swarm bitfields | ||
299 | > -- maybe we can remove the 'currentProgress' and compute it on demand? | ||
300 | |||
301 | |||
302 | > instance Eq ClientSession where | ||
303 | > (==) = (==) `on` clientPeerId | ||
304 | |||
305 | > instance Ord ClientSession where | ||
306 | > compare = comparing clientPeerId | ||
307 | |||
308 | > -- | Get current global progress of the client. This value is usually | ||
309 | > -- shown to a user. | ||
310 | > getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
311 | > getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
312 | |||
313 | > -- | Get number of swarms client aware of. | ||
314 | > getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
315 | > getSwarmCount ClientSession {..} = liftIO $ | ||
316 | > S.size <$> readTVarIO swarmSessions | ||
317 | |||
318 | > -- | Get number of peers the client currently connected to. | ||
319 | > getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
320 | > getPeerCount ClientSession {..} = liftIO $ do | ||
321 | > unused <- peekAvail activeThreads | ||
322 | > return (maxActive - unused) | ||
323 | |||
324 | > -- | Create a new client session. The data passed to this function are | ||
325 | > -- usually loaded from configuration file. | ||
326 | > newClient :: SessionCount -- ^ Maximum count of active P2P Sessions. | ||
327 | > -> [Extension] -- ^ Extensions allowed to use. | ||
328 | > -> IO ClientSession -- ^ Client with unique peer ID. | ||
329 | |||
330 | > newClient n exts = do | ||
331 | > mgr <- Ev.new | ||
332 | > -- TODO kill this thread when leave client | ||
333 | > _ <- forkIO $ loop mgr | ||
334 | |||
335 | > ClientSession | ||
336 | > <$> newPeerId | ||
337 | > <*> pure exts | ||
338 | > <*> forkListener (error "listener") | ||
339 | > <*> MSem.new n | ||
340 | > <*> pure n | ||
341 | > <*> newTVarIO S.empty | ||
342 | > <*> pure mgr | ||
343 | > <*> newTVarIO (startProgress 0) | ||
344 | > <*> newTVarIO HM.empty | ||
345 | |||
346 | > registerTorrent :: ClientSession -> InfoHash -> TorrentLoc -> STM () | ||
347 | > registerTorrent ClientSession {..} ih tl = do | ||
348 | > modifyTVar' torrentMap $ HM.insert ih tl | ||
349 | |||
350 | > unregisterTorrent :: ClientSession -> InfoHash -> STM () | ||
351 | > unregisterTorrent ClientSession {..} ih = do | ||
352 | > modifyTVar' torrentMap $ HM.delete ih | ||
353 | |||
354 | > {----------------------------------------------------------------------- | ||
355 | > Swarm session | ||
356 | > -----------------------------------------------------------------------} | ||
357 | |||
358 | > {- NOTE: If client is a leecher then there is NO particular reason to | ||
359 | > set max sessions count more than the_number_of_unchoke_slots * k: | ||
360 | |||
361 | > * thread slot(activeThread semaphore) | ||
362 | > * will take but no | ||
363 | |||
364 | > So if client is a leecher then max sessions count depends on the | ||
365 | > number of unchoke slots. | ||
366 | |||
367 | > However if client is a seeder then the value depends on . | ||
368 | > -} | ||
369 | |||
370 | > -- | Used to bound the number of simultaneous connections and, which | ||
371 | > -- is the same, P2P sessions within the swarm session. | ||
372 | > type SessionCount = Int | ||
373 | |||
374 | > defSeederConns :: SessionCount | ||
375 | > defSeederConns = defaultUnchokeSlots | ||
376 | |||
377 | > defLeacherConns :: SessionCount | ||
378 | > defLeacherConns = defaultNumWant | ||
379 | |||
380 | > -- | Swarm session is | ||
381 | > data SwarmSession = SwarmSession { | ||
382 | > torrentMeta :: !Torrent | ||
383 | |||
384 | > -- | | ||
385 | > , clientSession :: !ClientSession | ||
386 | |||
387 | > -- | Represent count of peers we _currently_ can connect to in the | ||
388 | > -- swarm. Used to bound number of concurrent threads. | ||
389 | > , vacantPeers :: !(MSem SessionCount) | ||
390 | |||
391 | > -- | Modify this carefully updating global progress. | ||
392 | > , clientBitfield :: !(TVar Bitfield) | ||
393 | |||
394 | > , connectedPeers :: !(TVar (Set PeerSession)) | ||
395 | |||
396 | > -- TODO use bounded broadcast chan with priority queue and drop old entries | ||
397 | > -- | Channel used for replicate messages across all peers in | ||
398 | > -- swarm. For exsample if we get some piece we should sent to all | ||
399 | > -- connected (and interested in) peers HAVE message. | ||
400 | > -- | ||
401 | > , broadcastMessages :: !(TChan Message) | ||
402 | > } | ||
403 | |||
404 | > -- INVARIANT: | ||
405 | > -- max_sessions_count - sizeof connectedPeers = value vacantPeers | ||
406 | |||
407 | > instance Eq SwarmSession where | ||
408 | > (==) = (==) `on` (tInfoHash . torrentMeta) | ||
409 | |||
410 | > instance Ord SwarmSession where | ||
411 | > compare = comparing (tInfoHash . torrentMeta) | ||
412 | |||
413 | > newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | ||
414 | > -> IO SwarmSession | ||
415 | > newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
416 | > = SwarmSession <$> pure t | ||
417 | > <*> pure cs | ||
418 | > <*> MSem.new n | ||
419 | > <*> newTVarIO bf | ||
420 | > <*> newTVarIO S.empty | ||
421 | > <*> newBroadcastTChanIO | ||
422 | |||
423 | > -- | New swarm session in which the client allowed to upload only. | ||
424 | > newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
425 | > newSeeder cs t @ Torrent {..} | ||
426 | > = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
427 | |||
428 | > -- | New swarm in which the client allowed both download and upload. | ||
429 | > newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
430 | > newLeecher cs t @ Torrent {..} = do | ||
431 | > se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
432 | > atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
433 | > return se | ||
434 | |||
435 | > --isLeacher :: SwarmSession -> IO Bool | ||
436 | > --isLeacher = undefined | ||
437 | |||
438 | > -- | Get the number of connected peers in the given swarm. | ||
439 | > getSessionCount :: SwarmSession -> IO SessionCount | ||
440 | > getSessionCount SwarmSession {..} = do | ||
441 | > S.size <$> readTVarIO connectedPeers | ||
442 | |||
443 | > getClientBitfield :: SwarmSession -> IO Bitfield | ||
444 | > getClientBitfield = readTVarIO . clientBitfield | ||
445 | |||
446 | > {- | ||
447 | > haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () | ||
448 | > haveDone ix = | ||
449 | > liftIO $ atomically $ do | ||
450 | > bf <- readTVar clientBitfield | ||
451 | > writeTVar (have ix bf) | ||
452 | > currentProgress | ||
453 | > -} | ||
454 | |||
455 | > -- acquire/release mechanism: for internal use only | ||
456 | |||
457 | > enterSwarm :: SwarmSession -> IO () | ||
458 | > enterSwarm SwarmSession {..} = do | ||
459 | > MSem.wait (activeThreads clientSession) | ||
460 | > MSem.wait vacantPeers | ||
461 | |||
462 | > leaveSwarm :: SwarmSession -> IO () | ||
463 | > leaveSwarm SwarmSession {..} = do | ||
464 | > MSem.signal vacantPeers | ||
465 | > MSem.signal (activeThreads clientSession) | ||
466 | |||
467 | > waitVacancy :: SwarmSession -> IO () -> IO () | ||
468 | > waitVacancy se = | ||
469 | > bracket (enterSwarm se) (const (leaveSwarm se)) | ||
470 | > . const | ||
471 | |||
472 | > pieceLength :: SwarmSession -> Int | ||
473 | > pieceLength = ciPieceLength . tInfo . torrentMeta | ||
474 | > {-# INLINE pieceLength #-} | ||
475 | |||
476 | > {----------------------------------------------------------------------- | ||
477 | > Peer session | ||
478 | > -----------------------------------------------------------------------} | ||
479 | |||
480 | > -- | Peer session contain all data necessary for peer to peer | ||
481 | > -- communication. | ||
482 | > data PeerSession = PeerSession { | ||
483 | > -- | Used as unique 'PeerSession' identifier within one | ||
484 | > -- 'SwarmSession'. | ||
485 | > connectedPeerAddr :: !PeerAddr | ||
486 | |||
487 | > -- | The swarm to which both end points belong to. | ||
488 | > , swarmSession :: !SwarmSession | ||
489 | |||
490 | > -- | Extensions such that both peer and client support. | ||
491 | > , enabledExtensions :: [Extension] | ||
492 | |||
493 | > -- | To dissconnect from died peers appropriately we should check | ||
494 | > -- if a peer do not sent the KA message within given interval. If | ||
495 | > -- yes, we should throw an exception in 'TimeoutCallback' and | ||
496 | > -- close session between peers. | ||
497 | > -- | ||
498 | > -- We should update timeout if we /receive/ any message within | ||
499 | > -- timeout interval to keep connection up. | ||
500 | > , incomingTimeout :: !TimeoutKey | ||
501 | |||
502 | > -- | To send KA message appropriately we should know when was last | ||
503 | > -- time we sent a message to a peer. To do that we keep registered | ||
504 | > -- timeout in event manager and if we do not sent any message to | ||
505 | > -- the peer within given interval then we send KA message in | ||
506 | > -- 'TimeoutCallback'. | ||
507 | > -- | ||
508 | > -- We should update timeout if we /send/ any message within timeout | ||
509 | > -- to avoid reduntant KA messages. | ||
510 | > -- | ||
511 | > , outcomingTimeout :: !TimeoutKey | ||
512 | |||
513 | > -- TODO use dupChan for broadcasting | ||
514 | > -- | Broadcast messages waiting to be sent to peer. | ||
515 | > , pendingMessages :: !(TChan Message) | ||
516 | |||
517 | > -- | Dymanic P2P data. | ||
518 | > , sessionState :: !(IORef SessionState) | ||
519 | > } | ||
520 | |||
521 | > -- TODO unpack some fields | ||
522 | |||
523 | > data SessionState = SessionState { | ||
524 | > _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | ||
525 | > , _status :: !SessionStatus -- ^ Status of both peers. | ||
526 | > } deriving (Show, Eq) | ||
527 | |||
528 | > $(makeLenses ''SessionState) | ||
529 | |||
530 | > instance Eq PeerSession where | ||
531 | > (==) = (==) `on` connectedPeerAddr | ||
532 | |||
533 | > instance Ord PeerSession where | ||
534 | > compare = comparing connectedPeerAddr | ||
535 | |||
536 | > -- | Exceptions used to interrupt the current P2P session. This | ||
537 | > -- exceptions will NOT affect other P2P sessions, DHT, peer <-> | ||
538 | > -- tracker, or any other session. | ||
539 | > -- | ||
540 | > data SessionException = PeerDisconnected | ||
541 | > | ProtocolError Doc | ||
542 | > deriving (Show, Typeable) | ||
543 | |||
544 | > instance Exception SessionException | ||
545 | |||
546 | |||
547 | > -- | Do nothing with exception, used with 'handle' or 'try'. | ||
548 | > isSessionException :: Monad m => SessionException -> m () | ||
549 | > isSessionException _ = return () | ||
550 | |||
551 | > -- | The same as 'isSessionException' but output to stdout the catched | ||
552 | > -- exception, for debugging purposes only. | ||
553 | > putSessionException :: SessionException -> IO () | ||
554 | > putSessionException = print | ||
555 | |||
556 | > -- TODO modify such that we can use this in listener loop | ||
557 | > -- TODO check if it connected yet peer | ||
558 | > withPeerSession :: SwarmSession -> PeerAddr | ||
559 | > -> ((Socket, PeerSession) -> IO ()) | ||
560 | > -> IO () | ||
561 | |||
562 | > withPeerSession ss @ SwarmSession {..} addr | ||
563 | > = handle isSessionException . bracket openSession closeSession | ||
564 | > where | ||
565 | > openSession = do | ||
566 | > let caps = encodeExts $ allowedExtensions $ clientSession | ||
567 | > let ihash = tInfoHash torrentMeta | ||
568 | > let pid = clientPeerId $ clientSession | ||
569 | > let chs = Handshake defaultBTProtocol caps ihash pid | ||
570 | |||
571 | > sock <- connectToPeer addr | ||
572 | > phs <- handshake sock chs `onException` close sock | ||
573 | |||
574 | > cbf <- readTVarIO clientBitfield | ||
575 | > sendAll sock (encode (Bitfield cbf)) | ||
576 | |||
577 | > let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) | ||
578 | > ps <- PeerSession addr ss enabled | ||
579 | > <$> registerTimeout (eventManager clientSession) | ||
580 | > maxIncomingTime (return ()) | ||
581 | > <*> registerTimeout (eventManager clientSession) | ||
582 | > maxOutcomingTime (sendKA sock) | ||
583 | > <*> atomically (dupTChan broadcastMessages) | ||
584 | > <*> do { | ||
585 | > ; tc <- totalCount <$> readTVarIO clientBitfield | ||
586 | > ; newIORef (SessionState (haveNone tc) def) | ||
587 | > } | ||
588 | |||
589 | > atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
590 | |||
591 | > return (sock, ps) | ||
592 | |||
593 | > closeSession (sock, ps) = do | ||
594 | > atomically $ modifyTVar' connectedPeers (S.delete ps) | ||
595 | > close sock | ||
596 | |||
597 | > findPieceCount :: PeerSession -> PieceCount | ||
598 | > findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | ||
599 | |||
600 | > {----------------------------------------------------------------------- | ||
601 | > Broadcasting: Have, Cancel, Bitfield, SuggestPiece | ||
602 | > -----------------------------------------------------------------------} | ||
603 | |||
604 | > -- here we should enqueue broadcast messages and keep in mind that: | ||
605 | > -- | ||
606 | > -- * We should enqueue broadcast events as they are appear. | ||
607 | > -- * We should yield broadcast messages as fast as we get them. | ||
608 | > -- | ||
609 | > -- these 2 phases might differ in time significantly | ||
610 | |||
611 | > -- TODO do this; but only when it'll be clean which other broadcast | ||
612 | > -- messages & events we should send | ||
613 | |||
614 | > -- 1. Update client have bitfield --\____ in one transaction; | ||
615 | > -- 2. Update downloaded stats --/ | ||
616 | > -- 3. Signal to the all other peer about this. | ||
617 | |||
618 | > available :: Bitfield -> SwarmSession -> IO () | ||
619 | > available bf se @ SwarmSession {..} = {-# SCC available #-} do | ||
620 | > mark >> atomically broadcast | ||
621 | > where | ||
622 | > mark = do | ||
623 | > let bytes = pieceLength se * BF.haveCount bf | ||
624 | > atomically $ do | ||
625 | > modifyTVar' clientBitfield (BF.union bf) | ||
626 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
627 | |||
628 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
629 | |||
630 | |||
631 | > -- TODO compute size of messages: if it's faster to send Bitfield | ||
632 | > -- instead many Have do that | ||
633 | > -- | ||
634 | > -- also if there is single Have message in queue then the | ||
635 | > -- corresponding piece is likely still in memory or disc cache, | ||
636 | > -- when we can send SuggestPiece | ||
637 | |||
638 | > -- | Get pending messages queue appeared in result of asynchronously | ||
639 | > -- changed client state. Resulting queue should be sent to a peer | ||
640 | > -- immediately. | ||
641 | > getPending :: PeerSession -> IO [Message] | ||
642 | > getPending PeerSession {..} = {-# SCC getPending #-} do | ||
643 | > atomically (readAvail pendingMessages) | ||
644 | |||
645 | > readAvail :: TChan a -> STM [a] | ||
646 | > readAvail chan = do | ||
647 | > m <- tryReadTChan chan | ||
648 | > case m of | ||
649 | > Just a -> (:) <$> pure a <*> readAvail chan | ||
650 | > Nothing -> return [] | ||
651 | |||
652 | > {----------------------------------------------------------------------- | ||
653 | > Timeouts | ||
654 | > -----------------------------------------------------------------------} | ||
655 | |||
656 | > -- for internal use only | ||
657 | |||
658 | > sec :: Int | ||
659 | > sec = 1000 * 1000 | ||
660 | |||
661 | > maxIncomingTime :: Int | ||
662 | > maxIncomingTime = 120 * sec | ||
663 | |||
664 | > maxOutcomingTime :: Int | ||
665 | > maxOutcomingTime = 1 * sec | ||
666 | |||
667 | > -- | Should be called after we have received any message from a peer. | ||
668 | > updateIncoming :: PeerSession -> IO () | ||
669 | > updateIncoming PeerSession {..} = do | ||
670 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
671 | > incomingTimeout maxIncomingTime | ||
672 | |||
673 | > -- | Should be called before we have send any message to a peer. | ||
674 | > updateOutcoming :: PeerSession -> IO () | ||
675 | > updateOutcoming PeerSession {..} = | ||
676 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
677 | > outcomingTimeout maxOutcomingTime | ||
678 | |||
679 | > sendKA :: Socket -> IO () | ||
680 | > sendKA sock {- SwarmSession {..} -} = do | ||
681 | > return () | ||
682 | > -- print "I'm sending keep alive." | ||
683 | > -- sendAll sock (encode BT.KeepAlive) | ||
684 | > -- let mgr = eventManager clientSession | ||
685 | > -- updateTimeout mgr | ||
686 | > -- print "Done.." | ||