diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent.hs | 3 | ||||
-rw-r--r-- | src/Network/BitTorrent/Discovery.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.lhs | 579 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 447 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions/Types.lhs | 517 | ||||
-rw-r--r-- | src/Network/BitTorrent/Tracker.hs | 2 |
7 files changed, 969 insertions, 583 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index acb3700c..c68cceac 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs | |||
@@ -82,7 +82,8 @@ import Network | |||
82 | 82 | ||
83 | import Data.Bitfield as BF | 83 | import Data.Bitfield as BF |
84 | import Data.Torrent | 84 | import Data.Torrent |
85 | import Network.BitTorrent.Internal | 85 | import Network.BitTorrent.Sessions.Types |
86 | import Network.BitTorrent.Sessions | ||
86 | import Network.BitTorrent.Peer | 87 | import Network.BitTorrent.Peer |
87 | import Network.BitTorrent.Extension | 88 | import Network.BitTorrent.Extension |
88 | import Network.BitTorrent.Exchange | 89 | import Network.BitTorrent.Exchange |
diff --git a/src/Network/BitTorrent/Discovery.hs b/src/Network/BitTorrent/Discovery.hs index 222dfe56..8403461c 100644 --- a/src/Network/BitTorrent/Discovery.hs +++ b/src/Network/BitTorrent/Discovery.hs | |||
@@ -11,7 +11,7 @@ import Network.Socket | |||
11 | 11 | ||
12 | import Data.Torrent | 12 | import Data.Torrent |
13 | import Network.BitTorrent.Peer | 13 | import Network.BitTorrent.Peer |
14 | import Network.BitTorrent.Internal | 14 | import Network.BitTorrent.Sessions |
15 | import Network.BitTorrent.Exchange | 15 | import Network.BitTorrent.Exchange |
16 | import Network.BitTorrent.Tracker | 16 | import Network.BitTorrent.Tracker |
17 | import Network.BitTorrent.DHT | 17 | import Network.BitTorrent.DHT |
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 6ba56a22..32ab493d 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -88,10 +88,10 @@ import Text.PrettyPrint as PP hiding (($$)) | |||
88 | import Network | 88 | import Network |
89 | 89 | ||
90 | import Data.Bitfield as BF | 90 | import Data.Bitfield as BF |
91 | import Network.BitTorrent.Internal | ||
92 | import Network.BitTorrent.Extension | 91 | import Network.BitTorrent.Extension |
93 | import Network.BitTorrent.Peer | 92 | import Network.BitTorrent.Peer |
94 | import Network.BitTorrent.Exchange.Protocol | 93 | import Network.BitTorrent.Exchange.Protocol |
94 | import Network.BitTorrent.Sessions.Types | ||
95 | import System.Torrent.Storage | 95 | import System.Torrent.Storage |
96 | 96 | ||
97 | {----------------------------------------------------------------------- | 97 | {----------------------------------------------------------------------- |
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs deleted file mode 100644 index ef4165e3..00000000 --- a/src/Network/BitTorrent/Internal.lhs +++ /dev/null | |||
@@ -1,579 +0,0 @@ | |||
1 | > -- | | ||
2 | > -- Copyright : (c) Sam T. 2013 | ||
3 | > -- License : MIT | ||
4 | > -- Maintainer : pxqr.sta@gmail.com | ||
5 | > -- Stability : experimental | ||
6 | > -- Portability : portable | ||
7 | > -- | ||
8 | > -- This module implement opaque broadcast message passing. It | ||
9 | > -- provides sessions needed by Network.BitTorrent and | ||
10 | > -- Network.BitTorrent.Exchange and modules. To hide some internals | ||
11 | > -- of this module we detach it from Exchange. | ||
12 | > -- NOTE: Expose only static data in data field lists, all dynamic | ||
13 | > -- data should be modified through standalone functions. | ||
14 | > -- | ||
15 | > | ||
16 | > {-# LANGUAGE OverloadedStrings #-} | ||
17 | > {-# LANGUAGE RecordWildCards #-} | ||
18 | > {-# LANGUAGE ViewPatterns #-} | ||
19 | > | ||
20 | > module Network.BitTorrent.Internal | ||
21 | > ( -- * Progress | ||
22 | > Progress(..), startProgress | ||
23 | > | ||
24 | > , ClientService(..) | ||
25 | > , startService | ||
26 | > , withRunning | ||
27 | > | ||
28 | > -- * Client | ||
29 | > , ClientSession ( ClientSession | ||
30 | > , clientPeerId, allowedExtensions | ||
31 | > , nodeListener, peerListener | ||
32 | > ) | ||
33 | > , withClientSession | ||
34 | > , listenerPort, dhtPort | ||
35 | > | ||
36 | > , ThreadCount | ||
37 | > , defaultThreadCount | ||
38 | > | ||
39 | > , TorrentLoc(..) | ||
40 | > , registerTorrent | ||
41 | > , unregisterTorrent | ||
42 | > | ||
43 | > , getCurrentProgress | ||
44 | > , getSwarmCount | ||
45 | > , getPeerCount | ||
46 | > | ||
47 | > -- * Swarm | ||
48 | > , SwarmSession( SwarmSession, torrentMeta, clientSession ) | ||
49 | > | ||
50 | > , SessionCount | ||
51 | > , getSessionCount | ||
52 | > | ||
53 | > , newLeecher | ||
54 | > , newSeeder | ||
55 | > , getClientBitfield | ||
56 | > | ||
57 | > , waitVacancy | ||
58 | > , forkThrottle | ||
59 | > | ||
60 | > -- * Peer | ||
61 | > , PeerSession( PeerSession, connectedPeerAddr | ||
62 | > , swarmSession, enabledExtensions | ||
63 | > , sessionState | ||
64 | > ) | ||
65 | > , SessionState | ||
66 | > , initiatePeerSession | ||
67 | > , acceptPeerSession | ||
68 | > , listener | ||
69 | > | ||
70 | > -- ** Broadcasting | ||
71 | > , available | ||
72 | > , getPending | ||
73 | > | ||
74 | > -- ** Exceptions | ||
75 | > , SessionException(..) | ||
76 | > , isSessionException | ||
77 | > , putSessionException | ||
78 | > | ||
79 | > -- ** Properties | ||
80 | > , bitfield, status | ||
81 | > , findPieceCount | ||
82 | > | ||
83 | > -- * Timeouts | ||
84 | > , updateIncoming, updateOutcoming | ||
85 | > ) where | ||
86 | |||
87 | > import Prelude hiding (mapM_) | ||
88 | |||
89 | > import Control.Applicative | ||
90 | > import Control.Concurrent | ||
91 | > import Control.Concurrent.STM | ||
92 | > import Control.Concurrent.MSem as MSem | ||
93 | > import Control.Lens | ||
94 | > import Control.Monad (when, forever, (>=>)) | ||
95 | > import Control.Exception | ||
96 | > import Control.Monad.Trans | ||
97 | |||
98 | > import Data.IORef | ||
99 | > import Data.Foldable (mapM_) | ||
100 | > import Data.Map as M | ||
101 | > import Data.HashMap.Strict as HM | ||
102 | > import Data.Set as S | ||
103 | |||
104 | > import Data.Serialize hiding (get) | ||
105 | |||
106 | > import Network hiding (accept) | ||
107 | > import Network.Socket | ||
108 | > import Network.Socket.ByteString | ||
109 | |||
110 | > import GHC.Event as Ev | ||
111 | |||
112 | > import Data.Bitfield as BF | ||
113 | > import Data.Torrent | ||
114 | > import Network.BitTorrent.Extension | ||
115 | > import Network.BitTorrent.Peer | ||
116 | > import Network.BitTorrent.Exchange.Protocol as BT | ||
117 | > import Network.BitTorrent.Tracker.Protocol as BT | ||
118 | > import System.Torrent.Storage | ||
119 | > import Network.BitTorrent.Sessions.Types | ||
120 | |||
121 | |||
122 | > -- | Initial progress is used when there are no session before. | ||
123 | > startProgress :: Integer -> Progress | ||
124 | > startProgress = Progress 0 0 | ||
125 | |||
126 | > -- | Used when the client download some data from /any/ peer. | ||
127 | > downloadedProgress :: Int -> Progress -> Progress | ||
128 | > downloadedProgress (fromIntegral -> amount) | ||
129 | > = (left -~ amount) | ||
130 | > . (downloaded +~ amount) | ||
131 | > {-# INLINE downloadedProgress #-} | ||
132 | |||
133 | > -- | Used when the client upload some data to /any/ peer. | ||
134 | > uploadedProgress :: Int -> Progress -> Progress | ||
135 | > uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
136 | > {-# INLINE uploadedProgress #-} | ||
137 | |||
138 | > -- | Used when leecher join client session. | ||
139 | > enqueuedProgress :: Integer -> Progress -> Progress | ||
140 | > enqueuedProgress amount = left +~ amount | ||
141 | > {-# INLINE enqueuedProgress #-} | ||
142 | |||
143 | > -- | Used when leecher leave client session. | ||
144 | > -- (e.g. user deletes not completed torrent) | ||
145 | > dequeuedProgress :: Integer -> Progress -> Progress | ||
146 | > dequeuedProgress amount = left -~ amount | ||
147 | > {-# INLINE dequeuedProgress #-} | ||
148 | |||
149 | > startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO () | ||
150 | > startService s port m = do | ||
151 | > stopService s | ||
152 | > putMVar s =<< spawn | ||
153 | > where | ||
154 | > spawn = ClientService port <$> forkIO (m port) | ||
155 | |||
156 | > stopService :: MVar ClientService -> IO () | ||
157 | > stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread) | ||
158 | |||
159 | Service A might depend on service B. | ||
160 | |||
161 | > withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO () | ||
162 | > withRunning dep failure action = tryTakeMVar dep >>= maybe failure action | ||
163 | |||
164 | Torrent presence | ||
165 | ------------------------------------------------------------------------ | ||
166 | |||
167 | > data TorrentPresence = Active SwarmSession | ||
168 | > | Registered TorrentLoc | ||
169 | > | Unknown | ||
170 | |||
171 | > torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence | ||
172 | > torrentPresence ClientSession {..} ih = do | ||
173 | > sws <- readTVarIO swarmSessions | ||
174 | > case M.lookup ih sws of | ||
175 | > Just ss -> return $ Active ss | ||
176 | > Nothing -> do | ||
177 | > tm <- readTVarIO torrentMap | ||
178 | > return $ maybe Unknown Registered $ HM.lookup ih tm | ||
179 | |||
180 | Retrieving client info | ||
181 | ------------------------------------------------------------------------ | ||
182 | |||
183 | > -- | Get current global progress of the client. This value is usually | ||
184 | > -- shown to a user. | ||
185 | > getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
186 | > getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
187 | |||
188 | > -- | Get number of swarms client aware of. | ||
189 | > getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
190 | > getSwarmCount ClientSession {..} = liftIO $ | ||
191 | > M.size <$> readTVarIO swarmSessions | ||
192 | |||
193 | > -- | Get number of peers the client currently connected to. | ||
194 | > getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
195 | > getPeerCount ClientSession {..} = liftIO $ do | ||
196 | > unused <- peekAvail activeThreads | ||
197 | > return (maxActive - unused) | ||
198 | |||
199 | > -- | Create a new client session. The data passed to this function are | ||
200 | > -- usually loaded from configuration file. | ||
201 | > openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions. | ||
202 | > -> [Extension] -- ^ Extensions allowed to use. | ||
203 | > -> IO ClientSession -- ^ Client with unique peer ID. | ||
204 | |||
205 | > openClientSession n exts = do | ||
206 | > mgr <- Ev.new | ||
207 | > -- TODO kill this thread when leave client | ||
208 | > _ <- forkIO $ loop mgr | ||
209 | > | ||
210 | > ClientSession | ||
211 | > <$> genPeerId | ||
212 | > <*> pure exts | ||
213 | > <*> newEmptyMVar | ||
214 | > <*> newEmptyMVar | ||
215 | > <*> MSem.new n | ||
216 | > <*> pure n | ||
217 | > <*> newTVarIO M.empty | ||
218 | > <*> pure mgr | ||
219 | > <*> newTVarIO (startProgress 0) | ||
220 | > <*> newTVarIO HM.empty | ||
221 | |||
222 | > closeClientSession :: ClientSession -> IO () | ||
223 | > closeClientSession ClientSession {..} = | ||
224 | > stopService nodeListener `finally` stopService peerListener | ||
225 | |||
226 | > withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO () | ||
227 | > withClientSession c es = bracket (openClientSession c es) closeClientSession | ||
228 | |||
229 | > listenerPort :: ClientSession -> IO PortNumber | ||
230 | > listenerPort ClientSession {..} = servPort <$> readMVar peerListener | ||
231 | |||
232 | > dhtPort :: ClientSession -> IO PortNumber | ||
233 | > dhtPort ClientSession {..} = servPort <$> readMVar nodeListener | ||
234 | |||
235 | |||
236 | > defSeederConns :: SessionCount | ||
237 | > defSeederConns = defaultUnchokeSlots | ||
238 | |||
239 | > defLeacherConns :: SessionCount | ||
240 | > defLeacherConns = defaultNumWant | ||
241 | |||
242 | > newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | ||
243 | > -> IO SwarmSession | ||
244 | > newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
245 | > = SwarmSession t cs | ||
246 | > <$> MSem.new n | ||
247 | > <*> newTVarIO bf | ||
248 | > <*> undefined | ||
249 | > <*> newTVarIO S.empty | ||
250 | > <*> newBroadcastTChanIO | ||
251 | |||
252 | -- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession | ||
253 | -- > openSwarmSession ClientSession {..} ih = do | ||
254 | -- > loc <- HM.lookup <$> readTVarIO torrentMap | ||
255 | -- > torrent <- validateLocation loc | ||
256 | -- > return undefined | ||
257 | |||
258 | > closeSwarmSession :: SwarmSession -> IO () | ||
259 | > closeSwarmSession se @ SwarmSession {..} = do | ||
260 | > unregisterSwarmSession se | ||
261 | > -- TODO stop discovery | ||
262 | > -- TODO killall peer sessions | ||
263 | > -- TODO the order is important! | ||
264 | > closeStorage storage | ||
265 | |||
266 | |||
267 | |||
268 | > unregisterSwarmSession :: SwarmSession -> IO () | ||
269 | > unregisterSwarmSession SwarmSession {..} = | ||
270 | > atomically $ modifyTVar (swarmSessions clientSession) $ | ||
271 | > M.delete $ tInfoHash torrentMeta | ||
272 | |||
273 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
274 | getSwarm cs @ ClientSession {..} ih = do | ||
275 | ss <- readTVarIO $ swarmSessions | ||
276 | case HM.lookup ih ss of | ||
277 | Just sw -> return sw | ||
278 | Nothing -> openSwarm cs | ||
279 | |||
280 | > newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
281 | > newSeeder cs t @ Torrent {..} | ||
282 | > = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
283 | |||
284 | > -- | New swarm in which the client allowed both download and upload. | ||
285 | > newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
286 | > newLeecher cs t @ Torrent {..} = do | ||
287 | > se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
288 | > atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
289 | > return se | ||
290 | |||
291 | > --isLeacher :: SwarmSession -> IO Bool | ||
292 | > --isLeacher = undefined | ||
293 | |||
294 | > -- | Get the number of connected peers in the given swarm. | ||
295 | > getSessionCount :: SwarmSession -> IO SessionCount | ||
296 | > getSessionCount SwarmSession {..} = do | ||
297 | > S.size <$> readTVarIO connectedPeers | ||
298 | |||
299 | > getClientBitfield :: SwarmSession -> IO Bitfield | ||
300 | > getClientBitfield = readTVarIO . clientBitfield | ||
301 | |||
302 | > swarmHandshake :: SwarmSession -> Handshake | ||
303 | > swarmHandshake SwarmSession {..} = Handshake { | ||
304 | > hsProtocol = defaultBTProtocol | ||
305 | > , hsReserved = encodeExts $ allowedExtensions $ clientSession | ||
306 | > , hsInfoHash = tInfoHash torrentMeta | ||
307 | > , hsPeerId = clientPeerId $ clientSession | ||
308 | > } | ||
309 | |||
310 | > {- | ||
311 | > haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () | ||
312 | > haveDone ix = | ||
313 | > liftIO $ atomically $ do | ||
314 | > bf <- readTVar clientBitfield | ||
315 | > writeTVar (have ix bf) | ||
316 | > currentProgress | ||
317 | > -} | ||
318 | |||
319 | Peer sessions throttling | ||
320 | ------------------------------------------------------------------------ | ||
321 | |||
322 | > -- | The number of threads suitable for a typical BT client. | ||
323 | > defaultThreadCount :: ThreadCount | ||
324 | > defaultThreadCount = 1000 | ||
325 | |||
326 | > enterSwarm :: SwarmSession -> IO () | ||
327 | > enterSwarm SwarmSession {..} = do | ||
328 | > MSem.wait (activeThreads clientSession) | ||
329 | > MSem.wait vacantPeers | ||
330 | |||
331 | > leaveSwarm :: SwarmSession -> IO () | ||
332 | > leaveSwarm SwarmSession {..} = do | ||
333 | > MSem.signal vacantPeers | ||
334 | > MSem.signal (activeThreads clientSession) | ||
335 | |||
336 | > waitVacancy :: SwarmSession -> IO () -> IO () | ||
337 | > waitVacancy se = | ||
338 | > bracket (enterSwarm se) (const (leaveSwarm se)) | ||
339 | > . const | ||
340 | |||
341 | > forkThrottle :: SwarmSession -> IO () -> IO ThreadId | ||
342 | > forkThrottle se action = do | ||
343 | > enterSwarm se | ||
344 | > (forkIO $ do | ||
345 | > action `finally` leaveSwarm se) | ||
346 | > `onException` leaveSwarm se | ||
347 | |||
348 | |||
349 | > findPieceCount :: PeerSession -> PieceCount | ||
350 | > findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | ||
351 | |||
352 | TODO: check content files location; | ||
353 | |||
354 | > validateLocation :: TorrentLoc -> IO Torrent | ||
355 | > validateLocation = fromFile . metafilePath | ||
356 | |||
357 | > registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO () | ||
358 | > registerTorrent = error "registerTorrent" | ||
359 | > {- | ||
360 | > Torrent {..} <- validateTorrent tl | ||
361 | > atomically $ modifyTVar' torrentMap $ HM.insert tInfoHash tl | ||
362 | > return (Just t) | ||
363 | > -} | ||
364 | |||
365 | > unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () | ||
366 | > unregisterTorrent = error "unregisterTorrent" | ||
367 | > -- modifyTVar' torrentMap $ HM.delete ih | ||
368 | |||
369 | > torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession | ||
370 | > torrentSwarm _ _ (Active sws) = return sws | ||
371 | > torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc | ||
372 | > torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih | ||
373 | |||
374 | > lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
375 | > lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih | ||
376 | |||
377 | Peer session creation | ||
378 | ------------------------------------------------------------------------ | ||
379 | |||
380 | The peer session cycle looks like: | ||
381 | |||
382 | * acquire vacant session and vacant thread slot; | ||
383 | * (fork could be here, but not necessary) | ||
384 | * establish peer connection; | ||
385 | * register peer session; | ||
386 | * ... exchange process ... | ||
387 | * unregister peer session; | ||
388 | * close peer connection; | ||
389 | * release acquired session and thread slot. | ||
390 | |||
391 | TODO: explain why this order | ||
392 | TODO: thread throttling | ||
393 | TODO: check if it connected yet peer | ||
394 | TODO: utilize peer Id. | ||
395 | TODO: use STM semaphore | ||
396 | |||
397 | > openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | ||
398 | > openSession ss @ SwarmSession {..} addr Handshake {..} = do | ||
399 | > let clientCaps = encodeExts $ allowedExtensions $ clientSession | ||
400 | > let enabled = decodeExts (enabledCaps clientCaps hsReserved) | ||
401 | > ps <- PeerSession addr ss enabled | ||
402 | > <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ()) | ||
403 | > <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ()) | ||
404 | > <*> atomically (dupTChan broadcastMessages) | ||
405 | > <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) | ||
406 | > -- TODO we could implement more interesting throtling scheme | ||
407 | > -- using connected peer information | ||
408 | > atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
409 | > return ps | ||
410 | |||
411 | > closeSession :: PeerSession -> IO () | ||
412 | > closeSession ps @ PeerSession {..} = do | ||
413 | > atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps) | ||
414 | |||
415 | > type PeerConn = (Socket, PeerSession) | ||
416 | > type Exchange = PeerConn -> IO () | ||
417 | |||
418 | > sendClientStatus :: PeerConn -> IO () | ||
419 | > sendClientStatus (sock, PeerSession {..}) = do | ||
420 | > cbf <- readTVarIO $ clientBitfield $ swarmSession | ||
421 | > sendAll sock $ encode $ Bitfield cbf | ||
422 | > | ||
423 | > port <- dhtPort $ clientSession swarmSession | ||
424 | > when (ExtDHT `elem` enabledExtensions) $ do | ||
425 | > sendAll sock $ encode $ Port port | ||
426 | |||
427 | Exchange action depends on session and socket, whereas session depends | ||
428 | on socket: | ||
429 | |||
430 | socket------>-----exchange | ||
431 | | | | ||
432 | \-->--session-->--/ | ||
433 | |||
434 | To handle exceptions properly we double bracket socket and session | ||
435 | then joining the resources and also ignoring session local exceptions. | ||
436 | |||
437 | > runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO () | ||
438 | > runSession connector opener action = | ||
439 | > handle isSessionException $ | ||
440 | > bracket connector close $ \sock -> | ||
441 | > bracket (opener sock) closeSession $ \ses -> | ||
442 | > action (sock, ses) | ||
443 | |||
444 | Used then the client want to connect to a peer. | ||
445 | |||
446 | > initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | ||
447 | > initiatePeerSession ss @ SwarmSession {..} addr | ||
448 | > = runSession (connectToPeer addr) initiated | ||
449 | > where | ||
450 | > initiated sock = do | ||
451 | > phs <- handshake sock (swarmHandshake ss) | ||
452 | > ps <- openSession ss addr phs | ||
453 | > sendClientStatus (sock, ps) | ||
454 | > return ps | ||
455 | |||
456 | Used the a peer want to connect to the client. | ||
457 | |||
458 | > acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO () | ||
459 | > acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted | ||
460 | > where | ||
461 | > accepted sock = do | ||
462 | > phs <- recvHandshake sock | ||
463 | > swarm <- lookupSwarm cs $ hsInfoHash phs | ||
464 | > ps <- openSession swarm addr phs | ||
465 | > sendHandshake sock $ Handshake { | ||
466 | > hsProtocol = defaultBTProtocol | ||
467 | > , hsReserved = encodeExts $ enabledExtensions ps | ||
468 | > , hsInfoHash = hsInfoHash phs | ||
469 | > , hsPeerId = clientPeerId | ||
470 | > } | ||
471 | > sendClientStatus (sock, ps) | ||
472 | > return ps | ||
473 | |||
474 | |||
475 | > listener :: ClientSession -> Exchange -> PortNumber -> IO () | ||
476 | > listener cs action serverPort = bracket openListener close loop | ||
477 | > where | ||
478 | > loop sock = forever $ handle isIOError $ do | ||
479 | > (conn, addr) <- accept sock | ||
480 | > case addr of | ||
481 | > SockAddrInet port host -> do | ||
482 | > acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
483 | > _ -> return () | ||
484 | > | ||
485 | > isIOError :: IOError -> IO () | ||
486 | > isIOError _ = return () | ||
487 | > | ||
488 | > openListener = do | ||
489 | > sock <- socket AF_INET Stream defaultProtocol | ||
490 | > bindSocket sock (SockAddrInet serverPort 0) | ||
491 | > listen sock 1 | ||
492 | > return sock | ||
493 | |||
494 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece | ||
495 | ------------------------------------------------------------------------ | ||
496 | |||
497 | Here we should enqueue broadcast messages and keep in mind that: | ||
498 | * We should enqueue broadcast events as they are appear. | ||
499 | * We should yield broadcast messages as fast as we get them. | ||
500 | |||
501 | these 2 phases might differ in time significantly | ||
502 | |||
503 | **TODO**: do this; but only when it'll be clean which other broadcast | ||
504 | messages & events we should send. | ||
505 | |||
506 | 1. Update client have bitfield --\____ in one transaction; | ||
507 | 2. Update downloaded stats --/ | ||
508 | 3. Signal to the all other peer about this. | ||
509 | |||
510 | > available :: Bitfield -> SwarmSession -> IO () | ||
511 | > available bf se @ SwarmSession {..} = {-# SCC available #-} do | ||
512 | > mark >> atomically broadcast | ||
513 | > where | ||
514 | > mark = do | ||
515 | > let piLen = ciPieceLength $ tInfo $ torrentMeta | ||
516 | > let bytes = piLen * BF.haveCount bf | ||
517 | > atomically $ do | ||
518 | > modifyTVar' clientBitfield (BF.union bf) | ||
519 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
520 | > | ||
521 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
522 | |||
523 | |||
524 | TODO compute size of messages: if it's faster to send Bitfield | ||
525 | instead many Have do that | ||
526 | |||
527 | Also if there is single Have message in queue then the | ||
528 | corresponding piece is likely still in memory or disc cache, | ||
529 | when we can send SuggestPiece. | ||
530 | |||
531 | Get pending messages queue appeared in result of asynchronously | ||
532 | changed client state. Resulting queue should be sent to a peer | ||
533 | immediately. | ||
534 | |||
535 | > getPending :: PeerSession -> IO [Message] | ||
536 | > getPending PeerSession {..} = {-# SCC getPending #-} do | ||
537 | > atomically (readAvail pendingMessages) | ||
538 | |||
539 | > readAvail :: TChan a -> STM [a] | ||
540 | > readAvail chan = do | ||
541 | > m <- tryReadTChan chan | ||
542 | > case m of | ||
543 | > Just a -> (:) <$> pure a <*> readAvail chan | ||
544 | > Nothing -> return [] | ||
545 | |||
546 | Timeouts | ||
547 | ----------------------------------------------------------------------- | ||
548 | |||
549 | for internal use only | ||
550 | |||
551 | > sec :: Int | ||
552 | > sec = 1000 * 1000 | ||
553 | |||
554 | > maxIncomingTime :: Int | ||
555 | > maxIncomingTime = 120 * sec | ||
556 | |||
557 | > maxOutcomingTime :: Int | ||
558 | > maxOutcomingTime = 1 * sec | ||
559 | |||
560 | > -- | Should be called after we have received any message from a peer. | ||
561 | > updateIncoming :: PeerSession -> IO () | ||
562 | > updateIncoming PeerSession {..} = do | ||
563 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
564 | > incomingTimeout maxIncomingTime | ||
565 | |||
566 | > -- | Should be called before we have send any message to a peer. | ||
567 | > updateOutcoming :: PeerSession -> IO () | ||
568 | > updateOutcoming PeerSession {..} = | ||
569 | > updateTimeout (eventManager (clientSession swarmSession)) | ||
570 | > outcomingTimeout maxOutcomingTime | ||
571 | |||
572 | > sendKA :: Socket -> IO () | ||
573 | > sendKA sock {- SwarmSession {..} -} = do | ||
574 | > return () | ||
575 | > -- print "I'm sending keep alive." | ||
576 | > -- sendAll sock (encode BT.KeepAlive) | ||
577 | > -- let mgr = eventManager clientSession | ||
578 | > -- updateTimeout mgr | ||
579 | > -- print "Done.." | ||
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs new file mode 100644 index 00000000..31b30e43 --- /dev/null +++ b/src/Network/BitTorrent/Sessions.hs | |||
@@ -0,0 +1,447 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam T. 2013 | ||
3 | -- License : MIT | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | {-# LANGUAGE OverloadedStrings #-} | ||
9 | {-# LANGUAGE RecordWildCards #-} | ||
10 | module Network.BitTorrent.Sessions | ||
11 | ( -- * Progress | ||
12 | Progress(..), startProgress | ||
13 | , ClientService(..) | ||
14 | , startService | ||
15 | , withRunning | ||
16 | |||
17 | -- * Client | ||
18 | , ClientSession ( ClientSession | ||
19 | , clientPeerId, allowedExtensions | ||
20 | , nodeListener, peerListener | ||
21 | ) | ||
22 | , withClientSession | ||
23 | , listenerPort, dhtPort | ||
24 | |||
25 | , ThreadCount | ||
26 | , defaultThreadCount | ||
27 | |||
28 | , TorrentLoc(..) | ||
29 | , registerTorrent | ||
30 | , unregisterTorrent | ||
31 | |||
32 | , getCurrentProgress | ||
33 | , getSwarmCount | ||
34 | , getPeerCount | ||
35 | |||
36 | -- * Swarm | ||
37 | , SwarmSession( SwarmSession, torrentMeta, clientSession ) | ||
38 | |||
39 | , SessionCount | ||
40 | , getSessionCount | ||
41 | |||
42 | , newLeecher | ||
43 | , newSeeder | ||
44 | , getClientBitfield | ||
45 | |||
46 | -- TODO hide this | ||
47 | , waitVacancy | ||
48 | , forkThrottle | ||
49 | |||
50 | -- * Peer | ||
51 | , PeerSession( PeerSession, connectedPeerAddr | ||
52 | , swarmSession, enabledExtensions | ||
53 | , sessionState | ||
54 | ) | ||
55 | , SessionState | ||
56 | , initiatePeerSession | ||
57 | , acceptPeerSession | ||
58 | , listener | ||
59 | |||
60 | -- * Timeouts | ||
61 | , updateIncoming, updateOutcoming | ||
62 | ) where | ||
63 | |||
64 | import Prelude hiding (mapM_) | ||
65 | |||
66 | import Control.Applicative | ||
67 | import Control.Concurrent | ||
68 | import Control.Concurrent.STM | ||
69 | import Control.Concurrent.MSem as MSem | ||
70 | import Control.Lens | ||
71 | import Control.Monad (when, forever, (>=>)) | ||
72 | import Control.Exception | ||
73 | import Control.Monad.Trans | ||
74 | |||
75 | import Data.IORef | ||
76 | import Data.Foldable (mapM_) | ||
77 | import Data.Map as M | ||
78 | import Data.HashMap.Strict as HM | ||
79 | import Data.Set as S | ||
80 | |||
81 | import Data.Serialize hiding (get) | ||
82 | |||
83 | import Network hiding (accept) | ||
84 | import Network.Socket | ||
85 | import Network.Socket.ByteString | ||
86 | |||
87 | import GHC.Event as Ev | ||
88 | |||
89 | import Data.Bitfield as BF | ||
90 | import Data.Torrent | ||
91 | import Network.BitTorrent.Extension | ||
92 | import Network.BitTorrent.Peer | ||
93 | import Network.BitTorrent.Exchange.Protocol as BT | ||
94 | import Network.BitTorrent.Tracker.Protocol as BT | ||
95 | import System.Torrent.Storage | ||
96 | import Network.BitTorrent.Sessions.Types | ||
97 | |||
98 | {----------------------------------------------------------------------- | ||
99 | Client Services | ||
100 | -----------------------------------------------------------------------} | ||
101 | |||
102 | startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO () | ||
103 | startService s port m = do | ||
104 | stopService s | ||
105 | putMVar s =<< spawn | ||
106 | where | ||
107 | spawn = ClientService port <$> forkIO (m port) | ||
108 | |||
109 | stopService :: MVar ClientService -> IO () | ||
110 | stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread) | ||
111 | |||
112 | -- | Service A might depend on service B. | ||
113 | withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO () | ||
114 | withRunning dep failure action = tryTakeMVar dep >>= maybe failure action | ||
115 | |||
116 | {----------------------------------------------------------------------- | ||
117 | Torrent presence | ||
118 | -----------------------------------------------------------------------} | ||
119 | |||
120 | data TorrentPresence = Active SwarmSession | ||
121 | | Registered TorrentLoc | ||
122 | | Unknown | ||
123 | |||
124 | torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence | ||
125 | torrentPresence ClientSession {..} ih = do | ||
126 | sws <- readTVarIO swarmSessions | ||
127 | case M.lookup ih sws of | ||
128 | Just ss -> return $ Active ss | ||
129 | Nothing -> do | ||
130 | tm <- readTVarIO torrentMap | ||
131 | return $ maybe Unknown Registered $ HM.lookup ih tm | ||
132 | |||
133 | {----------------------------------------------------------------------- | ||
134 | Client sessions | ||
135 | -----------------------------------------------------------------------} | ||
136 | |||
137 | -- | Create a new client session. The data passed to this function are | ||
138 | -- usually loaded from configuration file. | ||
139 | openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions. | ||
140 | -> [Extension] -- ^ Extensions allowed to use. | ||
141 | -> IO ClientSession -- ^ Client with unique peer ID. | ||
142 | openClientSession n exts = do | ||
143 | mgr <- Ev.new | ||
144 | -- TODO kill this thread when leave client | ||
145 | _ <- forkIO $ loop mgr | ||
146 | ClientSession | ||
147 | <$> genPeerId | ||
148 | <*> pure exts | ||
149 | <*> newEmptyMVar | ||
150 | <*> newEmptyMVar | ||
151 | <*> MSem.new n | ||
152 | <*> pure n | ||
153 | <*> newTVarIO M.empty | ||
154 | <*> pure mgr | ||
155 | <*> newTVarIO (startProgress 0) | ||
156 | <*> newTVarIO HM.empty | ||
157 | |||
158 | closeClientSession :: ClientSession -> IO () | ||
159 | closeClientSession ClientSession {..} = | ||
160 | stopService nodeListener `finally` stopService peerListener | ||
161 | -- TODO stop all swarm sessions | ||
162 | |||
163 | withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO () | ||
164 | withClientSession c es = bracket (openClientSession c es) closeClientSession | ||
165 | |||
166 | -- | Get current global progress of the client. This value is usually | ||
167 | -- shown to a user. | ||
168 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
169 | getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
170 | |||
171 | -- | Get number of swarms client aware of. | ||
172 | getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
173 | getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions | ||
174 | |||
175 | -- | Get number of peers the client currently connected to. | ||
176 | getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
177 | getPeerCount ClientSession {..} = liftIO $ do | ||
178 | unused <- peekAvail activeThreads | ||
179 | return (maxActive - unused) | ||
180 | |||
181 | listenerPort :: ClientSession -> IO PortNumber | ||
182 | listenerPort ClientSession {..} = servPort <$> readMVar peerListener | ||
183 | |||
184 | dhtPort :: ClientSession -> IO PortNumber | ||
185 | dhtPort ClientSession {..} = servPort <$> readMVar nodeListener | ||
186 | |||
187 | {----------------------------------------------------------------------- | ||
188 | Swarm session | ||
189 | -----------------------------------------------------------------------} | ||
190 | |||
191 | defSeederConns :: SessionCount | ||
192 | defSeederConns = defaultUnchokeSlots | ||
193 | |||
194 | defLeacherConns :: SessionCount | ||
195 | defLeacherConns = defaultNumWant | ||
196 | |||
197 | newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | ||
198 | -> IO SwarmSession | ||
199 | newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
200 | = SwarmSession t cs | ||
201 | <$> MSem.new n | ||
202 | <*> newTVarIO bf | ||
203 | <*> undefined | ||
204 | <*> newTVarIO S.empty | ||
205 | <*> newBroadcastTChanIO | ||
206 | |||
207 | -- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession | ||
208 | -- > openSwarmSession ClientSession {..} ih = do | ||
209 | -- > loc <- HM.lookup <$> readTVarIO torrentMap | ||
210 | -- > torrent <- validateLocation loc | ||
211 | -- > return undefined | ||
212 | |||
213 | closeSwarmSession :: SwarmSession -> IO () | ||
214 | closeSwarmSession se @ SwarmSession {..} = do | ||
215 | unregisterSwarmSession se | ||
216 | -- TODO stop discovery | ||
217 | -- TODO killall peer sessions | ||
218 | -- TODO the order is important! | ||
219 | closeStorage storage | ||
220 | |||
221 | unregisterSwarmSession :: SwarmSession -> IO () | ||
222 | unregisterSwarmSession SwarmSession {..} = | ||
223 | atomically $ modifyTVar (swarmSessions clientSession) $ | ||
224 | M.delete $ tInfoHash torrentMeta | ||
225 | |||
226 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
227 | getSwarm cs @ ClientSession {..} ih = do | ||
228 | ss <- readTVarIO $ swarmSessions | ||
229 | case M.lookup ih ss of | ||
230 | Just sw -> return sw | ||
231 | Nothing -> undefined -- openSwarm cs | ||
232 | |||
233 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
234 | newSeeder cs t @ Torrent {..} | ||
235 | = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
236 | |||
237 | -- | New swarm in which the client allowed both download and upload. | ||
238 | newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
239 | newLeecher cs t @ Torrent {..} = do | ||
240 | se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
241 | atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
242 | return se | ||
243 | |||
244 | -- | Get the number of connected peers in the given swarm. | ||
245 | getSessionCount :: SwarmSession -> IO SessionCount | ||
246 | getSessionCount SwarmSession {..} = do | ||
247 | S.size <$> readTVarIO connectedPeers | ||
248 | |||
249 | swarmHandshake :: SwarmSession -> Handshake | ||
250 | swarmHandshake SwarmSession {..} = Handshake { | ||
251 | hsProtocol = defaultBTProtocol | ||
252 | , hsReserved = encodeExts $ allowedExtensions $ clientSession | ||
253 | , hsInfoHash = tInfoHash torrentMeta | ||
254 | , hsPeerId = clientPeerId $ clientSession | ||
255 | } | ||
256 | |||
257 | {----------------------------------------------------------------------- | ||
258 | Peer sessions throttling | ||
259 | -----------------------------------------------------------------------} | ||
260 | |||
261 | -- | The number of threads suitable for a typical BT client. | ||
262 | defaultThreadCount :: ThreadCount | ||
263 | defaultThreadCount = 1000 | ||
264 | |||
265 | enterSwarm :: SwarmSession -> IO () | ||
266 | enterSwarm SwarmSession {..} = do | ||
267 | MSem.wait (activeThreads clientSession) | ||
268 | MSem.wait vacantPeers | ||
269 | |||
270 | leaveSwarm :: SwarmSession -> IO () | ||
271 | leaveSwarm SwarmSession {..} = do | ||
272 | MSem.signal vacantPeers | ||
273 | MSem.signal (activeThreads clientSession) | ||
274 | |||
275 | waitVacancy :: SwarmSession -> IO () -> IO () | ||
276 | waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const | ||
277 | |||
278 | forkThrottle :: SwarmSession -> IO () -> IO ThreadId | ||
279 | forkThrottle se action = do | ||
280 | enterSwarm se | ||
281 | (forkIO $ do | ||
282 | action `finally` leaveSwarm se) | ||
283 | `onException` leaveSwarm se | ||
284 | |||
285 | -- TODO: check content files location; | ||
286 | validateLocation :: TorrentLoc -> IO Torrent | ||
287 | validateLocation = fromFile . metafilePath | ||
288 | |||
289 | registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO () | ||
290 | registerTorrent = error "registerTorrent" | ||
291 | |||
292 | unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () | ||
293 | unregisterTorrent = error "unregisterTorrent" | ||
294 | |||
295 | torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession | ||
296 | torrentSwarm _ _ (Active sws) = return sws | ||
297 | torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc | ||
298 | torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih | ||
299 | |||
300 | lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
301 | lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih | ||
302 | |||
303 | {----------------------------------------------------------------------- | ||
304 | Peer session creation | ||
305 | ------------------------------------------------------------------------ | ||
306 | The peer session cycle looks like: | ||
307 | |||
308 | * acquire vacant session and vacant thread slot; | ||
309 | * (fork could be here, but not necessary) | ||
310 | * establish peer connection; | ||
311 | * register peer session; | ||
312 | * ... exchange process ... | ||
313 | * unregister peer session; | ||
314 | * close peer connection; | ||
315 | * release acquired session and thread slot. | ||
316 | |||
317 | TODO: explain why this order | ||
318 | TODO: thread throttling | ||
319 | TODO: check if it connected yet peer | ||
320 | TODO: utilize peer Id. | ||
321 | TODO: use STM semaphore | ||
322 | -----------------------------------------------------------------------} | ||
323 | |||
324 | openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | ||
325 | openSession ss @ SwarmSession {..} addr Handshake {..} = do | ||
326 | let clientCaps = encodeExts $ allowedExtensions $ clientSession | ||
327 | let enabled = decodeExts (enabledCaps clientCaps hsReserved) | ||
328 | ps <- PeerSession addr ss enabled | ||
329 | <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ()) | ||
330 | <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ()) | ||
331 | <*> atomically (dupTChan broadcastMessages) | ||
332 | <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) | ||
333 | -- TODO we could implement more interesting throtling scheme | ||
334 | -- using connected peer information | ||
335 | atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
336 | return ps | ||
337 | |||
338 | closeSession :: PeerSession -> IO () | ||
339 | closeSession ps @ PeerSession {..} = do | ||
340 | atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps) | ||
341 | |||
342 | type PeerConn = (Socket, PeerSession) | ||
343 | type Exchange = PeerConn -> IO () | ||
344 | |||
345 | sendClientStatus :: PeerConn -> IO () | ||
346 | sendClientStatus (sock, PeerSession {..}) = do | ||
347 | cbf <- readTVarIO $ clientBitfield $ swarmSession | ||
348 | sendAll sock $ encode $ Bitfield cbf | ||
349 | |||
350 | port <- dhtPort $ clientSession swarmSession | ||
351 | when (ExtDHT `elem` enabledExtensions) $ do | ||
352 | sendAll sock $ encode $ Port port | ||
353 | |||
354 | -- | Exchange action depends on session and socket, whereas session depends | ||
355 | -- on socket: | ||
356 | -- | ||
357 | -- socket------>-----exchange | ||
358 | -- | | | ||
359 | -- \-->--session-->--/ | ||
360 | -- | ||
361 | -- To handle exceptions properly we double bracket socket and session | ||
362 | -- then joining the resources and also ignoring session local exceptions. | ||
363 | -- | ||
364 | runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO () | ||
365 | runSession connector opener action = | ||
366 | handle isSessionException $ | ||
367 | bracket connector close $ \sock -> | ||
368 | bracket (opener sock) closeSession $ \ses -> | ||
369 | action (sock, ses) | ||
370 | |||
371 | -- | Used then the client want to connect to a peer. | ||
372 | initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | ||
373 | initiatePeerSession ss @ SwarmSession {..} addr | ||
374 | = runSession (connectToPeer addr) initiated | ||
375 | where | ||
376 | initiated sock = do | ||
377 | phs <- handshake sock (swarmHandshake ss) | ||
378 | ps <- openSession ss addr phs | ||
379 | sendClientStatus (sock, ps) | ||
380 | return ps | ||
381 | |||
382 | -- | Used the a peer want to connect to the client. | ||
383 | acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO () | ||
384 | acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted | ||
385 | where | ||
386 | accepted sock = do | ||
387 | phs <- recvHandshake sock | ||
388 | swarm <- lookupSwarm cs $ hsInfoHash phs | ||
389 | ps <- openSession swarm addr phs | ||
390 | sendHandshake sock $ Handshake { | ||
391 | hsProtocol = defaultBTProtocol | ||
392 | , hsReserved = encodeExts $ enabledExtensions ps | ||
393 | , hsInfoHash = hsInfoHash phs | ||
394 | , hsPeerId = clientPeerId | ||
395 | } | ||
396 | sendClientStatus (sock, ps) | ||
397 | return ps | ||
398 | |||
399 | listener :: ClientSession -> Exchange -> PortNumber -> IO () | ||
400 | listener cs action serverPort = bracket openListener close loop | ||
401 | where | ||
402 | loop sock = forever $ handle isIOError $ do | ||
403 | (conn, addr) <- accept sock | ||
404 | case addr of | ||
405 | SockAddrInet port host -> do | ||
406 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
407 | _ -> return () | ||
408 | |||
409 | isIOError :: IOError -> IO () | ||
410 | isIOError _ = return () | ||
411 | |||
412 | openListener = do | ||
413 | sock <- socket AF_INET Stream defaultProtocol | ||
414 | bindSocket sock (SockAddrInet serverPort 0) | ||
415 | listen sock 1 | ||
416 | return sock | ||
417 | |||
418 | |||
419 | {----------------------------------------------------------------------- | ||
420 | Keepalives | ||
421 | ------------------------------------------------------------------------ | ||
422 | TODO move to exchange | ||
423 | -----------------------------------------------------------------------} | ||
424 | |||
425 | sec :: Int | ||
426 | sec = 1000 * 1000 | ||
427 | |||
428 | maxIncomingTime :: Int | ||
429 | maxIncomingTime = 120 * sec | ||
430 | |||
431 | maxOutcomingTime :: Int | ||
432 | maxOutcomingTime = 1 * sec | ||
433 | |||
434 | -- | Should be called after we have received any message from a peer. | ||
435 | updateIncoming :: PeerSession -> IO () | ||
436 | updateIncoming PeerSession {..} = do | ||
437 | updateTimeout (eventManager (clientSession swarmSession)) | ||
438 | incomingTimeout maxIncomingTime | ||
439 | |||
440 | -- | Should be called before we have send any message to a peer. | ||
441 | updateOutcoming :: PeerSession -> IO () | ||
442 | updateOutcoming PeerSession {..} = | ||
443 | updateTimeout (eventManager (clientSession swarmSession)) | ||
444 | outcomingTimeout maxOutcomingTime | ||
445 | |||
446 | sendKA :: Socket -> IO () | ||
447 | sendKA sock = return () | ||
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs new file mode 100644 index 00000000..f94dbfa6 --- /dev/null +++ b/src/Network/BitTorrent/Sessions/Types.lhs | |||
@@ -0,0 +1,517 @@ | |||
1 | > -- | | ||
2 | > -- Copyright : (c) Sam T. 2013 | ||
3 | > -- License : MIT | ||
4 | > -- Maintainer : pxqr.sta@gmail.com | ||
5 | > -- Stability : experimental | ||
6 | > -- Portability : portable | ||
7 | > -- | ||
8 | > | ||
9 | > {-# LANGUAGE RecordWildCards #-} | ||
10 | > {-# LANGUAGE ViewPatterns #-} | ||
11 | > {-# LANGUAGE TemplateHaskell #-} | ||
12 | > {-# LANGUAGE DeriveDataTypeable #-} | ||
13 | > | ||
14 | > module Network.BitTorrent.Sessions.Types | ||
15 | > ( ClientService(..) | ||
16 | > , ThreadCount | ||
17 | > , TorrentLoc (..) | ||
18 | > , TorrentMap | ||
19 | > | ||
20 | > , Progress (..) | ||
21 | > , left, uploaded, downloaded | ||
22 | > , startProgress, enqueuedProgress | ||
23 | > | ||
24 | > , ClientSession (..) | ||
25 | > | ||
26 | > , SwarmSession (..) | ||
27 | > , getClientBitfield | ||
28 | > , getPending, available | ||
29 | > | ||
30 | > , PeerSession (..) | ||
31 | > , SessionCount | ||
32 | > , findPieceCount | ||
33 | > | ||
34 | > , SessionState (..) | ||
35 | > , status, bitfield | ||
36 | > , initialSessionState | ||
37 | > | ||
38 | > , SessionException (..) | ||
39 | > , isSessionException, putSessionException | ||
40 | > ) where | ||
41 | |||
42 | > import Control.Applicative | ||
43 | > import Control.Concurrent | ||
44 | > import Control.Concurrent.STM | ||
45 | > import Control.Concurrent.MSem as MSem | ||
46 | > import Control.Lens | ||
47 | > import Control.Exception | ||
48 | |||
49 | > import Data.IORef | ||
50 | > import Data.Default | ||
51 | > import Data.Function | ||
52 | > import Data.Map as M | ||
53 | > import Data.HashMap.Strict as HM | ||
54 | > import Data.Ord | ||
55 | > import Data.Set as S | ||
56 | > import Data.Typeable | ||
57 | > import Text.PrettyPrint | ||
58 | |||
59 | > import Network | ||
60 | |||
61 | > import GHC.Event as Ev | ||
62 | |||
63 | > import Data.Bitfield as BF | ||
64 | > import Data.Torrent | ||
65 | > import Network.BitTorrent.Extension | ||
66 | > import Network.BitTorrent.Peer | ||
67 | > import Network.BitTorrent.Exchange.Protocol as BT | ||
68 | > import Network.BitTorrent.Tracker.Protocol as BT | ||
69 | > import System.Torrent.Storage | ||
70 | |||
71 | Thread layout | ||
72 | ------------------------------------------------------------------------ | ||
73 | |||
74 | When client session created 2 new threads appear: | ||
75 | |||
76 | * DHT listener - replies to DHT requests; | ||
77 | |||
78 | * Peer listener - accept new P2P connection initiated by other | ||
79 | peers. | ||
80 | |||
81 | When swarn session created 3 new threads appear: | ||
82 | |||
83 | * DHT request loop asks for new peers; | ||
84 | |||
85 | * Tracker request loop asks for new peers; | ||
86 | |||
87 | * controller which fork new avaand manage running P2P sessions. | ||
88 | |||
89 | Peer session is one always forked thread. | ||
90 | |||
91 | When client\/swarm\/peer session gets closed kill the corresponding | ||
92 | threads, but flush data to disc. (for e.g. storage block map) | ||
93 | |||
94 | So for e.g., in order to obtain our first block we need to spawn at | ||
95 | least 7 threads: main thread, 2 client session threads, 3 swarm session | ||
96 | threads and PeerSession thread. | ||
97 | |||
98 | Thread throttling | ||
99 | ------------------------------------------------------------------------ | ||
100 | |||
101 | If we will not restrict number of threads we could end up | ||
102 | with thousands of connected swarm and make no particular progress. | ||
103 | |||
104 | Note also we do not bound number of swarms! This is not optimal | ||
105 | strategy because each swarm might have say 1 thread and we could end | ||
106 | up bounded by the meaningless limit. Bounding global number of p2p | ||
107 | sessions should work better, and simpler. | ||
108 | |||
109 | **TODO:** priority based throttling: leecher thread have more priority | ||
110 | than seeder threads. | ||
111 | |||
112 | > -- | Each client might have a limited number of threads. | ||
113 | > type ThreadCount = Int | ||
114 | |||
115 | Client Services | ||
116 | ------------------------------------------------------------------------ | ||
117 | |||
118 | There are two servers started as client start: | ||
119 | |||
120 | * DHT node listener - needed by other peers to discover | ||
121 | * Peer listener - need by other peers to join this client. | ||
122 | |||
123 | Thus any client (assuming DHT is enabled) provides at least 2 services | ||
124 | so we can abstract out into ClientService: | ||
125 | |||
126 | > data ClientService = ClientService { | ||
127 | > servPort :: !PortNumber | ||
128 | > , servThread :: !ThreadId | ||
129 | > } deriving Show | ||
130 | |||
131 | Torrent Map | ||
132 | ------------------------------------------------------------------------ | ||
133 | |||
134 | TODO: keep track global peer have piece set. | ||
135 | |||
136 | Keeping all seeding torrent metafiles in memory is a _bad_ idea: for | ||
137 | 1TB of data we need at least 100MB of metadata. (using 256KB piece | ||
138 | size). This solution do not scale further. | ||
139 | |||
140 | To avoid this we keep just *metainfo* about *metainfo*: | ||
141 | |||
142 | > -- | Local info about torrent location. | ||
143 | > data TorrentLoc = TorrentLoc { | ||
144 | > -- | Full path to .torrent metafile. | ||
145 | > metafilePath :: FilePath | ||
146 | > -- | Full path to directory contating content files associated | ||
147 | > -- with the metafile. | ||
148 | > , dataDirPath :: FilePath | ||
149 | > } | ||
150 | |||
151 | TorrentMap is used to keep track all known torrents for the | ||
152 | client. When some peer trying to connect to us it's necessary to | ||
153 | dispatch appropriate 'SwarmSession' (or start new one if there are | ||
154 | none) in the listener loop: we only know 'InfoHash' from 'Handshake' | ||
155 | but nothing more. So to accept new 'PeerSession' we need to lookup | ||
156 | torrent metainfo and content files (if there are some) by the | ||
157 | 'InfoHash' and only after that enter exchange loop. | ||
158 | |||
159 | Solution with TorrentLoc is much better and takes much more less | ||
160 | space, moreover it depends on count of torrents but not on count of | ||
161 | data itself. To scale further, in future we might add something like | ||
162 | database (for e.g. sqlite) for this kind of things. | ||
163 | |||
164 | > -- | Used to find torrent info and data in order to accept connection. | ||
165 | > type TorrentMap = HashMap InfoHash TorrentLoc | ||
166 | |||
167 | While *registering* torrent we need to check if torrent metafile is | ||
168 | correct, all the files are present in the filesystem and so | ||
169 | forth. However content validation using hashes will take a long time, | ||
170 | so we need to do this on demand: if a peer asks for a block, we | ||
171 | validate corresponding piece and only after read and send the block | ||
172 | back. | ||
173 | |||
174 | Progress | ||
175 | ------------------------------------------------------------------------ | ||
176 | |||
177 | Progress data is considered as dynamic within one client session. This | ||
178 | data also should be shared across client application sessions | ||
179 | (e.g. files), otherwise use 'startProgress' to get initial 'Progress'. | ||
180 | |||
181 | > -- | 'Progress' contains upload/download/left stats about | ||
182 | > -- current client state and used to notify the tracker. | ||
183 | > data Progress = Progress { | ||
184 | > _uploaded :: !Integer -- ^ Total amount of bytes uploaded. | ||
185 | > , _downloaded :: !Integer -- ^ Total amount of bytes downloaded. | ||
186 | > , _left :: !Integer -- ^ Total amount of bytes left. | ||
187 | > } deriving (Show, Read, Eq) | ||
188 | > | ||
189 | > $(makeLenses ''Progress) | ||
190 | |||
191 | **TODO:** Use Word64? | ||
192 | |||
193 | **TODO:** Use atomic bits? | ||
194 | |||
195 | Please note that tracker might penalize client some way if the do | ||
196 | not accumulate progress. If possible and save 'Progress' between | ||
197 | client sessions to avoid that. | ||
198 | |||
199 | > -- | Initial progress is used when there are no session before. | ||
200 | > startProgress :: Integer -> Progress | ||
201 | > startProgress = Progress 0 0 | ||
202 | |||
203 | > -- | Used when the client download some data from /any/ peer. | ||
204 | > downloadedProgress :: Int -> Progress -> Progress | ||
205 | > downloadedProgress (fromIntegral -> amount) | ||
206 | > = (left -~ amount) | ||
207 | > . (downloaded +~ amount) | ||
208 | > {-# INLINE downloadedProgress #-} | ||
209 | |||
210 | > -- | Used when the client upload some data to /any/ peer. | ||
211 | > uploadedProgress :: Int -> Progress -> Progress | ||
212 | > uploadedProgress (fromIntegral -> amount) = uploaded +~ amount | ||
213 | > {-# INLINE uploadedProgress #-} | ||
214 | |||
215 | > -- | Used when leecher join client session. | ||
216 | > enqueuedProgress :: Integer -> Progress -> Progress | ||
217 | > enqueuedProgress amount = left +~ amount | ||
218 | > {-# INLINE enqueuedProgress #-} | ||
219 | |||
220 | > -- | Used when leecher leave client session. | ||
221 | > -- (e.g. user deletes not completed torrent) | ||
222 | > dequeuedProgress :: Integer -> Progress -> Progress | ||
223 | > dequeuedProgress amount = left -~ amount | ||
224 | > {-# INLINE dequeuedProgress #-} | ||
225 | |||
226 | Client Sessions | ||
227 | ------------------------------------------------------------------------ | ||
228 | |||
229 | Basically, client session should contain options which user | ||
230 | application store in configuration files and related to the | ||
231 | protocol. Moreover it should contain the all client identification | ||
232 | info, for e.g. DHT. | ||
233 | |||
234 | Client session is the basic unit of bittorrent network, it has: | ||
235 | |||
236 | * The /peer ID/ used as unique identifier of the client in | ||
237 | network. Obviously, this value is not changed during client session. | ||
238 | |||
239 | * The number of /protocol extensions/ it might use. This value is | ||
240 | static as well, but if you want to dynamically reconfigure the client | ||
241 | you might kill the end the current session and create a new with the | ||
242 | fresh required extensions. | ||
243 | |||
244 | * The number of /swarms/ to join, each swarm described by the | ||
245 | 'SwarmSession'. | ||
246 | |||
247 | Normally, you would have one client session, however, if we needed, in | ||
248 | one application we could have many clients with different peer ID's | ||
249 | and different enabled extensions at the same time. | ||
250 | |||
251 | > -- | | ||
252 | > data ClientSession = ClientSession { | ||
253 | > -- | Used in handshakes and discovery mechanism. | ||
254 | > clientPeerId :: !PeerId | ||
255 | |||
256 | > -- | Extensions we should try to use. Hovewer some particular peer | ||
257 | > -- might not support some extension, so we keep enabledExtension in | ||
258 | > -- 'PeerSession'. | ||
259 | > , allowedExtensions :: [Extension] | ||
260 | |||
261 | > , peerListener :: !(MVar ClientService) | ||
262 | > , nodeListener :: !(MVar ClientService) | ||
263 | |||
264 | > -- | Semaphor used to bound number of active P2P sessions. | ||
265 | > , activeThreads :: !(MSem ThreadCount) | ||
266 | |||
267 | > -- | Max number of active connections. | ||
268 | > , maxActive :: !ThreadCount | ||
269 | |||
270 | > -- | Used to traverse the swarm session. | ||
271 | > , swarmSessions :: !(TVar (Map InfoHash SwarmSession)) | ||
272 | |||
273 | > , eventManager :: !EventManager | ||
274 | |||
275 | > -- | Used to keep track global client progress. | ||
276 | > , currentProgress :: !(TVar Progress) | ||
277 | |||
278 | > -- | Used to keep track available torrents. | ||
279 | > , torrentMap :: !(TVar TorrentMap) | ||
280 | > } | ||
281 | |||
282 | NOTE: currentProgress field is reduntant: progress depends on the all swarm | ||
283 | bitfields maybe we can remove the 'currentProgress' and compute it on | ||
284 | demand? | ||
285 | |||
286 | > instance Eq ClientSession where | ||
287 | > (==) = (==) `on` clientPeerId | ||
288 | |||
289 | > instance Ord ClientSession where | ||
290 | > compare = comparing clientPeerId | ||
291 | |||
292 | Swarm sessions | ||
293 | ------------------------------------------------------------------------ | ||
294 | |||
295 | NOTE: If client is a leecher then there is NO particular reason to | ||
296 | set max sessions count more than the_number_of_unchoke_slots * k: | ||
297 | |||
298 | * thread slot(activeThread semaphore) | ||
299 | * will take but no | ||
300 | |||
301 | So if client is a leecher then max sessions count depends on the | ||
302 | number of unchoke slots. | ||
303 | |||
304 | > -- | Used to bound the number of simultaneous connections and, which | ||
305 | > -- is the same, P2P sessions within the swarm session. | ||
306 | > type SessionCount = Int | ||
307 | |||
308 | However if client is a seeder then the value depends on . | ||
309 | |||
310 | > -- | Swarm session is | ||
311 | > data SwarmSession = SwarmSession { | ||
312 | > torrentMeta :: !Torrent | ||
313 | |||
314 | > , clientSession :: !ClientSession | ||
315 | |||
316 | TODO: lower "vacantPeers" when client becomes seeder according to | ||
317 | throttling policy. | ||
318 | |||
319 | Represent count of peers we _currently_ can connect to in the | ||
320 | swarm. Used to bound number of concurrent threads. See also *Thread | ||
321 | Throttling* section. | ||
322 | |||
323 | > , vacantPeers :: !(MSem SessionCount) | ||
324 | |||
325 | Client bitfield is used to keep track "the client have" piece set. | ||
326 | Modify this carefully always updating global progress. | ||
327 | |||
328 | > , clientBitfield :: !(TVar Bitfield) | ||
329 | |||
330 | > , storage :: !Storage | ||
331 | |||
332 | We keep set of the all connected peers for the each particular torrent | ||
333 | to prevent duplicated and therefore reduntant TCP connections. For | ||
334 | example consider the following very simle and realistic scenario: | ||
335 | |||
336 | * Peer A lookup tracker for peers. | ||
337 | |||
338 | * Peer B lookup tracker for peers. | ||
339 | |||
340 | * Finally, Peer A connect to B and Peer B connect to peer A | ||
341 | simultaneously. | ||
342 | |||
343 | There some other situation the problem may occur: duplicates in | ||
344 | successive tracker responses, tracker and DHT returns. So without any | ||
345 | protection we end up with two session between the same peers. That's | ||
346 | bad because this could lead: | ||
347 | |||
348 | * Reduced throughput - multiple sessions between the same peers will | ||
349 | mutiply control overhead (control messages, session state). | ||
350 | |||
351 | * Thread occupation - duplicated sessions will eat thread slots and | ||
352 | discourage other, possible more useful, peers to establish connection. | ||
353 | |||
354 | To avoid this we could check, into the one transaction, if a peer is | ||
355 | already connected and add a connection only if it is not. | ||
356 | |||
357 | > , connectedPeers :: !(TVar (Set PeerSession)) | ||
358 | |||
359 | TODO: use bounded broadcast chan with priority queue and drop old entries. | ||
360 | |||
361 | Channel used for replicate messages across all peers in swarm. For | ||
362 | exsample if we get some piece we should sent to all connected (and | ||
363 | interested in) peers HAVE message. | ||
364 | |||
365 | > , broadcastMessages :: !(TChan Message) | ||
366 | > } | ||
367 | |||
368 | INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers | ||
369 | |||
370 | > instance Eq SwarmSession where | ||
371 | > (==) = (==) `on` (tInfoHash . torrentMeta) | ||
372 | |||
373 | > instance Ord SwarmSession where | ||
374 | > compare = comparing (tInfoHash . torrentMeta) | ||
375 | |||
376 | > getClientBitfield :: SwarmSession -> IO Bitfield | ||
377 | > getClientBitfield = readTVarIO . clientBitfield | ||
378 | |||
379 | Peer sessions | ||
380 | ------------------------------------------------------------------------ | ||
381 | |||
382 | > -- | Peer session contain all data necessary for peer to peer | ||
383 | > -- communication. | ||
384 | > data PeerSession = PeerSession { | ||
385 | > -- | Used as unique 'PeerSession' identifier within one | ||
386 | > -- 'SwarmSession'. | ||
387 | > connectedPeerAddr :: !PeerAddr | ||
388 | |||
389 | > -- | The swarm to which both end points belong to. | ||
390 | > , swarmSession :: !SwarmSession | ||
391 | |||
392 | > -- | Extensions such that both peer and client support. | ||
393 | > , enabledExtensions :: [Extension] | ||
394 | |||
395 | To dissconnect from died peers appropriately we should check if a peer | ||
396 | do not sent the KA message within given interval. If yes, we should | ||
397 | throw an exception in 'TimeoutCallback' and close session between | ||
398 | peers. | ||
399 | |||
400 | We should update timeout if we /receive/ any message within timeout | ||
401 | interval to keep connection up. | ||
402 | |||
403 | > , incomingTimeout :: !TimeoutKey | ||
404 | |||
405 | To send KA message appropriately we should know when was last time we | ||
406 | sent a message to a peer. To do that we keep registered timeout in | ||
407 | event manager and if we do not sent any message to the peer within | ||
408 | given interval then we send KA message in 'TimeoutCallback'. | ||
409 | |||
410 | We should update timeout if we /send/ any message within timeout to | ||
411 | avoid reduntant KA messages. | ||
412 | |||
413 | > , outcomingTimeout :: !TimeoutKey | ||
414 | > | ||
415 | > -- | Broadcast messages waiting to be sent to peer. | ||
416 | > , pendingMessages :: !(TChan Message) | ||
417 | |||
418 | > -- | Dymanic P2P data. | ||
419 | > , sessionState :: !(IORef SessionState) | ||
420 | > } | ||
421 | |||
422 | > instance Eq PeerSession where | ||
423 | > (==) = (==) `on` connectedPeerAddr | ||
424 | |||
425 | > instance Ord PeerSession where | ||
426 | > compare = comparing connectedPeerAddr | ||
427 | |||
428 | > findPieceCount :: PeerSession -> PieceCount | ||
429 | > findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | ||
430 | |||
431 | Peer session state | ||
432 | ------------------------------------------------------------------------ | ||
433 | |||
434 | > data SessionState = SessionState { | ||
435 | > _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | ||
436 | > , _status :: !SessionStatus -- ^ Status of both peers. | ||
437 | > } deriving (Show, Eq) | ||
438 | |||
439 | > $(makeLenses ''SessionState) | ||
440 | |||
441 | > initialSessionState :: PieceCount -> SessionState | ||
442 | > initialSessionState pc = SessionState (haveNone pc) def | ||
443 | |||
444 | Peer session exceptions | ||
445 | ------------------------------------------------------------------------ | ||
446 | |||
447 | > -- | Exceptions used to interrupt the current P2P session. This | ||
448 | > -- exceptions will NOT affect other P2P sessions, DHT, peer <-> | ||
449 | > -- tracker, or any other session. | ||
450 | > -- | ||
451 | > data SessionException = PeerDisconnected | ||
452 | > | ProtocolError Doc | ||
453 | > | UnknownTorrent InfoHash | ||
454 | > deriving (Show, Typeable) | ||
455 | |||
456 | > instance Exception SessionException | ||
457 | |||
458 | |||
459 | > -- | Do nothing with exception, used with 'handle' or 'try'. | ||
460 | > isSessionException :: Monad m => SessionException -> m () | ||
461 | > isSessionException _ = return () | ||
462 | |||
463 | > -- | The same as 'isSessionException' but output to stdout the catched | ||
464 | > -- exception, for debugging purposes only. | ||
465 | > putSessionException :: SessionException -> IO () | ||
466 | > putSessionException = print | ||
467 | |||
468 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece | ||
469 | ------------------------------------------------------------------------ | ||
470 | |||
471 | Here we should enqueue broadcast messages and keep in mind that: | ||
472 | * We should enqueue broadcast events as they are appear. | ||
473 | * We should yield broadcast messages as fast as we get them. | ||
474 | |||
475 | these 2 phases might differ in time significantly | ||
476 | |||
477 | **TODO**: do this; but only when it'll be clean which other broadcast | ||
478 | messages & events we should send. | ||
479 | |||
480 | 1. Update client have bitfield --\____ in one transaction; | ||
481 | 2. Update downloaded stats --/ | ||
482 | 3. Signal to the all other peer about this. | ||
483 | |||
484 | > available :: Bitfield -> SwarmSession -> IO () | ||
485 | > available bf se @ SwarmSession {..} = {-# SCC available #-} do | ||
486 | > mark >> atomically broadcast | ||
487 | > where | ||
488 | > mark = do | ||
489 | > let piLen = ciPieceLength $ tInfo $ torrentMeta | ||
490 | > let bytes = piLen * BF.haveCount bf | ||
491 | > atomically $ do | ||
492 | > modifyTVar' clientBitfield (BF.union bf) | ||
493 | > modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
494 | > | ||
495 | > broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
496 | |||
497 | -- TODO compute size of messages: if it's faster to send Bitfield | ||
498 | -- instead many Have do that | ||
499 | |||
500 | -- Also if there is single Have message in queue then the | ||
501 | -- corresponding piece is likely still in memory or disc cache, | ||
502 | -- when we can send SuggestPiece. | ||
503 | |||
504 | -- | Get pending messages queue appeared in result of asynchronously | ||
505 | -- changed client state. Resulting queue should be sent to a peer | ||
506 | -- immediately. | ||
507 | |||
508 | > getPending :: PeerSession -> IO [Message] | ||
509 | > getPending PeerSession {..} = {-# SCC getPending #-} do | ||
510 | > atomically (readAvail pendingMessages) | ||
511 | |||
512 | > readAvail :: TChan a -> STM [a] | ||
513 | > readAvail chan = do | ||
514 | > m <- tryReadTChan chan | ||
515 | > case m of | ||
516 | > Just a -> (:) <$> pure a <*> readAvail chan | ||
517 | > Nothing -> return [] | ||
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index 2b8fab07..b320f0f9 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs | |||
@@ -62,7 +62,7 @@ import Network.HTTP | |||
62 | import Network.URI | 62 | import Network.URI |
63 | 63 | ||
64 | import Data.Torrent | 64 | import Data.Torrent |
65 | import Network.BitTorrent.Internal | 65 | import Network.BitTorrent.Sessions.Types |
66 | import Network.BitTorrent.Peer | 66 | import Network.BitTorrent.Peer |
67 | import Network.BitTorrent.Tracker.Protocol | 67 | import Network.BitTorrent.Tracker.Protocol |
68 | 68 | ||