summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/Internal.lhs411
1 files changed, 21 insertions, 390 deletions
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
index 23b3708a..ef4165e3 100644
--- a/src/Network/BitTorrent/Internal.lhs
+++ b/src/Network/BitTorrent/Internal.lhs
@@ -16,8 +16,6 @@
16> {-# LANGUAGE OverloadedStrings #-} 16> {-# LANGUAGE OverloadedStrings #-}
17> {-# LANGUAGE RecordWildCards #-} 17> {-# LANGUAGE RecordWildCards #-}
18> {-# LANGUAGE ViewPatterns #-} 18> {-# LANGUAGE ViewPatterns #-}
19> {-# LANGUAGE TemplateHaskell #-}
20> {-# LANGUAGE DeriveDataTypeable #-}
21> 19>
22> module Network.BitTorrent.Internal 20> module Network.BitTorrent.Internal
23> ( -- * Progress 21> ( -- * Progress
@@ -98,57 +96,28 @@
98> import Control.Monad.Trans 96> import Control.Monad.Trans
99 97
100> import Data.IORef 98> import Data.IORef
101> import Data.Default
102> import Data.Function
103> import Data.Foldable (mapM_) 99> import Data.Foldable (mapM_)
104> import Data.Map as M 100> import Data.Map as M
105> import Data.HashMap.Strict as HM 101> import Data.HashMap.Strict as HM
106> import Data.Ord
107> import Data.Set as S 102> import Data.Set as S
108> import Data.Typeable
109 103
110> import Data.Serialize hiding (get) 104> import Data.Serialize hiding (get)
111> import Text.PrettyPrint
112 105
113> import Network hiding (accept) 106> import Network hiding (accept)
114> import Network.Socket 107> import Network.Socket
115> import Network.Socket.ByteString 108> import Network.Socket.ByteString
116 109
117> import GHC.Event as Ev 110> import GHC.Event as Ev
118 111
119> import Data.Bitfield as BF 112> import Data.Bitfield as BF
120> import Data.Torrent 113> import Data.Torrent
121> import Network.BitTorrent.Extension 114> import Network.BitTorrent.Extension
122> import Network.BitTorrent.Peer 115> import Network.BitTorrent.Peer
123> import Network.BitTorrent.Exchange.Protocol as BT 116> import Network.BitTorrent.Exchange.Protocol as BT
124> import Network.BitTorrent.Tracker.Protocol as BT 117> import Network.BitTorrent.Tracker.Protocol as BT
125> import Network.BitTorrent.DHT.Protocol as BT
126> import System.Torrent.Storage 118> import System.Torrent.Storage
127 119> import Network.BitTorrent.Sessions.Types
128Progress 120
129------------------------------------------------------------------------
130
131Progress data is considered as dynamic within one client session. This
132data also should be shared across client application sessions
133(e.g. files), otherwise use 'startProgress' to get initial 'Progress'.
134
135> -- | 'Progress' contains upload/download/left stats about
136> -- current client state and used to notify the tracker.
137> data Progress = Progress {
138> _uploaded :: !Integer -- ^ Total amount of bytes uploaded.
139> , _downloaded :: !Integer -- ^ Total amount of bytes downloaded.
140> , _left :: !Integer -- ^ Total amount of bytes left.
141> } deriving (Show, Read, Eq)
142>
143> $(makeLenses ''Progress)
144
145**TODO:** Use Word64?
146
147**TODO:** Use atomic bits?
148
149Please note that tracker might penalize client some way if the do
150not accumulate progress. If possible and save 'Progress' between
151client sessions to avoid that.
152 121
153> -- | Initial progress is used when there are no session before. 122> -- | Initial progress is used when there are no session before.
154> startProgress :: Integer -> Progress 123> startProgress :: Integer -> Progress
@@ -177,131 +146,6 @@ client sessions to avoid that.
177> dequeuedProgress amount = left -~ amount 146> dequeuedProgress amount = left -~ amount
178> {-# INLINE dequeuedProgress #-} 147> {-# INLINE dequeuedProgress #-}
179 148
180
181Thread layout
182------------------------------------------------------------------------
183
184When client session created 2 new threads appear:
185
186 * DHT listener - replies to DHT requests;
187
188 * Peer listener - accept new P2P connection initiated by other
189peers.
190
191When swarn session created 3 new threads appear:
192
193 * DHT request loop asks for new peers;
194
195 * Tracker request loop asks for new peers;
196
197 * controller which fork new avaand manage running P2P sessions.
198
199Peer session is one always forked thread.
200
201When client\/swarm\/peer session gets closed kill the corresponding
202threads, but flush data to disc. (for e.g. storage block map)
203
204So for e.g., in order to obtain our first block we need to spawn at
205least 7 threads: main thread, 2 client session threads, 3 swarm session
206threads and PeerSession thread.
207
208Thread throttling
209------------------------------------------------------------------------
210
211If we will not restrict number of threads we could end up
212with thousands of connected swarm and make no particular progress.
213
214Note also we do not bound number of swarms! This is not optimal
215strategy because each swarm might have say 1 thread and we could end
216up bounded by the meaningless limit. Bounding global number of p2p
217sessions should work better, and simpler.
218
219**TODO:** priority based throttling: leecher thread have more priority
220than seeder threads.
221
222> -- | Each client might have a limited number of threads.
223> type ThreadCount = Int
224
225> -- | The number of threads suitable for a typical BT client.
226> defaultThreadCount :: ThreadCount
227> defaultThreadCount = 1000
228
229Torrent Map
230------------------------------------------------------------------------
231
232TODO: keep track global peer have piece set.
233
234Keeping all seeding torrent metafiles in memory is a _bad_ idea: for
2351TB of data we need at least 100MB of metadata. (using 256KB piece
236size). This solution do not scale further.
237
238To avoid this we keep just *metainfo* about *metainfo*:
239
240> -- | Local info about torrent location.
241> data TorrentLoc = TorrentLoc {
242> -- | Full path to .torrent metafile.
243> metafilePath :: FilePath
244> -- | Full path to directory contating content files associated
245> -- with the metafile.
246> , dataDirPath :: FilePath
247> }
248
249TorrentMap is used to keep track all known torrents for the
250client. When some peer trying to connect to us it's necessary to
251dispatch appropriate 'SwarmSession' (or start new one if there are
252none) in the listener loop: we only know 'InfoHash' from 'Handshake'
253but nothing more. So to accept new 'PeerSession' we need to lookup
254torrent metainfo and content files (if there are some) by the
255'InfoHash' and only after that enter exchange loop.
256
257TODO: check content files location;
258
259> validateLocation :: TorrentLoc -> IO Torrent
260> validateLocation = fromFile . metafilePath
261
262Solution with TorrentLoc is much better and takes much more less
263space, moreover it depends on count of torrents but not on count of
264data itself. To scale further, in future we might add something like
265database (for e.g. sqlite) for this kind of things.
266
267> -- | Used to find torrent info and data in order to accept connection.
268> type TorrentMap = HashMap InfoHash TorrentLoc
269
270While *registering* torrent we need to check if torrent metafile is
271correct, all the files are present in the filesystem and so
272forth. However content validation using hashes will take a long time,
273so we need to do this on demand: if a peer asks for a block, we
274validate corresponding piece and only after read and send the block
275back.
276
277> registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO ()
278> registerTorrent = error "registerTorrent"
279> {-
280> Torrent {..} <- validateTorrent tl
281> atomically $ modifyTVar' torrentMap $ HM.insert tInfoHash tl
282> return (Just t)
283> -}
284
285> unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
286> unregisterTorrent = error "unregisterTorrent"
287> -- modifyTVar' torrentMap $ HM.delete ih
288
289Client Services
290------------------------------------------------------------------------
291
292There are two servers started as client start:
293
294 * DHT node listener - needed by other peers to discover
295 * Peer listener - need by other peers to join this client.
296
297Thus any client (assuming DHT is enabled) provides at least 2 services
298so we can abstract out into ClientService:
299
300> data ClientService = ClientService {
301> servPort :: !PortNumber
302> , servThread :: !ThreadId
303> } deriving Show
304
305> startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO () 149> startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
306> startService s port m = do 150> startService s port m = do
307> stopService s 151> stopService s
@@ -317,72 +161,6 @@ Service A might depend on service B.
317> withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO () 161> withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
318> withRunning dep failure action = tryTakeMVar dep >>= maybe failure action 162> withRunning dep failure action = tryTakeMVar dep >>= maybe failure action
319 163
320Client Sessions
321------------------------------------------------------------------------
322
323Basically, client session should contain options which user
324application store in configuration files and related to the
325protocol. Moreover it should contain the all client identification
326info, for e.g. DHT.
327
328Client session is the basic unit of bittorrent network, it has:
329
330 * The /peer ID/ used as unique identifier of the client in
331network. Obviously, this value is not changed during client session.
332
333 * The number of /protocol extensions/ it might use. This value is
334static as well, but if you want to dynamically reconfigure the client
335you might kill the end the current session and create a new with the
336fresh required extensions.
337
338 * The number of /swarms/ to join, each swarm described by the
339'SwarmSession'.
340
341Normally, you would have one client session, however, if we needed, in
342one application we could have many clients with different peer ID's
343and different enabled extensions at the same time.
344
345> -- |
346> data ClientSession = ClientSession {
347> -- | Used in handshakes and discovery mechanism.
348> clientPeerId :: !PeerId
349
350> -- | Extensions we should try to use. Hovewer some particular peer
351> -- might not support some extension, so we keep enabledExtension in
352> -- 'PeerSession'.
353> , allowedExtensions :: [Extension]
354
355> , peerListener :: !(MVar ClientService)
356> , nodeListener :: !(MVar ClientService)
357
358> -- | Semaphor used to bound number of active P2P sessions.
359> , activeThreads :: !(MSem ThreadCount)
360
361> -- | Max number of active connections.
362> , maxActive :: !ThreadCount
363
364> -- | Used to traverse the swarm session.
365> , swarmSessions :: !(TVar (Map InfoHash SwarmSession))
366
367> , eventManager :: !EventManager
368
369> -- | Used to keep track global client progress.
370> , currentProgress :: !(TVar Progress)
371
372> -- | Used to keep track available torrents.
373> , torrentMap :: !(TVar TorrentMap)
374> }
375
376NOTE: currentProgress field is reduntant: progress depends on the all swarm
377bitfields maybe we can remove the 'currentProgress' and compute it on
378demand?
379
380> instance Eq ClientSession where
381> (==) = (==) `on` clientPeerId
382
383> instance Ord ClientSession where
384> compare = comparing clientPeerId
385
386Torrent presence 164Torrent presence
387------------------------------------------------------------------------ 165------------------------------------------------------------------------
388 166
@@ -454,23 +232,6 @@ Retrieving client info
454> dhtPort :: ClientSession -> IO PortNumber 232> dhtPort :: ClientSession -> IO PortNumber
455> dhtPort ClientSession {..} = servPort <$> readMVar nodeListener 233> dhtPort ClientSession {..} = servPort <$> readMVar nodeListener
456 234
457Swarm sessions
458------------------------------------------------------------------------
459
460NOTE: If client is a leecher then there is NO particular reason to
461set max sessions count more than the_number_of_unchoke_slots * k:
462
463 * thread slot(activeThread semaphore)
464 * will take but no
465
466So if client is a leecher then max sessions count depends on the
467number of unchoke slots.
468
469However if client is a seeder then the value depends on .
470
471> -- | Used to bound the number of simultaneous connections and, which
472> -- is the same, P2P sessions within the swarm session.
473> type SessionCount = Int
474 235
475> defSeederConns :: SessionCount 236> defSeederConns :: SessionCount
476> defSeederConns = defaultUnchokeSlots 237> defSeederConns = defaultUnchokeSlots
@@ -478,72 +239,6 @@ However if client is a seeder then the value depends on .
478> defLeacherConns :: SessionCount 239> defLeacherConns :: SessionCount
479> defLeacherConns = defaultNumWant 240> defLeacherConns = defaultNumWant
480 241
481> -- | Swarm session is
482> data SwarmSession = SwarmSession {
483> torrentMeta :: !Torrent
484
485> , clientSession :: !ClientSession
486
487TODO: lower "vacantPeers" when client becomes seeder according to
488throttling policy.
489
490Represent count of peers we _currently_ can connect to in the
491swarm. Used to bound number of concurrent threads. See also *Thread
492Throttling* section.
493
494> , vacantPeers :: !(MSem SessionCount)
495
496Client bitfield is used to keep track "the client have" piece set.
497Modify this carefully always updating global progress.
498
499> , clientBitfield :: !(TVar Bitfield)
500
501> , storage :: !Storage
502
503We keep set of the all connected peers for the each particular torrent
504to prevent duplicated and therefore reduntant TCP connections. For
505example consider the following very simle and realistic scenario:
506
507 * Peer A lookup tracker for peers.
508
509 * Peer B lookup tracker for peers.
510
511 * Finally, Peer A connect to B and Peer B connect to peer A
512simultaneously.
513
514There some other situation the problem may occur: duplicates in
515successive tracker responses, tracker and DHT returns. So without any
516protection we end up with two session between the same peers. That's
517bad because this could lead:
518
519 * Reduced throughput - multiple sessions between the same peers will
520mutiply control overhead (control messages, session state).
521
522 * Thread occupation - duplicated sessions will eat thread slots and
523discourage other, possible more useful, peers to establish connection.
524
525To avoid this we could check, into the one transaction, if a peer is
526already connected and add a connection only if it is not.
527
528> , connectedPeers :: !(TVar (Set PeerSession))
529
530TODO: use bounded broadcast chan with priority queue and drop old entries.
531
532Channel used for replicate messages across all peers in swarm. For
533exsample if we get some piece we should sent to all connected (and
534interested in) peers HAVE message.
535
536> , broadcastMessages :: !(TChan Message)
537> }
538
539INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers
540
541> instance Eq SwarmSession where
542> (==) = (==) `on` (tInfoHash . torrentMeta)
543
544> instance Ord SwarmSession where
545> compare = comparing (tInfoHash . torrentMeta)
546
547> newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent 242> newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
548> -> IO SwarmSession 243> -> IO SwarmSession
549> newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} 244> newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
@@ -624,6 +319,10 @@ getSwarm cs @ ClientSession {..} ih = do
624Peer sessions throttling 319Peer sessions throttling
625------------------------------------------------------------------------ 320------------------------------------------------------------------------
626 321
322> -- | The number of threads suitable for a typical BT client.
323> defaultThreadCount :: ThreadCount
324> defaultThreadCount = 1000
325
627> enterSwarm :: SwarmSession -> IO () 326> enterSwarm :: SwarmSession -> IO ()
628> enterSwarm SwarmSession {..} = do 327> enterSwarm SwarmSession {..} = do
629> MSem.wait (activeThreads clientSession) 328> MSem.wait (activeThreads clientSession)
@@ -646,95 +345,27 @@ Peer sessions throttling
646> action `finally` leaveSwarm se) 345> action `finally` leaveSwarm se)
647> `onException` leaveSwarm se 346> `onException` leaveSwarm se
648 347
649Peer sessions
650------------------------------------------------------------------------
651
652> -- | Peer session contain all data necessary for peer to peer
653> -- communication.
654> data PeerSession = PeerSession {
655> -- | Used as unique 'PeerSession' identifier within one
656> -- 'SwarmSession'.
657> connectedPeerAddr :: !PeerAddr
658
659> -- | The swarm to which both end points belong to.
660> , swarmSession :: !SwarmSession
661
662> -- | Extensions such that both peer and client support.
663> , enabledExtensions :: [Extension]
664
665To dissconnect from died peers appropriately we should check if a peer
666do not sent the KA message within given interval. If yes, we should
667throw an exception in 'TimeoutCallback' and close session between
668peers.
669
670We should update timeout if we /receive/ any message within timeout
671interval to keep connection up.
672
673> , incomingTimeout :: !TimeoutKey
674
675To send KA message appropriately we should know when was last time we
676sent a message to a peer. To do that we keep registered timeout in
677event manager and if we do not sent any message to the peer within
678given interval then we send KA message in 'TimeoutCallback'.
679
680We should update timeout if we /send/ any message within timeout to
681avoid reduntant KA messages.
682
683> , outcomingTimeout :: !TimeoutKey
684>
685> -- | Broadcast messages waiting to be sent to peer.
686> , pendingMessages :: !(TChan Message)
687
688> -- | Dymanic P2P data.
689> , sessionState :: !(IORef SessionState)
690> }
691
692> instance Eq PeerSession where
693> (==) = (==) `on` connectedPeerAddr
694
695> instance Ord PeerSession where
696> compare = comparing connectedPeerAddr
697
698Peer session state
699------------------------------------------------------------------------
700
701> data SessionState = SessionState {
702> _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
703> , _status :: !SessionStatus -- ^ Status of both peers.
704> } deriving (Show, Eq)
705
706> $(makeLenses ''SessionState)
707
708> initialSessionState :: PieceCount -> SessionState
709> initialSessionState pc = SessionState (haveNone pc) def
710 348
711> findPieceCount :: PeerSession -> PieceCount 349> findPieceCount :: PeerSession -> PieceCount
712> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession 350> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
713 351
714Peer session exceptions 352TODO: check content files location;
715------------------------------------------------------------------------
716
717> -- | Exceptions used to interrupt the current P2P session. This
718> -- exceptions will NOT affect other P2P sessions, DHT, peer <->
719> -- tracker, or any other session.
720> --
721> data SessionException = PeerDisconnected
722> | ProtocolError Doc
723> | UnknownTorrent InfoHash
724> deriving (Show, Typeable)
725
726> instance Exception SessionException
727
728 353
729> -- | Do nothing with exception, used with 'handle' or 'try'. 354> validateLocation :: TorrentLoc -> IO Torrent
730> isSessionException :: Monad m => SessionException -> m () 355> validateLocation = fromFile . metafilePath
731> isSessionException _ = return ()
732 356
733> -- | The same as 'isSessionException' but output to stdout the catched 357> registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO ()
734> -- exception, for debugging purposes only. 358> registerTorrent = error "registerTorrent"
735> putSessionException :: SessionException -> IO () 359> {-
736> putSessionException = print 360> Torrent {..} <- validateTorrent tl
361> atomically $ modifyTVar' torrentMap $ HM.insert tInfoHash tl
362> return (Just t)
363> -}
737 364
365> unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
366> unregisterTorrent = error "unregisterTorrent"
367> -- modifyTVar' torrentMap $ HM.delete ih
368
738> torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession 369> torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
739> torrentSwarm _ _ (Active sws) = return sws 370> torrentSwarm _ _ (Active sws) = return sws
740> torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc 371> torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc