diff options
Diffstat (limited to 'src/Network/BitTorrent/Internal.lhs')
-rw-r--r-- | src/Network/BitTorrent/Internal.lhs | 579 |
1 files changed, 0 insertions, 579 deletions
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs deleted file mode 100644 index ef4165e3..00000000 --- a/src/Network/BitTorrent/Internal.lhs +++ /dev/null | |||
@@ -1,579 +0,0 @@ | |||
1 | > -- | | ||
2 | > -- Copyright : (c) Sam T. 2013 | ||
3 | > -- License : MIT | ||
4 | > -- Maintainer : pxqr.sta@gmail.com | ||
5 | > -- Stability : experimental | ||
6 | > -- Portability : portable | ||
7 | > -- | ||
8 | > -- This module implement opaque broadcast message passing. It | ||
9 | > -- provides sessions needed by Network.BitTorrent and | ||
10 | > -- Network.BitTorrent.Exchange and modules. To hide some internals | ||
11 | > -- of this module we detach it from Exchange. | ||
12 | > -- NOTE: Expose only static data in data field lists, all dynamic | ||
13 | > -- data should be modified through standalone functions. | ||
14 | > -- | ||
15 | > | ||
16 | > {-# LANGUAGE OverloadedStrings #-} | ||
17 | > {-# LANGUAGE RecordWildCards #-} | ||
18 | > {-# LANGUAGE ViewPatterns #-} | ||
19 | > | ||
20 | > module Network.BitTorrent.Internal | ||
21 | > ( -- * Progress | ||
22 | > Progress(..), startProgress | ||
23 | > | ||
24 | > , ClientService(..) | ||
25 | > , startService | ||
26 | > , withRunning | ||
27 | > | ||
28 | > -- * Client | ||
29 | > , ClientSession ( ClientSession | ||
30 | > , clientPeerId, allowedExtensions | ||
31 | > , nodeListener, peerListener | ||
32 | > ) | ||
33 | > , withClientSession | ||
34 | > , listenerPort, dhtPort | ||
35 | > | ||
36 | > , ThreadCount | ||
37 | > , defaultThreadCount | ||
38 | > | ||
39 | > , TorrentLoc(..) | ||
40 | > , registerTorrent | ||
41 | > , unregisterTorrent | ||
42 | > | ||
43 | > , getCurrentProgress | ||
44 | > , getSwarmCount | ||
45 | > , getPeerCount | ||
46 | > | ||
47 | > -- * Swarm | ||
48 | > , SwarmSession( SwarmSession, torrentMeta, clientSession ) | ||
49 | > | ||
50 | > , SessionCount | ||
51 | > , getSessionCount | ||
52 | > | ||
53 | > , newLeecher | ||
54 | > , newSeeder | ||
55 | > , getClientBitfield | ||
56 | > | ||
57 | > , waitVacancy | ||
58 | > , forkThrottle | ||
59 | > | ||
60 | > -- * Peer | ||
61 | > , PeerSession( PeerSession, connectedPeerAddr | ||
62 | > , swarmSession, enabledExtensions | ||
63 | > , sessionState | ||
64 | > ) | ||
65 | > , SessionState | ||
66 | > , initiatePeerSession | ||
67 | > , acceptPeerSession | ||
68 | > , listener | ||
69 | > | ||
70 | > -- ** Broadcasting | ||
71 | > , available | ||
72 | > , getPending | ||
73 | > | ||
74 | > -- ** Exceptions | ||
75 | > , SessionException(..) | ||
76 | > , isSessionException | ||
77 | > , putSessionException | ||
78 | > | ||
79 | > -- ** Properties | ||
80 | > , bitfield, status | ||
81 | > , findPieceCount | ||
82 | > | ||
83 | > -- * Timeouts | ||
84 | > , updateIncoming, updateOutcoming | ||
85 | > ) where | ||
86 | |||
87 | > import Prelude hiding (mapM_) | ||
88 | |||
89 | > import Control.Applicative | ||
90 | > import Control.Concurrent | ||
91 | > import Control.Concurrent.STM | ||
92 | > import Control.Concurrent.MSem as MSem | ||
93 | > import Control.Lens | ||
94 | > import Control.Monad (when, forever, (>=>)) | ||
95 | > import Control.Exception | ||
96 | > import Control.Monad.Trans | ||
97 | |||
98 | > import Data.IORef | ||
99 | > import Data.Foldable (mapM_) | ||
100 | > import Data.Map as M | ||
101 | > import Data.HashMap.Strict as HM | ||
102 | > import Data.Set as S | ||
103 | |||
104 | > import Data.Serialize hiding (get) | ||
105 | |||
106 | > import Network hiding (accept) | ||
107 | > import Network.Socket | ||
108 | > import Network.Socket.ByteString | ||
109 | |||
110 | > import GHC.Event as Ev | ||
111 | |||
112 | > import Data.Bitfield as BF | ||
113 | > import Data.Torrent | ||
114 | > import Network.BitTorrent.Extension | ||
115 | > import Network.BitTorrent.Peer | ||
116 | > import Network.BitTorrent.Exchange.Protocol as BT | ||
117 | > import Network.BitTorrent.Tracker.Protocol as BT | ||
118 | > import System.Torrent.Storage | ||
119 | > import Network.BitTorrent.Sessions.Types | ||
120 | |||
121 | |||
122 | > -- | Initial progress is used when there are no session before. | ||
123 | > startProgress :: Integer -> Progress | ||
124 | > startProgress = Progress 0 0 | ||
125 | |||
126 | > -- | Used when the client download some data from /any/ peer. | ||
127 | > downloadedProgress :: Int -> Progress -> Progress | ||
128 | > downloadedProgress (fromIntegral -> amount) | ||
129 | > = (left -~ amount) | ||
130 | > . (downloaded +~ amount) | ||
131 | > {-# INLINE downloadedProgress #-} | ||
132 | |||
133 | > -- | Used when the client upload some data to /any/ peer. | ||
134 | > uploadedProgress :: Int -> Progress -> Progress | ||
135 | > uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
136 | > {-# INLINE uploadedProgress #-} | ||
137 | |||
138 | > -- | Used when leecher join client session. | ||
139 | > enqueuedProgress :: Integer -> Progress -> Progress | ||
140 | > enqueuedProgress amount = left +~ amount | ||
141 | > {-# INLINE enqueuedProgress #-} | ||
142 | |||
143 | > -- | Used when leecher leave client session. | ||
144 | > -- (e.g. user deletes not completed torrent) | ||
145 | > dequeuedProgress :: Integer -> Progress -> Progress | ||
146 | > dequeuedProgress amount = left -~ amount | ||
147 | > {-# INLINE dequeuedProgress #-} | ||
148 | |||
149 | > startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO () | ||
150 | > startService s port m = do | ||
151 | > stopService s | ||
152 | > putMVar s =<< spawn | ||
153 | > where | ||
154 | > spawn = ClientService port <$> forkIO (m port) | ||
155 | |||
156 | > stopService :: MVar ClientService -> IO () | ||
157 | > stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread) | ||
158 | |||
159 | Service A might depend on service B. | ||
160 | |||
161 | > withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO () | ||
162 | > withRunning dep failure action = tryTakeMVar dep >>= maybe failure action | ||
163 | |||
164 | Torrent presence | ||
165 | ------------------------------------------------------------------------ | ||
166 | |||
167 | > data TorrentPresence = Active SwarmSession | ||
168 | > | Registered TorrentLoc | ||
169 | > | Unknown | ||
170 | |||
171 | > torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence | ||
172 | > torrentPresence ClientSession {..} ih = do | ||
173 | > sws <- readTVarIO swarmSessions | ||
174 | > case M.lookup ih sws of | ||
175 | > Just ss -> return $ Active ss | ||
176 | > Nothing -> do | ||
177 | > tm <- readTVarIO torrentMap | ||
178 | > return $ maybe Unknown Registered $ HM.lookup ih tm | ||
179 | |||
180 | Retrieving client info | ||
181 | ------------------------------------------------------------------------ | ||
182 | |||
183 | > -- | Get current global progress of the client. This value is usually | ||
184 | > -- shown to a user. | ||
185 | > getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
186 | > getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
187 | |||
188 | > -- | Get number of swarms client aware of. | ||
189 | > getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
190 | > getSwarmCount ClientSession {..} = liftIO $ | ||
191 | > M.size <$> readTVarIO swarmSessions | ||
192 | |||
193 | > -- | Get number of peers the client currently connected to. | ||
194 | > getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
195 | > getPeerCount ClientSession {..} = liftIO $ do | ||
196 | > unused <- peekAvail activeThreads | ||
197 | > return (maxActive - unused) | ||
198 | |||
199 | > -- | Create a new client session. The data passed to this function are | ||
200 | > -- usually loaded from configuration file. | ||
201 | > openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions. | ||
202 | > -> [Extension] -- ^ Extensions allowed to use. | ||
203 | > -> IO ClientSession -- ^ Client with unique peer ID. | ||
204 | |||
205 | > openClientSession n exts = do | ||
206 | > mgr <- Ev.new | ||
207 | > -- TODO kill this thread when leave client | ||
208 | > _ <- forkIO $ loop mgr | ||
209 | > | ||
210 | > ClientSession | ||
211 | > <$> genPeerId | ||
212 | > <*> pure exts | ||
213 | > <*> newEmptyMVar | ||
214 | > <*> newEmptyMVar | ||
215 | > <*> MSem.new n | ||
216 | > <*> pure n | ||
217 | > <*> newTVarIO M.empty | ||
218 | > <*> pure mgr | ||
219 | > <*> newTVarIO (startProgress 0) | ||
220 | > <*> newTVarIO HM.empty | ||
221 | |||
222 | > closeClientSession :: ClientSession -> IO () | ||
223 | > closeClientSession ClientSession {..} = | ||
224 | > stopService nodeListener `finally` stopService peerListener | ||
225 | |||
226 | > withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO () | ||
227 | > withClientSession c es = bracket (openClientSession c es) closeClientSession | ||
228 | |||
229 | > listenerPort :: ClientSession -> IO PortNumber | ||
230 | > listenerPort ClientSession {..} = servPort <$> readMVar peerListener | ||
231 | |||
232 | > dhtPort :: ClientSession -> IO PortNumber | ||
233 | > dhtPort ClientSession {..} = servPort <$> readMVar nodeListener | ||
234 | |||
235 | |||
236 | > defSeederConns :: SessionCount | ||
237 | > defSeederConns = defaultUnchokeSlots | ||
238 | |||
239 | > defLeacherConns :: SessionCount | ||
240 | > defLeacherConns = defaultNumWant | ||
241 | |||
242 | > newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | ||
243 | > -> IO SwarmSession | ||
244 | > newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
245 | > = SwarmSession t cs | ||
246 | > <$> MSem.new n | ||
247 | > <*> newTVarIO bf | ||
248 | > <*> undefined | ||
249 | > <*> newTVarIO S.empty | ||
250 | > <*> newBroadcastTChanIO | ||
251 | |||
252 | -- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession | ||
253 | -- > openSwarmSession ClientSession {..} ih = do | ||
254 | -- > loc <- HM.lookup <$> readTVarIO torrentMap | ||
255 | -- > torrent <- validateLocation loc | ||
256 | -- > return undefined | ||
257 | |||
258 | > closeSwarmSession :: SwarmSession -> IO () | ||
259 | > closeSwarmSession se @ SwarmSession {..} = do | ||
260 | > unregisterSwarmSession se | ||
261 | > -- TODO stop discovery | ||
262 | > -- TODO killall peer sessions | ||
263 | > -- TODO the order is important! | ||
264 | > closeStorage storage | ||
265 | |||
266 | |||
267 | |||
268 | > unregisterSwarmSession :: SwarmSession -> IO () | ||
269 | > unregisterSwarmSession SwarmSession {..} = | ||
270 | > atomically $ modifyTVar (swarmSessions clientSession) $ | ||
271 | > M.delete $ tInfoHash torrentMeta | ||
272 | |||
273 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
274 | getSwarm cs @ ClientSession {..} ih = do | ||
275 | ss <- readTVarIO $ swarmSessions | ||
276 | case HM.lookup ih ss of | ||
277 | Just sw -> return sw | ||
278 | Nothing -> openSwarm cs | ||
279 | |||
280 | > newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
281 | > newSeeder cs t @ Torrent {..} | ||
282 | > = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
283 | |||
284 | > -- | New swarm in which the client allowed both download and upload. | ||
285 | > newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
286 | > newLeecher cs t @ Torrent {..} = do | ||
287 | > se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
288 | > atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
289 | > return se | ||
290 | |||
291 | > --isLeacher :: SwarmSession -> IO Bool | ||
292 | > --isLeacher = undefined | ||
293 | |||
294 | > -- | Get the number of connected peers in the given swarm. | ||
295 | > getSessionCount :: SwarmSession -> IO SessionCount | ||
296 | > getSessionCount SwarmSession {..} = do | ||
297 | > S.size <$> readTVarIO connectedPeers | ||
298 | |||
299 | > getClientBitfield :: SwarmSession -> IO Bitfield | ||
300 | > getClientBitfield = readTVarIO . clientBitfield | ||
301 | |||
302 | > swarmHandshake :: SwarmSession -> Handshake | ||
303 | > swarmHandshake SwarmSession {..} = Handshake { | ||
304 | > hsProtocol = defaultBTProtocol | ||
305 | > , hsReserved = encodeExts $ allowedExtensions $ clientSession | ||
306 | > , hsInfoHash = tInfoHash torrentMeta | ||
307 | > , hsPeerId = clientPeerId $ clientSession | ||
308 | > } | ||
309 | |||
310 | > {- | ||
311 | > haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () | ||
312 | > haveDone ix = | ||
313 | > liftIO $ atomically $ do | ||
314 | > bf <- readTVar clientBitfield | ||
315 | > writeTVar (have ix bf) | ||
316 | > currentProgress | ||
317 | > -} | ||
318 | |||
319 | Peer sessions throttling | ||
320 | ------------------------------------------------------------------------ | ||
321 | |||
322 | > -- | The number of threads suitable for a typical BT client. | ||
323 | > defaultThreadCount :: ThreadCount | ||
324 | > defaultThreadCount = 1000 | ||
325 | |||
326 | > enterSwarm :: SwarmSession -> IO () | ||
327 | > enterSwarm SwarmSession {..} = do | ||
328 | > MSem.wait (activeThreads clientSession) | ||
329 | > MSem.wait vacantPeers | ||
330 | |||
331 | > leaveSwarm :: SwarmSession -> IO () | ||
332 | > leaveSwarm SwarmSession {..} = do | ||
333 | > MSem.signal vacantPeers | ||
334 | > MSem.signal (activeThreads clientSession) | ||
335 | |||
336 | > waitVacancy :: SwarmSession -> IO () -> IO () | ||
337 | > waitVacancy se = | ||
338 | > bracket (enterSwarm se) (const (leaveSwarm se)) | ||
339 | > . const | ||
340 | |||
341 | > forkThrottle :: SwarmSession -> IO () -> IO ThreadId | ||
342 | > forkThrottle se action = do | ||
343 | > enterSwarm se | ||
344 | > (forkIO $ do | ||
345 | > action `finally` leaveSwarm se) | ||
346 | > `onException` leaveSwarm se | ||
347 | |||
348 | |||
349 | > findPieceCount :: PeerSession -> PieceCount | ||
350 | > findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | ||
351 | |||
352 | TODO: check content files location; | ||
353 | |||
354 | > validateLocation :: TorrentLoc -> IO Torrent | ||
355 | > validateLocation = fromFile . metafilePath | ||
356 | |||
357 | > registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO () | ||
358 | > registerTorrent = error "registerTorrent" | ||
359 | > {- | ||
360 | > Torrent {..} <- validateTorrent tl | ||
361 | > atomically $ modifyTVar' torrentMap $ HM.insert tInfoHash tl | ||
362 | > return (Just t) | ||
363 | > -} | ||
364 | |||
365 | > unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () | ||
366 | > unregisterTorrent = error "unregisterTorrent" | ||
367 | > -- modifyTVar' torrentMap $ HM.delete ih | ||
368 | |||
369 | > torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession | ||
370 | > torrentSwarm _ _ (Active sws) = return sws | ||
371 | > torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc | ||
372 | > torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih | ||
373 | |||
374 | > lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
375 | > lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih | ||
376 | |||
377 | Peer session creation | ||
378 | ------------------------------------------------------------------------ | ||
379 | |||
380 | The peer session cycle looks like: | ||
381 | |||
382 | * acquire vacant session and vacant thread slot; | ||
383 | * (fork could be here, but not necessary) | ||
384 | * establish peer connection; | ||
385 | * register peer session; | ||
386 | * ... exchange process ... | ||
387 | * unregister peer session; | ||
388 | * close peer connection; | ||
389 | * release acquired session and thread slot. | ||
390 | |||
391 | TODO: explain why this order | ||
392 | TODO: thread throttling | ||
393 | TODO: check if it connected yet peer | ||
394 | TODO: utilize peer Id. | ||
395 | TODO: use STM semaphore | ||
396 | |||
397 | > openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | ||
398 | > openSession ss @ SwarmSession {..} addr Handshake {..} = do | ||
399 | > let clientCaps = encodeExts $ allowedExtensions $ clientSession | ||
400 | > let enabled = decodeExts (enabledCaps clientCaps hsReserved) | ||
401 | > ps <- PeerSession addr ss enabled | ||
402 | > <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ()) | ||
403 | > <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ()) | ||
404 | > <*> atomically (dupTChan broadcastMessages) | ||
405 | > <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) | ||
406 | > -- TODO we could implement more interesting throtling scheme | ||
407 | > -- using connected peer information | ||
408 | > atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
409 | > return ps | ||
410 | |||
411 | > closeSession :: PeerSession -> IO () | ||
412 | > closeSession ps @ PeerSession {..} = do | ||
413 | > atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps) | ||
414 | |||
415 | > type PeerConn = (Socket, PeerSession) | ||
416 | > type Exchange = PeerConn -> IO () | ||
417 | |||
418 | > sendClientStatus :: PeerConn -> IO () | ||
419 | > sendClientStatus (sock, PeerSession {..}) = do | ||
420 | > cbf <- readTVarIO $ clientBitfield $ swarmSession | ||
421 | > sendAll sock $ encode $ Bitfield cbf | ||
422 | > | ||
423 | > port <- dhtPort $ clientSession swarmSession | ||
424 | > when (ExtDHT `elem` enabledExtensions) $ do | ||
425 | > sendAll sock $ encode $ Port port | ||
426 | |||
427 | Exchange action depends on session and socket, whereas session depends | ||
428 | on socket: | ||
429 | |||
430 | socket------>-----exchange | ||
431 | | | | ||
432 | \-->--session-->--/ | ||
433 | |||
434 | To handle exceptions properly we double bracket socket and session | ||
435 | then joining the resources and also ignoring session local exceptions. | ||
436 | |||
437 | > runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO () | ||
438 | > runSession connector opener action = | ||
439 | > handle isSessionException $ | ||
440 | > bracket connector close $ \sock -> | ||
441 | > bracket (opener sock) closeSession $ \ses -> | ||
442 | > action (sock, ses) | ||
443 | |||
444 | Used then the client want to connect to a peer. | ||
445 | |||
446 | > initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | ||
447 | > initiatePeerSession ss @ SwarmSession {..} addr | ||
448 | > = runSession (connectToPeer addr) initiated | ||
449 | > where | ||
450 | > initiated sock = do | ||
451 | > phs <- handshake sock (swarmHandshake ss) | ||
452 | > ps <- openSession ss addr phs | ||
453 | > sendClientStatus (sock, ps) | ||
454 | > return ps | ||
455 | |||
456 | Used the a peer want to connect to the client. | ||
457 | |||
458 | > acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO () | ||
459 | > acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted | ||
460 | > where | ||
461 | > accepted sock = do | ||
462 | > phs <- recvHandshake sock | ||
463 | > swarm <- lookupSwarm cs $ hsInfoHash phs | ||
464 | > ps <- openSession swarm addr phs | ||
465 | > sendHandshake sock $ Handshake { | ||
466 | > hsProtocol = defaultBTProtocol | ||
467 | > , hsReserved = encodeExts $ enabledExtensions ps | ||
468 | > , hsInfoHash = hsInfoHash phs | ||
469 | > , hsPeerId = clientPeerId | ||
470 | > } | ||
471 | > sendClientStatus (sock, ps) | ||
472 | > return ps | ||
473 | |||
474 | |||
475 | > listener :: ClientSession -> Exchange -> PortNumber -> IO () | ||
476 | > listener cs action serverPort = bracket openListener close loop | ||
477 | > where | ||
478 | > loop sock = forever $ handle isIOError $ do | ||
479 | > (conn, addr) <- accept sock | ||
480 | > case addr of | ||
481 | > SockAddrInet port host -> do | ||
482 | > acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
483 | > _ -> return () | ||
484 | > | ||
485 | > isIOError :: IOError -> IO () | ||
486 | > isIOError _ = return () | ||
487 | > | ||
488 | > openListener = do | ||
489 | > sock <- socket AF_INET Stream defaultProtocol | ||
490 | > bindSocket sock (SockAddrInet serverPort 0) | ||
491 | > listen sock 1 | ||
492 | > return sock | ||
493 | |||
494 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece | ||
495 | ------------------------------------------------------------------------ | ||
496 | |||
497 | Here we should enqueue broadcast messages and keep in mind that: | ||
498 | * We should enqueue broadcast events as they are appear. | ||
499 | * We should yield broadcast messages as fast as we get them. | ||
500 | |||
501 | these 2 phases might differ in time significantly | ||
502 | |||
503 | **TODO**: do this; but only when it'll be clean which other broadcast | ||
504 | messages & events we should send. | ||
505 | |||
506 | 1. Update client have bitfield --\____ in one transaction; | ||
507 | 2. Update downloaded stats --/ | ||
508 | 3. Signal to the all other peer about this. | ||
509 | |||
510 | > available :: Bitfield -> SwarmSession -> IO () | ||
511 | > available bf se @ SwarmSession {..} = {-# SCC available #-} do | ||
512 | > mark >> atomically broadcast | ||
513 | > where | ||
514 | > mark = do | ||
515 | > let piLen = ciPieceLength $ tInfo $ torrentMeta | ||
516 | > let bytes = piLen * BF.haveCount bf | ||
517 | > atomically $ do | ||
518 | > modifyTVar' clientBitfield (BF.union bf) | ||
519 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
520 | > | ||
521 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
522 | |||
523 | |||
524 | TODO compute size of messages: if it's faster to send Bitfield | ||
525 | instead many Have do that | ||
526 | |||
527 | Also if there is single Have message in queue then the | ||
528 | corresponding piece is likely still in memory or disc cache, | ||
529 | when we can send SuggestPiece. | ||
530 | |||
531 | Get pending messages queue appeared in result of asynchronously | ||
532 | changed client state. Resulting queue should be sent to a peer | ||
533 | immediately. | ||
534 | |||
535 | > getPending :: PeerSession -> IO [Message] | ||
536 | > getPending PeerSession {..} = {-# SCC getPending #-} do | ||
537 | > atomically (readAvail pendingMessages) | ||
538 | |||
539 | > readAvail :: TChan a -> STM [a] | ||
540 | > readAvail chan = do | ||
541 | > m <- tryReadTChan chan | ||
542 | > case m of | ||
543 | > Just a -> (:) <$> pure a <*> readAvail chan | ||
544 | > Nothing -> return [] | ||
545 | |||
546 | Timeouts | ||
547 | ----------------------------------------------------------------------- | ||
548 | |||
549 | for internal use only | ||
550 | |||
551 | > sec :: Int | ||
552 | > sec = 1000 * 1000 | ||
553 | |||
554 | > maxIncomingTime :: Int | ||
555 | > maxIncomingTime = 120 * sec | ||
556 | |||
557 | > maxOutcomingTime :: Int | ||
558 | > maxOutcomingTime = 1 * sec | ||
559 | |||
560 | > -- | Should be called after we have received any message from a peer. | ||
561 | > updateIncoming :: PeerSession -> IO () | ||
562 | > updateIncoming PeerSession {..} = do | ||
563 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
564 | > incomingTimeout maxIncomingTime | ||
565 | |||
566 | > -- | Should be called before we have send any message to a peer. | ||
567 | > updateOutcoming :: PeerSession -> IO () | ||
568 | > updateOutcoming PeerSession {..} = | ||
569 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
570 | > outcomingTimeout maxOutcomingTime | ||
571 | |||
572 | > sendKA :: Socket -> IO () | ||
573 | > sendKA sock {- SwarmSession {..} -} = do | ||
574 | > return () | ||
575 | > -- print "I'm sending keep alive." | ||
576 | > -- sendAll sock (encode BT.KeepAlive) | ||
577 | > -- let mgr = eventManager clientSession | ||
578 | > -- updateTimeout mgr | ||
579 | > -- print "Done.." | ||