summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r--src/Network/BitTorrent/Discovery.hs2
-rw-r--r--src/Network/BitTorrent/Exchange.hs2
-rw-r--r--src/Network/BitTorrent/Internal.lhs579
-rw-r--r--src/Network/BitTorrent/Sessions.hs447
-rw-r--r--src/Network/BitTorrent/Sessions/Types.lhs517
-rw-r--r--src/Network/BitTorrent/Tracker.hs2
6 files changed, 967 insertions, 582 deletions
diff --git a/src/Network/BitTorrent/Discovery.hs b/src/Network/BitTorrent/Discovery.hs
index 222dfe56..8403461c 100644
--- a/src/Network/BitTorrent/Discovery.hs
+++ b/src/Network/BitTorrent/Discovery.hs
@@ -11,7 +11,7 @@ import Network.Socket
11 11
12import Data.Torrent 12import Data.Torrent
13import Network.BitTorrent.Peer 13import Network.BitTorrent.Peer
14import Network.BitTorrent.Internal 14import Network.BitTorrent.Sessions
15import Network.BitTorrent.Exchange 15import Network.BitTorrent.Exchange
16import Network.BitTorrent.Tracker 16import Network.BitTorrent.Tracker
17import Network.BitTorrent.DHT 17import Network.BitTorrent.DHT
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 6ba56a22..32ab493d 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -88,10 +88,10 @@ import Text.PrettyPrint as PP hiding (($$))
88import Network 88import Network
89 89
90import Data.Bitfield as BF 90import Data.Bitfield as BF
91import Network.BitTorrent.Internal
92import Network.BitTorrent.Extension 91import Network.BitTorrent.Extension
93import Network.BitTorrent.Peer 92import Network.BitTorrent.Peer
94import Network.BitTorrent.Exchange.Protocol 93import Network.BitTorrent.Exchange.Protocol
94import Network.BitTorrent.Sessions.Types
95import System.Torrent.Storage 95import System.Torrent.Storage
96 96
97{----------------------------------------------------------------------- 97{-----------------------------------------------------------------------
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
deleted file mode 100644
index ef4165e3..00000000
--- a/src/Network/BitTorrent/Internal.lhs
+++ /dev/null
@@ -1,579 +0,0 @@
1> -- |
2> -- Copyright : (c) Sam T. 2013
3> -- License : MIT
4> -- Maintainer : pxqr.sta@gmail.com
5> -- Stability : experimental
6> -- Portability : portable
7> --
8> -- This module implement opaque broadcast message passing. It
9> -- provides sessions needed by Network.BitTorrent and
10> -- Network.BitTorrent.Exchange and modules. To hide some internals
11> -- of this module we detach it from Exchange.
12> -- NOTE: Expose only static data in data field lists, all dynamic
13> -- data should be modified through standalone functions.
14> --
15>
16> {-# LANGUAGE OverloadedStrings #-}
17> {-# LANGUAGE RecordWildCards #-}
18> {-# LANGUAGE ViewPatterns #-}
19>
20> module Network.BitTorrent.Internal
21> ( -- * Progress
22> Progress(..), startProgress
23>
24> , ClientService(..)
25> , startService
26> , withRunning
27>
28> -- * Client
29> , ClientSession ( ClientSession
30> , clientPeerId, allowedExtensions
31> , nodeListener, peerListener
32> )
33> , withClientSession
34> , listenerPort, dhtPort
35>
36> , ThreadCount
37> , defaultThreadCount
38>
39> , TorrentLoc(..)
40> , registerTorrent
41> , unregisterTorrent
42>
43> , getCurrentProgress
44> , getSwarmCount
45> , getPeerCount
46>
47> -- * Swarm
48> , SwarmSession( SwarmSession, torrentMeta, clientSession )
49>
50> , SessionCount
51> , getSessionCount
52>
53> , newLeecher
54> , newSeeder
55> , getClientBitfield
56>
57> , waitVacancy
58> , forkThrottle
59>
60> -- * Peer
61> , PeerSession( PeerSession, connectedPeerAddr
62> , swarmSession, enabledExtensions
63> , sessionState
64> )
65> , SessionState
66> , initiatePeerSession
67> , acceptPeerSession
68> , listener
69>
70> -- ** Broadcasting
71> , available
72> , getPending
73>
74> -- ** Exceptions
75> , SessionException(..)
76> , isSessionException
77> , putSessionException
78>
79> -- ** Properties
80> , bitfield, status
81> , findPieceCount
82>
83> -- * Timeouts
84> , updateIncoming, updateOutcoming
85> ) where
86
87> import Prelude hiding (mapM_)
88
89> import Control.Applicative
90> import Control.Concurrent
91> import Control.Concurrent.STM
92> import Control.Concurrent.MSem as MSem
93> import Control.Lens
94> import Control.Monad (when, forever, (>=>))
95> import Control.Exception
96> import Control.Monad.Trans
97
98> import Data.IORef
99> import Data.Foldable (mapM_)
100> import Data.Map as M
101> import Data.HashMap.Strict as HM
102> import Data.Set as S
103
104> import Data.Serialize hiding (get)
105
106> import Network hiding (accept)
107> import Network.Socket
108> import Network.Socket.ByteString
109
110> import GHC.Event as Ev
111
112> import Data.Bitfield as BF
113> import Data.Torrent
114> import Network.BitTorrent.Extension
115> import Network.BitTorrent.Peer
116> import Network.BitTorrent.Exchange.Protocol as BT
117> import Network.BitTorrent.Tracker.Protocol as BT
118> import System.Torrent.Storage
119> import Network.BitTorrent.Sessions.Types
120
121
122> -- | Initial progress is used when there are no session before.
123> startProgress :: Integer -> Progress
124> startProgress = Progress 0 0
125
126> -- | Used when the client download some data from /any/ peer.
127> downloadedProgress :: Int -> Progress -> Progress
128> downloadedProgress (fromIntegral -> amount)
129> = (left -~ amount)
130> . (downloaded +~ amount)
131> {-# INLINE downloadedProgress #-}
132
133> -- | Used when the client upload some data to /any/ peer.
134> uploadedProgress :: Int -> Progress -> Progress
135> uploadedProgress (fromIntegral -> amount) = uploaded +~ amount
136> {-# INLINE uploadedProgress #-}
137
138> -- | Used when leecher join client session.
139> enqueuedProgress :: Integer -> Progress -> Progress
140> enqueuedProgress amount = left +~ amount
141> {-# INLINE enqueuedProgress #-}
142
143> -- | Used when leecher leave client session.
144> -- (e.g. user deletes not completed torrent)
145> dequeuedProgress :: Integer -> Progress -> Progress
146> dequeuedProgress amount = left -~ amount
147> {-# INLINE dequeuedProgress #-}
148
149> startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
150> startService s port m = do
151> stopService s
152> putMVar s =<< spawn
153> where
154> spawn = ClientService port <$> forkIO (m port)
155
156> stopService :: MVar ClientService -> IO ()
157> stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread)
158
159Service A might depend on service B.
160
161> withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
162> withRunning dep failure action = tryTakeMVar dep >>= maybe failure action
163
164Torrent presence
165------------------------------------------------------------------------
166
167> data TorrentPresence = Active SwarmSession
168> | Registered TorrentLoc
169> | Unknown
170
171> torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence
172> torrentPresence ClientSession {..} ih = do
173> sws <- readTVarIO swarmSessions
174> case M.lookup ih sws of
175> Just ss -> return $ Active ss
176> Nothing -> do
177> tm <- readTVarIO torrentMap
178> return $ maybe Unknown Registered $ HM.lookup ih tm
179
180Retrieving client info
181------------------------------------------------------------------------
182
183> -- | Get current global progress of the client. This value is usually
184> -- shown to a user.
185> getCurrentProgress :: MonadIO m => ClientSession -> m Progress
186> getCurrentProgress = liftIO . readTVarIO . currentProgress
187
188> -- | Get number of swarms client aware of.
189> getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
190> getSwarmCount ClientSession {..} = liftIO $
191> M.size <$> readTVarIO swarmSessions
192
193> -- | Get number of peers the client currently connected to.
194> getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
195> getPeerCount ClientSession {..} = liftIO $ do
196> unused <- peekAvail activeThreads
197> return (maxActive - unused)
198
199> -- | Create a new client session. The data passed to this function are
200> -- usually loaded from configuration file.
201> openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions.
202> -> [Extension] -- ^ Extensions allowed to use.
203> -> IO ClientSession -- ^ Client with unique peer ID.
204
205> openClientSession n exts = do
206> mgr <- Ev.new
207> -- TODO kill this thread when leave client
208> _ <- forkIO $ loop mgr
209>
210> ClientSession
211> <$> genPeerId
212> <*> pure exts
213> <*> newEmptyMVar
214> <*> newEmptyMVar
215> <*> MSem.new n
216> <*> pure n
217> <*> newTVarIO M.empty
218> <*> pure mgr
219> <*> newTVarIO (startProgress 0)
220> <*> newTVarIO HM.empty
221
222> closeClientSession :: ClientSession -> IO ()
223> closeClientSession ClientSession {..} =
224> stopService nodeListener `finally` stopService peerListener
225
226> withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO ()
227> withClientSession c es = bracket (openClientSession c es) closeClientSession
228
229> listenerPort :: ClientSession -> IO PortNumber
230> listenerPort ClientSession {..} = servPort <$> readMVar peerListener
231
232> dhtPort :: ClientSession -> IO PortNumber
233> dhtPort ClientSession {..} = servPort <$> readMVar nodeListener
234
235
236> defSeederConns :: SessionCount
237> defSeederConns = defaultUnchokeSlots
238
239> defLeacherConns :: SessionCount
240> defLeacherConns = defaultNumWant
241
242> newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
243> -> IO SwarmSession
244> newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
245> = SwarmSession t cs
246> <$> MSem.new n
247> <*> newTVarIO bf
248> <*> undefined
249> <*> newTVarIO S.empty
250> <*> newBroadcastTChanIO
251
252-- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession
253-- > openSwarmSession ClientSession {..} ih = do
254-- > loc <- HM.lookup <$> readTVarIO torrentMap
255-- > torrent <- validateLocation loc
256-- > return undefined
257
258> closeSwarmSession :: SwarmSession -> IO ()
259> closeSwarmSession se @ SwarmSession {..} = do
260> unregisterSwarmSession se
261> -- TODO stop discovery
262> -- TODO killall peer sessions
263> -- TODO the order is important!
264> closeStorage storage
265
266
267
268> unregisterSwarmSession :: SwarmSession -> IO ()
269> unregisterSwarmSession SwarmSession {..} =
270> atomically $ modifyTVar (swarmSessions clientSession) $
271> M.delete $ tInfoHash torrentMeta
272
273getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
274getSwarm cs @ ClientSession {..} ih = do
275 ss <- readTVarIO $ swarmSessions
276 case HM.lookup ih ss of
277 Just sw -> return sw
278 Nothing -> openSwarm cs
279
280> newSeeder :: ClientSession -> Torrent -> IO SwarmSession
281> newSeeder cs t @ Torrent {..}
282> = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
283
284> -- | New swarm in which the client allowed both download and upload.
285> newLeecher :: ClientSession -> Torrent -> IO SwarmSession
286> newLeecher cs t @ Torrent {..} = do
287> se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
288> atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
289> return se
290
291> --isLeacher :: SwarmSession -> IO Bool
292> --isLeacher = undefined
293
294> -- | Get the number of connected peers in the given swarm.
295> getSessionCount :: SwarmSession -> IO SessionCount
296> getSessionCount SwarmSession {..} = do
297> S.size <$> readTVarIO connectedPeers
298
299> getClientBitfield :: SwarmSession -> IO Bitfield
300> getClientBitfield = readTVarIO . clientBitfield
301
302> swarmHandshake :: SwarmSession -> Handshake
303> swarmHandshake SwarmSession {..} = Handshake {
304> hsProtocol = defaultBTProtocol
305> , hsReserved = encodeExts $ allowedExtensions $ clientSession
306> , hsInfoHash = tInfoHash torrentMeta
307> , hsPeerId = clientPeerId $ clientSession
308> }
309
310> {-
311> haveDone :: MonadIO m => PieceIx -> SwarmSession -> m ()
312> haveDone ix =
313> liftIO $ atomically $ do
314> bf <- readTVar clientBitfield
315> writeTVar (have ix bf)
316> currentProgress
317> -}
318
319Peer sessions throttling
320------------------------------------------------------------------------
321
322> -- | The number of threads suitable for a typical BT client.
323> defaultThreadCount :: ThreadCount
324> defaultThreadCount = 1000
325
326> enterSwarm :: SwarmSession -> IO ()
327> enterSwarm SwarmSession {..} = do
328> MSem.wait (activeThreads clientSession)
329> MSem.wait vacantPeers
330
331> leaveSwarm :: SwarmSession -> IO ()
332> leaveSwarm SwarmSession {..} = do
333> MSem.signal vacantPeers
334> MSem.signal (activeThreads clientSession)
335
336> waitVacancy :: SwarmSession -> IO () -> IO ()
337> waitVacancy se =
338> bracket (enterSwarm se) (const (leaveSwarm se))
339> . const
340
341> forkThrottle :: SwarmSession -> IO () -> IO ThreadId
342> forkThrottle se action = do
343> enterSwarm se
344> (forkIO $ do
345> action `finally` leaveSwarm se)
346> `onException` leaveSwarm se
347
348
349> findPieceCount :: PeerSession -> PieceCount
350> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
351
352TODO: check content files location;
353
354> validateLocation :: TorrentLoc -> IO Torrent
355> validateLocation = fromFile . metafilePath
356
357> registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO ()
358> registerTorrent = error "registerTorrent"
359> {-
360> Torrent {..} <- validateTorrent tl
361> atomically $ modifyTVar' torrentMap $ HM.insert tInfoHash tl
362> return (Just t)
363> -}
364
365> unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
366> unregisterTorrent = error "unregisterTorrent"
367> -- modifyTVar' torrentMap $ HM.delete ih
368
369> torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
370> torrentSwarm _ _ (Active sws) = return sws
371> torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc
372> torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih
373
374> lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession
375> lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih
376
377Peer session creation
378------------------------------------------------------------------------
379
380The peer session cycle looks like:
381
382 * acquire vacant session and vacant thread slot;
383 * (fork could be here, but not necessary)
384 * establish peer connection;
385 * register peer session;
386 * ... exchange process ...
387 * unregister peer session;
388 * close peer connection;
389 * release acquired session and thread slot.
390
391TODO: explain why this order
392TODO: thread throttling
393TODO: check if it connected yet peer
394TODO: utilize peer Id.
395TODO: use STM semaphore
396
397> openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
398> openSession ss @ SwarmSession {..} addr Handshake {..} = do
399> let clientCaps = encodeExts $ allowedExtensions $ clientSession
400> let enabled = decodeExts (enabledCaps clientCaps hsReserved)
401> ps <- PeerSession addr ss enabled
402> <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ())
403> <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ())
404> <*> atomically (dupTChan broadcastMessages)
405> <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield)
406> -- TODO we could implement more interesting throtling scheme
407> -- using connected peer information
408> atomically $ modifyTVar' connectedPeers (S.insert ps)
409> return ps
410
411> closeSession :: PeerSession -> IO ()
412> closeSession ps @ PeerSession {..} = do
413> atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps)
414
415> type PeerConn = (Socket, PeerSession)
416> type Exchange = PeerConn -> IO ()
417
418> sendClientStatus :: PeerConn -> IO ()
419> sendClientStatus (sock, PeerSession {..}) = do
420> cbf <- readTVarIO $ clientBitfield $ swarmSession
421> sendAll sock $ encode $ Bitfield cbf
422>
423> port <- dhtPort $ clientSession swarmSession
424> when (ExtDHT `elem` enabledExtensions) $ do
425> sendAll sock $ encode $ Port port
426
427Exchange action depends on session and socket, whereas session depends
428on socket:
429
430 socket------>-----exchange
431 | |
432 \-->--session-->--/
433
434To handle exceptions properly we double bracket socket and session
435then joining the resources and also ignoring session local exceptions.
436
437> runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
438> runSession connector opener action =
439> handle isSessionException $
440> bracket connector close $ \sock ->
441> bracket (opener sock) closeSession $ \ses ->
442> action (sock, ses)
443
444Used then the client want to connect to a peer.
445
446> initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
447> initiatePeerSession ss @ SwarmSession {..} addr
448> = runSession (connectToPeer addr) initiated
449> where
450> initiated sock = do
451> phs <- handshake sock (swarmHandshake ss)
452> ps <- openSession ss addr phs
453> sendClientStatus (sock, ps)
454> return ps
455
456Used the a peer want to connect to the client.
457
458> acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
459> acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
460> where
461> accepted sock = do
462> phs <- recvHandshake sock
463> swarm <- lookupSwarm cs $ hsInfoHash phs
464> ps <- openSession swarm addr phs
465> sendHandshake sock $ Handshake {
466> hsProtocol = defaultBTProtocol
467> , hsReserved = encodeExts $ enabledExtensions ps
468> , hsInfoHash = hsInfoHash phs
469> , hsPeerId = clientPeerId
470> }
471> sendClientStatus (sock, ps)
472> return ps
473
474
475> listener :: ClientSession -> Exchange -> PortNumber -> IO ()
476> listener cs action serverPort = bracket openListener close loop
477> where
478> loop sock = forever $ handle isIOError $ do
479> (conn, addr) <- accept sock
480> case addr of
481> SockAddrInet port host -> do
482> acceptPeerSession cs (PeerAddr Nothing host port) conn action
483> _ -> return ()
484>
485> isIOError :: IOError -> IO ()
486> isIOError _ = return ()
487>
488> openListener = do
489> sock <- socket AF_INET Stream defaultProtocol
490> bindSocket sock (SockAddrInet serverPort 0)
491> listen sock 1
492> return sock
493
494Broadcasting: Have, Cancel, Bitfield, SuggestPiece
495------------------------------------------------------------------------
496
497Here we should enqueue broadcast messages and keep in mind that:
498 * We should enqueue broadcast events as they are appear.
499 * We should yield broadcast messages as fast as we get them.
500
501these 2 phases might differ in time significantly
502
503**TODO**: do this; but only when it'll be clean which other broadcast
504messages & events we should send.
505
5061. Update client have bitfield --\____ in one transaction;
5072. Update downloaded stats --/
5083. Signal to the all other peer about this.
509
510> available :: Bitfield -> SwarmSession -> IO ()
511> available bf se @ SwarmSession {..} = {-# SCC available #-} do
512> mark >> atomically broadcast
513> where
514> mark = do
515> let piLen = ciPieceLength $ tInfo $ torrentMeta
516> let bytes = piLen * BF.haveCount bf
517> atomically $ do
518> modifyTVar' clientBitfield (BF.union bf)
519> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
520>
521> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
522
523
524TODO compute size of messages: if it's faster to send Bitfield
525instead many Have do that
526
527Also if there is single Have message in queue then the
528corresponding piece is likely still in memory or disc cache,
529when we can send SuggestPiece.
530
531Get pending messages queue appeared in result of asynchronously
532changed client state. Resulting queue should be sent to a peer
533immediately.
534
535> getPending :: PeerSession -> IO [Message]
536> getPending PeerSession {..} = {-# SCC getPending #-} do
537> atomically (readAvail pendingMessages)
538
539> readAvail :: TChan a -> STM [a]
540> readAvail chan = do
541> m <- tryReadTChan chan
542> case m of
543> Just a -> (:) <$> pure a <*> readAvail chan
544> Nothing -> return []
545
546Timeouts
547-----------------------------------------------------------------------
548
549for internal use only
550
551> sec :: Int
552> sec = 1000 * 1000
553
554> maxIncomingTime :: Int
555> maxIncomingTime = 120 * sec
556
557> maxOutcomingTime :: Int
558> maxOutcomingTime = 1 * sec
559
560> -- | Should be called after we have received any message from a peer.
561> updateIncoming :: PeerSession -> IO ()
562> updateIncoming PeerSession {..} = do
563> updateTimeout (eventManager (clientSession swarmSession))
564> incomingTimeout maxIncomingTime
565
566> -- | Should be called before we have send any message to a peer.
567> updateOutcoming :: PeerSession -> IO ()
568> updateOutcoming PeerSession {..} =
569> updateTimeout (eventManager (clientSession swarmSession))
570> outcomingTimeout maxOutcomingTime
571
572> sendKA :: Socket -> IO ()
573> sendKA sock {- SwarmSession {..} -} = do
574> return ()
575> -- print "I'm sending keep alive."
576> -- sendAll sock (encode BT.KeepAlive)
577> -- let mgr = eventManager clientSession
578> -- updateTimeout mgr
579> -- print "Done.."
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs
new file mode 100644
index 00000000..31b30e43
--- /dev/null
+++ b/src/Network/BitTorrent/Sessions.hs
@@ -0,0 +1,447 @@
1-- |
2-- Copyright : (c) Sam T. 2013
3-- License : MIT
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8{-# LANGUAGE OverloadedStrings #-}
9{-# LANGUAGE RecordWildCards #-}
10module Network.BitTorrent.Sessions
11 ( -- * Progress
12 Progress(..), startProgress
13 , ClientService(..)
14 , startService
15 , withRunning
16
17 -- * Client
18 , ClientSession ( ClientSession
19 , clientPeerId, allowedExtensions
20 , nodeListener, peerListener
21 )
22 , withClientSession
23 , listenerPort, dhtPort
24
25 , ThreadCount
26 , defaultThreadCount
27
28 , TorrentLoc(..)
29 , registerTorrent
30 , unregisterTorrent
31
32 , getCurrentProgress
33 , getSwarmCount
34 , getPeerCount
35
36 -- * Swarm
37 , SwarmSession( SwarmSession, torrentMeta, clientSession )
38
39 , SessionCount
40 , getSessionCount
41
42 , newLeecher
43 , newSeeder
44 , getClientBitfield
45
46 -- TODO hide this
47 , waitVacancy
48 , forkThrottle
49
50 -- * Peer
51 , PeerSession( PeerSession, connectedPeerAddr
52 , swarmSession, enabledExtensions
53 , sessionState
54 )
55 , SessionState
56 , initiatePeerSession
57 , acceptPeerSession
58 , listener
59
60 -- * Timeouts
61 , updateIncoming, updateOutcoming
62 ) where
63
64import Prelude hiding (mapM_)
65
66import Control.Applicative
67import Control.Concurrent
68import Control.Concurrent.STM
69import Control.Concurrent.MSem as MSem
70import Control.Lens
71import Control.Monad (when, forever, (>=>))
72import Control.Exception
73import Control.Monad.Trans
74
75import Data.IORef
76import Data.Foldable (mapM_)
77import Data.Map as M
78import Data.HashMap.Strict as HM
79import Data.Set as S
80
81import Data.Serialize hiding (get)
82
83import Network hiding (accept)
84import Network.Socket
85import Network.Socket.ByteString
86
87import GHC.Event as Ev
88
89import Data.Bitfield as BF
90import Data.Torrent
91import Network.BitTorrent.Extension
92import Network.BitTorrent.Peer
93import Network.BitTorrent.Exchange.Protocol as BT
94import Network.BitTorrent.Tracker.Protocol as BT
95import System.Torrent.Storage
96import Network.BitTorrent.Sessions.Types
97
98{-----------------------------------------------------------------------
99 Client Services
100-----------------------------------------------------------------------}
101
102startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
103startService s port m = do
104 stopService s
105 putMVar s =<< spawn
106 where
107 spawn = ClientService port <$> forkIO (m port)
108
109stopService :: MVar ClientService -> IO ()
110stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread)
111
112-- | Service A might depend on service B.
113withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
114withRunning dep failure action = tryTakeMVar dep >>= maybe failure action
115
116{-----------------------------------------------------------------------
117 Torrent presence
118-----------------------------------------------------------------------}
119
120data TorrentPresence = Active SwarmSession
121 | Registered TorrentLoc
122 | Unknown
123
124torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence
125torrentPresence ClientSession {..} ih = do
126 sws <- readTVarIO swarmSessions
127 case M.lookup ih sws of
128 Just ss -> return $ Active ss
129 Nothing -> do
130 tm <- readTVarIO torrentMap
131 return $ maybe Unknown Registered $ HM.lookup ih tm
132
133{-----------------------------------------------------------------------
134 Client sessions
135-----------------------------------------------------------------------}
136
137-- | Create a new client session. The data passed to this function are
138-- usually loaded from configuration file.
139openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions.
140 -> [Extension] -- ^ Extensions allowed to use.
141 -> IO ClientSession -- ^ Client with unique peer ID.
142openClientSession n exts = do
143 mgr <- Ev.new
144 -- TODO kill this thread when leave client
145 _ <- forkIO $ loop mgr
146 ClientSession
147 <$> genPeerId
148 <*> pure exts
149 <*> newEmptyMVar
150 <*> newEmptyMVar
151 <*> MSem.new n
152 <*> pure n
153 <*> newTVarIO M.empty
154 <*> pure mgr
155 <*> newTVarIO (startProgress 0)
156 <*> newTVarIO HM.empty
157
158closeClientSession :: ClientSession -> IO ()
159closeClientSession ClientSession {..} =
160 stopService nodeListener `finally` stopService peerListener
161-- TODO stop all swarm sessions
162
163withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO ()
164withClientSession c es = bracket (openClientSession c es) closeClientSession
165
166-- | Get current global progress of the client. This value is usually
167-- shown to a user.
168getCurrentProgress :: MonadIO m => ClientSession -> m Progress
169getCurrentProgress = liftIO . readTVarIO . currentProgress
170
171-- | Get number of swarms client aware of.
172getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
173getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions
174
175-- | Get number of peers the client currently connected to.
176getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
177getPeerCount ClientSession {..} = liftIO $ do
178 unused <- peekAvail activeThreads
179 return (maxActive - unused)
180
181listenerPort :: ClientSession -> IO PortNumber
182listenerPort ClientSession {..} = servPort <$> readMVar peerListener
183
184dhtPort :: ClientSession -> IO PortNumber
185dhtPort ClientSession {..} = servPort <$> readMVar nodeListener
186
187{-----------------------------------------------------------------------
188 Swarm session
189-----------------------------------------------------------------------}
190
191defSeederConns :: SessionCount
192defSeederConns = defaultUnchokeSlots
193
194defLeacherConns :: SessionCount
195defLeacherConns = defaultNumWant
196
197newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
198 -> IO SwarmSession
199newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
200 = SwarmSession t cs
201 <$> MSem.new n
202 <*> newTVarIO bf
203 <*> undefined
204 <*> newTVarIO S.empty
205 <*> newBroadcastTChanIO
206
207-- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession
208-- > openSwarmSession ClientSession {..} ih = do
209-- > loc <- HM.lookup <$> readTVarIO torrentMap
210-- > torrent <- validateLocation loc
211-- > return undefined
212
213closeSwarmSession :: SwarmSession -> IO ()
214closeSwarmSession se @ SwarmSession {..} = do
215 unregisterSwarmSession se
216 -- TODO stop discovery
217 -- TODO killall peer sessions
218 -- TODO the order is important!
219 closeStorage storage
220
221unregisterSwarmSession :: SwarmSession -> IO ()
222unregisterSwarmSession SwarmSession {..} =
223 atomically $ modifyTVar (swarmSessions clientSession) $
224 M.delete $ tInfoHash torrentMeta
225
226getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
227getSwarm cs @ ClientSession {..} ih = do
228 ss <- readTVarIO $ swarmSessions
229 case M.lookup ih ss of
230 Just sw -> return sw
231 Nothing -> undefined -- openSwarm cs
232
233newSeeder :: ClientSession -> Torrent -> IO SwarmSession
234newSeeder cs t @ Torrent {..}
235 = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
236
237-- | New swarm in which the client allowed both download and upload.
238newLeecher :: ClientSession -> Torrent -> IO SwarmSession
239newLeecher cs t @ Torrent {..} = do
240 se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
241 atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
242 return se
243
244-- | Get the number of connected peers in the given swarm.
245getSessionCount :: SwarmSession -> IO SessionCount
246getSessionCount SwarmSession {..} = do
247 S.size <$> readTVarIO connectedPeers
248
249swarmHandshake :: SwarmSession -> Handshake
250swarmHandshake SwarmSession {..} = Handshake {
251 hsProtocol = defaultBTProtocol
252 , hsReserved = encodeExts $ allowedExtensions $ clientSession
253 , hsInfoHash = tInfoHash torrentMeta
254 , hsPeerId = clientPeerId $ clientSession
255 }
256
257{-----------------------------------------------------------------------
258 Peer sessions throttling
259-----------------------------------------------------------------------}
260
261-- | The number of threads suitable for a typical BT client.
262defaultThreadCount :: ThreadCount
263defaultThreadCount = 1000
264
265enterSwarm :: SwarmSession -> IO ()
266enterSwarm SwarmSession {..} = do
267 MSem.wait (activeThreads clientSession)
268 MSem.wait vacantPeers
269
270leaveSwarm :: SwarmSession -> IO ()
271leaveSwarm SwarmSession {..} = do
272 MSem.signal vacantPeers
273 MSem.signal (activeThreads clientSession)
274
275waitVacancy :: SwarmSession -> IO () -> IO ()
276waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const
277
278forkThrottle :: SwarmSession -> IO () -> IO ThreadId
279forkThrottle se action = do
280 enterSwarm se
281 (forkIO $ do
282 action `finally` leaveSwarm se)
283 `onException` leaveSwarm se
284
285-- TODO: check content files location;
286validateLocation :: TorrentLoc -> IO Torrent
287validateLocation = fromFile . metafilePath
288
289registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO ()
290registerTorrent = error "registerTorrent"
291
292unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
293unregisterTorrent = error "unregisterTorrent"
294
295torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
296torrentSwarm _ _ (Active sws) = return sws
297torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc
298torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih
299
300lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession
301lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih
302
303{-----------------------------------------------------------------------
304 Peer session creation
305------------------------------------------------------------------------
306The peer session cycle looks like:
307
308 * acquire vacant session and vacant thread slot;
309 * (fork could be here, but not necessary)
310 * establish peer connection;
311 * register peer session;
312 * ... exchange process ...
313 * unregister peer session;
314 * close peer connection;
315 * release acquired session and thread slot.
316
317TODO: explain why this order
318TODO: thread throttling
319TODO: check if it connected yet peer
320TODO: utilize peer Id.
321TODO: use STM semaphore
322-----------------------------------------------------------------------}
323
324openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
325openSession ss @ SwarmSession {..} addr Handshake {..} = do
326 let clientCaps = encodeExts $ allowedExtensions $ clientSession
327 let enabled = decodeExts (enabledCaps clientCaps hsReserved)
328 ps <- PeerSession addr ss enabled
329 <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ())
330 <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ())
331 <*> atomically (dupTChan broadcastMessages)
332 <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield)
333 -- TODO we could implement more interesting throtling scheme
334 -- using connected peer information
335 atomically $ modifyTVar' connectedPeers (S.insert ps)
336 return ps
337
338closeSession :: PeerSession -> IO ()
339closeSession ps @ PeerSession {..} = do
340 atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps)
341
342type PeerConn = (Socket, PeerSession)
343type Exchange = PeerConn -> IO ()
344
345sendClientStatus :: PeerConn -> IO ()
346sendClientStatus (sock, PeerSession {..}) = do
347 cbf <- readTVarIO $ clientBitfield $ swarmSession
348 sendAll sock $ encode $ Bitfield cbf
349
350 port <- dhtPort $ clientSession swarmSession
351 when (ExtDHT `elem` enabledExtensions) $ do
352 sendAll sock $ encode $ Port port
353
354-- | Exchange action depends on session and socket, whereas session depends
355-- on socket:
356--
357-- socket------>-----exchange
358-- | |
359-- \-->--session-->--/
360--
361-- To handle exceptions properly we double bracket socket and session
362-- then joining the resources and also ignoring session local exceptions.
363--
364runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
365runSession connector opener action =
366 handle isSessionException $
367 bracket connector close $ \sock ->
368 bracket (opener sock) closeSession $ \ses ->
369 action (sock, ses)
370
371-- | Used then the client want to connect to a peer.
372initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
373initiatePeerSession ss @ SwarmSession {..} addr
374 = runSession (connectToPeer addr) initiated
375 where
376 initiated sock = do
377 phs <- handshake sock (swarmHandshake ss)
378 ps <- openSession ss addr phs
379 sendClientStatus (sock, ps)
380 return ps
381
382-- | Used the a peer want to connect to the client.
383acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
384acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
385 where
386 accepted sock = do
387 phs <- recvHandshake sock
388 swarm <- lookupSwarm cs $ hsInfoHash phs
389 ps <- openSession swarm addr phs
390 sendHandshake sock $ Handshake {
391 hsProtocol = defaultBTProtocol
392 , hsReserved = encodeExts $ enabledExtensions ps
393 , hsInfoHash = hsInfoHash phs
394 , hsPeerId = clientPeerId
395 }
396 sendClientStatus (sock, ps)
397 return ps
398
399listener :: ClientSession -> Exchange -> PortNumber -> IO ()
400listener cs action serverPort = bracket openListener close loop
401 where
402 loop sock = forever $ handle isIOError $ do
403 (conn, addr) <- accept sock
404 case addr of
405 SockAddrInet port host -> do
406 acceptPeerSession cs (PeerAddr Nothing host port) conn action
407 _ -> return ()
408
409 isIOError :: IOError -> IO ()
410 isIOError _ = return ()
411
412 openListener = do
413 sock <- socket AF_INET Stream defaultProtocol
414 bindSocket sock (SockAddrInet serverPort 0)
415 listen sock 1
416 return sock
417
418
419{-----------------------------------------------------------------------
420 Keepalives
421------------------------------------------------------------------------
422TODO move to exchange
423-----------------------------------------------------------------------}
424
425sec :: Int
426sec = 1000 * 1000
427
428maxIncomingTime :: Int
429maxIncomingTime = 120 * sec
430
431maxOutcomingTime :: Int
432maxOutcomingTime = 1 * sec
433
434-- | Should be called after we have received any message from a peer.
435updateIncoming :: PeerSession -> IO ()
436updateIncoming PeerSession {..} = do
437 updateTimeout (eventManager (clientSession swarmSession))
438 incomingTimeout maxIncomingTime
439
440-- | Should be called before we have send any message to a peer.
441updateOutcoming :: PeerSession -> IO ()
442updateOutcoming PeerSession {..} =
443 updateTimeout (eventManager (clientSession swarmSession))
444 outcomingTimeout maxOutcomingTime
445
446sendKA :: Socket -> IO ()
447sendKA sock = return ()
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs
new file mode 100644
index 00000000..f94dbfa6
--- /dev/null
+++ b/src/Network/BitTorrent/Sessions/Types.lhs
@@ -0,0 +1,517 @@
1> -- |
2> -- Copyright : (c) Sam T. 2013
3> -- License : MIT
4> -- Maintainer : pxqr.sta@gmail.com
5> -- Stability : experimental
6> -- Portability : portable
7> --
8>
9> {-# LANGUAGE RecordWildCards #-}
10> {-# LANGUAGE ViewPatterns #-}
11> {-# LANGUAGE TemplateHaskell #-}
12> {-# LANGUAGE DeriveDataTypeable #-}
13>
14> module Network.BitTorrent.Sessions.Types
15> ( ClientService(..)
16> , ThreadCount
17> , TorrentLoc (..)
18> , TorrentMap
19>
20> , Progress (..)
21> , left, uploaded, downloaded
22> , startProgress, enqueuedProgress
23>
24> , ClientSession (..)
25>
26> , SwarmSession (..)
27> , getClientBitfield
28> , getPending, available
29>
30> , PeerSession (..)
31> , SessionCount
32> , findPieceCount
33>
34> , SessionState (..)
35> , status, bitfield
36> , initialSessionState
37>
38> , SessionException (..)
39> , isSessionException, putSessionException
40> ) where
41
42> import Control.Applicative
43> import Control.Concurrent
44> import Control.Concurrent.STM
45> import Control.Concurrent.MSem as MSem
46> import Control.Lens
47> import Control.Exception
48
49> import Data.IORef
50> import Data.Default
51> import Data.Function
52> import Data.Map as M
53> import Data.HashMap.Strict as HM
54> import Data.Ord
55> import Data.Set as S
56> import Data.Typeable
57> import Text.PrettyPrint
58
59> import Network
60
61> import GHC.Event as Ev
62
63> import Data.Bitfield as BF
64> import Data.Torrent
65> import Network.BitTorrent.Extension
66> import Network.BitTorrent.Peer
67> import Network.BitTorrent.Exchange.Protocol as BT
68> import Network.BitTorrent.Tracker.Protocol as BT
69> import System.Torrent.Storage
70
71Thread layout
72------------------------------------------------------------------------
73
74When client session created 2 new threads appear:
75
76 * DHT listener - replies to DHT requests;
77
78 * Peer listener - accept new P2P connection initiated by other
79peers.
80
81When swarn session created 3 new threads appear:
82
83 * DHT request loop asks for new peers;
84
85 * Tracker request loop asks for new peers;
86
87 * controller which fork new avaand manage running P2P sessions.
88
89Peer session is one always forked thread.
90
91When client\/swarm\/peer session gets closed kill the corresponding
92threads, but flush data to disc. (for e.g. storage block map)
93
94So for e.g., in order to obtain our first block we need to spawn at
95least 7 threads: main thread, 2 client session threads, 3 swarm session
96threads and PeerSession thread.
97
98Thread throttling
99------------------------------------------------------------------------
100
101If we will not restrict number of threads we could end up
102with thousands of connected swarm and make no particular progress.
103
104Note also we do not bound number of swarms! This is not optimal
105strategy because each swarm might have say 1 thread and we could end
106up bounded by the meaningless limit. Bounding global number of p2p
107sessions should work better, and simpler.
108
109**TODO:** priority based throttling: leecher thread have more priority
110than seeder threads.
111
112> -- | Each client might have a limited number of threads.
113> type ThreadCount = Int
114
115Client Services
116------------------------------------------------------------------------
117
118There are two servers started as client start:
119
120 * DHT node listener - needed by other peers to discover
121 * Peer listener - need by other peers to join this client.
122
123Thus any client (assuming DHT is enabled) provides at least 2 services
124so we can abstract out into ClientService:
125
126> data ClientService = ClientService {
127> servPort :: !PortNumber
128> , servThread :: !ThreadId
129> } deriving Show
130
131Torrent Map
132------------------------------------------------------------------------
133
134TODO: keep track global peer have piece set.
135
136Keeping all seeding torrent metafiles in memory is a _bad_ idea: for
1371TB of data we need at least 100MB of metadata. (using 256KB piece
138size). This solution do not scale further.
139
140To avoid this we keep just *metainfo* about *metainfo*:
141
142> -- | Local info about torrent location.
143> data TorrentLoc = TorrentLoc {
144> -- | Full path to .torrent metafile.
145> metafilePath :: FilePath
146> -- | Full path to directory contating content files associated
147> -- with the metafile.
148> , dataDirPath :: FilePath
149> }
150
151TorrentMap is used to keep track all known torrents for the
152client. When some peer trying to connect to us it's necessary to
153dispatch appropriate 'SwarmSession' (or start new one if there are
154none) in the listener loop: we only know 'InfoHash' from 'Handshake'
155but nothing more. So to accept new 'PeerSession' we need to lookup
156torrent metainfo and content files (if there are some) by the
157'InfoHash' and only after that enter exchange loop.
158
159Solution with TorrentLoc is much better and takes much more less
160space, moreover it depends on count of torrents but not on count of
161data itself. To scale further, in future we might add something like
162database (for e.g. sqlite) for this kind of things.
163
164> -- | Used to find torrent info and data in order to accept connection.
165> type TorrentMap = HashMap InfoHash TorrentLoc
166
167While *registering* torrent we need to check if torrent metafile is
168correct, all the files are present in the filesystem and so
169forth. However content validation using hashes will take a long time,
170so we need to do this on demand: if a peer asks for a block, we
171validate corresponding piece and only after read and send the block
172back.
173
174Progress
175------------------------------------------------------------------------
176
177Progress data is considered as dynamic within one client session. This
178data also should be shared across client application sessions
179(e.g. files), otherwise use 'startProgress' to get initial 'Progress'.
180
181> -- | 'Progress' contains upload/download/left stats about
182> -- current client state and used to notify the tracker.
183> data Progress = Progress {
184> _uploaded :: !Integer -- ^ Total amount of bytes uploaded.
185> , _downloaded :: !Integer -- ^ Total amount of bytes downloaded.
186> , _left :: !Integer -- ^ Total amount of bytes left.
187> } deriving (Show, Read, Eq)
188>
189> $(makeLenses ''Progress)
190
191**TODO:** Use Word64?
192
193**TODO:** Use atomic bits?
194
195Please note that tracker might penalize client some way if the do
196not accumulate progress. If possible and save 'Progress' between
197client sessions to avoid that.
198
199> -- | Initial progress is used when there are no session before.
200> startProgress :: Integer -> Progress
201> startProgress = Progress 0 0
202
203> -- | Used when the client download some data from /any/ peer.
204> downloadedProgress :: Int -> Progress -> Progress
205> downloadedProgress (fromIntegral -> amount)
206> = (left -~ amount)
207> . (downloaded +~ amount)
208> {-# INLINE downloadedProgress #-}
209
210> -- | Used when the client upload some data to /any/ peer.
211> uploadedProgress :: Int -> Progress -> Progress
212> uploadedProgress (fromIntegral -> amount) = uploaded +~ amount
213> {-# INLINE uploadedProgress #-}
214
215> -- | Used when leecher join client session.
216> enqueuedProgress :: Integer -> Progress -> Progress
217> enqueuedProgress amount = left +~ amount
218> {-# INLINE enqueuedProgress #-}
219
220> -- | Used when leecher leave client session.
221> -- (e.g. user deletes not completed torrent)
222> dequeuedProgress :: Integer -> Progress -> Progress
223> dequeuedProgress amount = left -~ amount
224> {-# INLINE dequeuedProgress #-}
225
226Client Sessions
227------------------------------------------------------------------------
228
229Basically, client session should contain options which user
230application store in configuration files and related to the
231protocol. Moreover it should contain the all client identification
232info, for e.g. DHT.
233
234Client session is the basic unit of bittorrent network, it has:
235
236 * The /peer ID/ used as unique identifier of the client in
237network. Obviously, this value is not changed during client session.
238
239 * The number of /protocol extensions/ it might use. This value is
240static as well, but if you want to dynamically reconfigure the client
241you might kill the end the current session and create a new with the
242fresh required extensions.
243
244 * The number of /swarms/ to join, each swarm described by the
245'SwarmSession'.
246
247Normally, you would have one client session, however, if we needed, in
248one application we could have many clients with different peer ID's
249and different enabled extensions at the same time.
250
251> -- |
252> data ClientSession = ClientSession {
253> -- | Used in handshakes and discovery mechanism.
254> clientPeerId :: !PeerId
255
256> -- | Extensions we should try to use. Hovewer some particular peer
257> -- might not support some extension, so we keep enabledExtension in
258> -- 'PeerSession'.
259> , allowedExtensions :: [Extension]
260
261> , peerListener :: !(MVar ClientService)
262> , nodeListener :: !(MVar ClientService)
263
264> -- | Semaphor used to bound number of active P2P sessions.
265> , activeThreads :: !(MSem ThreadCount)
266
267> -- | Max number of active connections.
268> , maxActive :: !ThreadCount
269
270> -- | Used to traverse the swarm session.
271> , swarmSessions :: !(TVar (Map InfoHash SwarmSession))
272
273> , eventManager :: !EventManager
274
275> -- | Used to keep track global client progress.
276> , currentProgress :: !(TVar Progress)
277
278> -- | Used to keep track available torrents.
279> , torrentMap :: !(TVar TorrentMap)
280> }
281
282NOTE: currentProgress field is reduntant: progress depends on the all swarm
283bitfields maybe we can remove the 'currentProgress' and compute it on
284demand?
285
286> instance Eq ClientSession where
287> (==) = (==) `on` clientPeerId
288
289> instance Ord ClientSession where
290> compare = comparing clientPeerId
291
292Swarm sessions
293------------------------------------------------------------------------
294
295NOTE: If client is a leecher then there is NO particular reason to
296set max sessions count more than the_number_of_unchoke_slots * k:
297
298 * thread slot(activeThread semaphore)
299 * will take but no
300
301So if client is a leecher then max sessions count depends on the
302number of unchoke slots.
303
304> -- | Used to bound the number of simultaneous connections and, which
305> -- is the same, P2P sessions within the swarm session.
306> type SessionCount = Int
307
308However if client is a seeder then the value depends on .
309
310> -- | Swarm session is
311> data SwarmSession = SwarmSession {
312> torrentMeta :: !Torrent
313
314> , clientSession :: !ClientSession
315
316TODO: lower "vacantPeers" when client becomes seeder according to
317throttling policy.
318
319Represent count of peers we _currently_ can connect to in the
320swarm. Used to bound number of concurrent threads. See also *Thread
321Throttling* section.
322
323> , vacantPeers :: !(MSem SessionCount)
324
325Client bitfield is used to keep track "the client have" piece set.
326Modify this carefully always updating global progress.
327
328> , clientBitfield :: !(TVar Bitfield)
329
330> , storage :: !Storage
331
332We keep set of the all connected peers for the each particular torrent
333to prevent duplicated and therefore reduntant TCP connections. For
334example consider the following very simle and realistic scenario:
335
336 * Peer A lookup tracker for peers.
337
338 * Peer B lookup tracker for peers.
339
340 * Finally, Peer A connect to B and Peer B connect to peer A
341simultaneously.
342
343There some other situation the problem may occur: duplicates in
344successive tracker responses, tracker and DHT returns. So without any
345protection we end up with two session between the same peers. That's
346bad because this could lead:
347
348 * Reduced throughput - multiple sessions between the same peers will
349mutiply control overhead (control messages, session state).
350
351 * Thread occupation - duplicated sessions will eat thread slots and
352discourage other, possible more useful, peers to establish connection.
353
354To avoid this we could check, into the one transaction, if a peer is
355already connected and add a connection only if it is not.
356
357> , connectedPeers :: !(TVar (Set PeerSession))
358
359TODO: use bounded broadcast chan with priority queue and drop old entries.
360
361Channel used for replicate messages across all peers in swarm. For
362exsample if we get some piece we should sent to all connected (and
363interested in) peers HAVE message.
364
365> , broadcastMessages :: !(TChan Message)
366> }
367
368INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers
369
370> instance Eq SwarmSession where
371> (==) = (==) `on` (tInfoHash . torrentMeta)
372
373> instance Ord SwarmSession where
374> compare = comparing (tInfoHash . torrentMeta)
375
376> getClientBitfield :: SwarmSession -> IO Bitfield
377> getClientBitfield = readTVarIO . clientBitfield
378
379Peer sessions
380------------------------------------------------------------------------
381
382> -- | Peer session contain all data necessary for peer to peer
383> -- communication.
384> data PeerSession = PeerSession {
385> -- | Used as unique 'PeerSession' identifier within one
386> -- 'SwarmSession'.
387> connectedPeerAddr :: !PeerAddr
388
389> -- | The swarm to which both end points belong to.
390> , swarmSession :: !SwarmSession
391
392> -- | Extensions such that both peer and client support.
393> , enabledExtensions :: [Extension]
394
395To dissconnect from died peers appropriately we should check if a peer
396do not sent the KA message within given interval. If yes, we should
397throw an exception in 'TimeoutCallback' and close session between
398peers.
399
400We should update timeout if we /receive/ any message within timeout
401interval to keep connection up.
402
403> , incomingTimeout :: !TimeoutKey
404
405To send KA message appropriately we should know when was last time we
406sent a message to a peer. To do that we keep registered timeout in
407event manager and if we do not sent any message to the peer within
408given interval then we send KA message in 'TimeoutCallback'.
409
410We should update timeout if we /send/ any message within timeout to
411avoid reduntant KA messages.
412
413> , outcomingTimeout :: !TimeoutKey
414>
415> -- | Broadcast messages waiting to be sent to peer.
416> , pendingMessages :: !(TChan Message)
417
418> -- | Dymanic P2P data.
419> , sessionState :: !(IORef SessionState)
420> }
421
422> instance Eq PeerSession where
423> (==) = (==) `on` connectedPeerAddr
424
425> instance Ord PeerSession where
426> compare = comparing connectedPeerAddr
427
428> findPieceCount :: PeerSession -> PieceCount
429> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
430
431Peer session state
432------------------------------------------------------------------------
433
434> data SessionState = SessionState {
435> _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
436> , _status :: !SessionStatus -- ^ Status of both peers.
437> } deriving (Show, Eq)
438
439> $(makeLenses ''SessionState)
440
441> initialSessionState :: PieceCount -> SessionState
442> initialSessionState pc = SessionState (haveNone pc) def
443
444Peer session exceptions
445------------------------------------------------------------------------
446
447> -- | Exceptions used to interrupt the current P2P session. This
448> -- exceptions will NOT affect other P2P sessions, DHT, peer <->
449> -- tracker, or any other session.
450> --
451> data SessionException = PeerDisconnected
452> | ProtocolError Doc
453> | UnknownTorrent InfoHash
454> deriving (Show, Typeable)
455
456> instance Exception SessionException
457
458
459> -- | Do nothing with exception, used with 'handle' or 'try'.
460> isSessionException :: Monad m => SessionException -> m ()
461> isSessionException _ = return ()
462
463> -- | The same as 'isSessionException' but output to stdout the catched
464> -- exception, for debugging purposes only.
465> putSessionException :: SessionException -> IO ()
466> putSessionException = print
467
468Broadcasting: Have, Cancel, Bitfield, SuggestPiece
469------------------------------------------------------------------------
470
471Here we should enqueue broadcast messages and keep in mind that:
472 * We should enqueue broadcast events as they are appear.
473 * We should yield broadcast messages as fast as we get them.
474
475these 2 phases might differ in time significantly
476
477**TODO**: do this; but only when it'll be clean which other broadcast
478messages & events we should send.
479
4801. Update client have bitfield --\____ in one transaction;
4812. Update downloaded stats --/
4823. Signal to the all other peer about this.
483
484> available :: Bitfield -> SwarmSession -> IO ()
485> available bf se @ SwarmSession {..} = {-# SCC available #-} do
486> mark >> atomically broadcast
487> where
488> mark = do
489> let piLen = ciPieceLength $ tInfo $ torrentMeta
490> let bytes = piLen * BF.haveCount bf
491> atomically $ do
492> modifyTVar' clientBitfield (BF.union bf)
493> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
494>
495> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
496
497-- TODO compute size of messages: if it's faster to send Bitfield
498-- instead many Have do that
499
500-- Also if there is single Have message in queue then the
501-- corresponding piece is likely still in memory or disc cache,
502-- when we can send SuggestPiece.
503
504-- | Get pending messages queue appeared in result of asynchronously
505-- changed client state. Resulting queue should be sent to a peer
506-- immediately.
507
508> getPending :: PeerSession -> IO [Message]
509> getPending PeerSession {..} = {-# SCC getPending #-} do
510> atomically (readAvail pendingMessages)
511
512> readAvail :: TChan a -> STM [a]
513> readAvail chan = do
514> m <- tryReadTChan chan
515> case m of
516> Just a -> (:) <$> pure a <*> readAvail chan
517> Nothing -> return []
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs
index 2b8fab07..b320f0f9 100644
--- a/src/Network/BitTorrent/Tracker.hs
+++ b/src/Network/BitTorrent/Tracker.hs
@@ -62,7 +62,7 @@ import Network.HTTP
62import Network.URI 62import Network.URI
63 63
64import Data.Torrent 64import Data.Torrent
65import Network.BitTorrent.Internal 65import Network.BitTorrent.Sessions.Types
66import Network.BitTorrent.Peer 66import Network.BitTorrent.Peer
67import Network.BitTorrent.Tracker.Protocol 67import Network.BitTorrent.Tracker.Protocol
68 68