summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-07-07 03:21:49 +0400
committerSam T <pxqr.sta@gmail.com>2013-07-07 03:21:49 +0400
commitc4b95aaa5709a326167a9301a5a5b25500e299c4 (patch)
tree33996b884741c4ed81903febf148fbff418eb7d0 /src/Network
parent0fe7e4ebd2f7ffd9f71738d6683427a110e58497 (diff)
Literate Internal module a bit
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/Internal.hs686
-rw-r--r--src/Network/BitTorrent/Internal.lhs686
2 files changed, 686 insertions, 686 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
deleted file mode 100644
index dd5f3bb7..00000000
--- a/src/Network/BitTorrent/Internal.hs
+++ /dev/null
@@ -1,686 +0,0 @@
1-- |
2-- Copyright : (c) Sam T. 2013
3-- License : MIT
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module implement opaque broadcast message passing. It
9-- provides sessions needed by Network.BitTorrent and
10-- Network.BitTorrent.Exchange and modules. To hide some internals
11-- of this module we detach it from Exchange.
12--
13-- Thread layout
14--
15-- When client session created 2 new threads appear:
16--
17-- * DHT listener - replies to DHT requests;
18--
19-- * Peer listener - accept new P2P connection initiated by other
20-- peers.
21--
22-- When swarn session created 3 new threads appear:
23--
24-- * DHT request loop asks for new peers;
25--
26-- * Tracker request loop asks for new peers;
27--
28-- * controller which fork new avaand manage running P2P sessions.
29--
30-- Peer session is one always forked thread.
31--
32-- When client\/swarm\/peer session gets closed kill the
33-- corresponding threads, but flush data to disc. (for e.g. storage
34-- block map)
35--
36-- So for e.g., in order to obtain our first block we need to run at
37-- least 7 threads: main thread, 2 client session thread, 3 swarm
38-- session threads and PeerSession thread.
39--
40--
41-- NOTE: expose only static data in data field lists, all dynamic
42-- data should be modified through standalone functions.
43--
44{-# LANGUAGE OverloadedStrings #-}
45{-# LANGUAGE RecordWildCards #-}
46{-# LANGUAGE ViewPatterns #-}
47{-# LANGUAGE TemplateHaskell #-}
48{-# LANGUAGE DeriveDataTypeable #-}
49module Network.BitTorrent.Internal
50 ( Progress(..), startProgress
51
52 -- * Client
53 , ClientSession (clientPeerId, allowedExtensions, listenerPort)
54
55 , ThreadCount
56 , defaultThreadCount
57
58 , TorrentLoc(..)
59 , registerTorrent
60 , unregisterTorrent
61
62 , newClient
63
64 , getCurrentProgress
65 , getSwarmCount
66 , getPeerCount
67
68
69 -- * Swarm
70 , SwarmSession( SwarmSession, torrentMeta, clientSession )
71
72 , SessionCount
73 , getSessionCount
74
75 , newLeecher
76 , newSeeder
77 , getClientBitfield
78
79 , enterSwarm
80 , leaveSwarm
81 , waitVacancy
82
83 , pieceLength
84
85 -- * Peer
86 , PeerSession( PeerSession, connectedPeerAddr
87 , swarmSession, enabledExtensions
88 , sessionState
89 )
90 , SessionState
91 , withPeerSession
92
93 -- ** Broadcasting
94 , available
95 , getPending
96
97 -- ** Exceptions
98 , SessionException(..)
99 , isSessionException
100 , putSessionException
101
102 -- ** Properties
103 , bitfield, status
104 , findPieceCount
105
106 -- * Timeouts
107 , updateIncoming, updateOutcoming
108 ) where
109
110import Prelude hiding (mapM_)
111
112import Control.Applicative
113import Control.Concurrent
114import Control.Concurrent.STM
115import Control.Concurrent.MSem as MSem
116import Control.Lens
117import Control.Exception
118import Control.Monad.Trans
119
120import Data.IORef
121import Data.Default
122import Data.Function
123import Data.Foldable (mapM_)
124import Data.HashMap.Strict as HM
125import Data.Ord
126import Data.Set as S
127import Data.Typeable
128
129import Data.Serialize hiding (get)
130import Text.PrettyPrint
131
132import Network
133import Network.Socket
134import Network.Socket.ByteString
135
136import GHC.Event as Ev
137
138import Data.Bitfield as BF
139import Data.Torrent
140import Network.BitTorrent.Extension
141import Network.BitTorrent.Peer
142import Network.BitTorrent.Exchange.Protocol as BT
143import Network.BitTorrent.Tracker.Protocol as BT
144
145{-----------------------------------------------------------------------
146 Progress
147-----------------------------------------------------------------------}
148
149-- | 'Progress' contains upload/download/left stats about
150-- current client state and used to notify the tracker.
151--
152-- This data is considered as dynamic within one client
153-- session. This data also should be shared across client
154-- application sessions (e.g. files), otherwise use 'startProgress'
155-- to get initial 'Progress'.
156--
157data Progress = Progress {
158 _uploaded :: !Integer -- ^ Total amount of bytes uploaded.
159 , _downloaded :: !Integer -- ^ Total amount of bytes downloaded.
160 , _left :: !Integer -- ^ Total amount of bytes left.
161 } deriving (Show, Read, Eq)
162
163-- TODO use atomic bits and Word64
164
165$(makeLenses ''Progress)
166
167-- | Initial progress is used when there are no session before.
168--
169-- Please note that tracker might penalize client some way if the do
170-- not accumulate progress. If possible and save 'Progress' between
171-- client sessions to avoid that.
172--
173startProgress :: Integer -> Progress
174startProgress = Progress 0 0
175
176-- | Used when the client download some data from /any/ peer.
177downloadedProgress :: Int -> Progress -> Progress
178downloadedProgress (fromIntegral -> amount)
179 = (left -~ amount)
180 . (downloaded +~ amount)
181{-# INLINE downloadedProgress #-}
182
183-- | Used when the client upload some data to /any/ peer.
184uploadedProgress :: Int -> Progress -> Progress
185uploadedProgress (fromIntegral -> amount) = uploaded +~ amount
186{-# INLINE uploadedProgress #-}
187
188-- | Used when leecher join client session.
189enqueuedProgress :: Integer -> Progress -> Progress
190enqueuedProgress amount = left +~ amount
191{-# INLINE enqueuedProgress #-}
192
193-- | Used when leecher leave client session.
194-- (e.g. user deletes not completed torrent)
195dequeuedProgress :: Integer -> Progress -> Progress
196dequeuedProgress amount = left -~ amount
197{-# INLINE dequeuedProgress #-}
198
199{-----------------------------------------------------------------------
200 Client session
201-----------------------------------------------------------------------}
202
203{- NOTE: If we will not restrict number of threads we could end up
204with thousands of connected swarm and make no particular progress.
205
206Note also we do not bound number of swarms! This is not optimal
207strategy because each swarm might have say 1 thread and we could end
208up bounded by the meaningless limit. Bounding global number of p2p
209sessions should work better, and simpler.-}
210
211-- | Each client might have a limited number of threads.
212type ThreadCount = Int
213
214-- | The number of threads suitable for a typical BT client.
215defaultThreadCount :: ThreadCount
216defaultThreadCount = 1000
217
218{- PERFORMANCE NOTE: keeping torrent metafiles in memory is a _bad_
219idea: for 1TB of data we need at least 100MB of metadata. (using 256KB
220piece size). This solution do not scale further. Solution with
221TorrentLoc is much better and takes much more less space, moreover it
222depends on count of torrents but not on count of data itself. To scale
223further, in future we might add something like database (for
224e.g. sqlite) for this kind of things.-}
225
226-- | Identifies location of
227data TorrentLoc = TorrentLoc {
228 metafilePath :: FilePath
229 , dataPath :: FilePath
230 }
231
232validateTorrent :: TorrentLoc -> IO ()
233validateTorrent = error "validateTorrent: not implemented"
234
235-- | TorrentMap is used to keep track all known torrents for the
236-- client. When some peer trying to connect to us it's necessary to
237-- dispatch appropriate 'SwarmSession' (or start new one if there are
238-- none) in the listener loop: we only know 'InfoHash' from
239-- 'Handshake' but nothing more. So to accept new 'PeerSession' we
240-- need to lookup torrent metainfo and content files (if there are
241-- some) by the 'InfoHash' and only after that enter exchange loop.
242--
243type TorrentMap = HashMap InfoHash TorrentLoc
244
245{- NOTE: basically, client session should contain options which user
246app store in configuration files. (related to the protocol) Moreover
247it should contain the all client identification info. (e.g. DHT) -}
248
249-- | Client session is the basic unit of bittorrent network, it has:
250--
251-- * The /peer ID/ used as unique identifier of the client in
252-- network. Obviously, this value is not changed during client
253-- session.
254--
255-- * The number of /protocol extensions/ it might use. This value
256-- is static as well, but if you want to dynamically reconfigure
257-- the client you might kill the end the current session and
258-- create a new with the fresh required extensions.
259--
260-- * The number of /swarms/ to join, each swarm described by the
261-- 'SwarmSession'.
262--
263-- Normally, you would have one client session, however, if we need,
264-- in one application we could have many clients with different peer
265-- ID's and different enabled extensions at the same time.
266--
267data ClientSession = ClientSession {
268 -- | Used in handshakes and discovery mechanism.
269 clientPeerId :: !PeerId
270
271 -- | Extensions we should try to use. Hovewer some particular peer
272 -- might not support some extension, so we keep enabledExtension in
273 -- 'PeerSession'.
274 , allowedExtensions :: [Extension]
275
276 -- | Port where client listen for other peers
277 , listenerPort :: PortNumber
278 -- TODO restart listener if it fail
279
280 -- | Semaphor used to bound number of active P2P sessions.
281 , activeThreads :: !(MSem ThreadCount)
282
283 -- | Max number of active connections.
284 , maxActive :: !ThreadCount
285
286 -- | Used to traverse the swarm session.
287 , swarmSessions :: !(TVar (Set SwarmSession))
288
289 , eventManager :: !EventManager
290
291 -- | Used to keep track global client progress.
292 , currentProgress :: !(TVar Progress)
293
294 -- | Used to keep track available torrents.
295 , torrentMap :: !(TVar TorrentMap)
296 }
297
298-- currentProgress field is reduntant: progress depends on the all swarm bitfields
299-- maybe we can remove the 'currentProgress' and compute it on demand?
300
301
302instance Eq ClientSession where
303 (==) = (==) `on` clientPeerId
304
305instance Ord ClientSession where
306 compare = comparing clientPeerId
307
308-- | Get current global progress of the client. This value is usually
309-- shown to a user.
310getCurrentProgress :: MonadIO m => ClientSession -> m Progress
311getCurrentProgress = liftIO . readTVarIO . currentProgress
312
313-- | Get number of swarms client aware of.
314getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
315getSwarmCount ClientSession {..} = liftIO $
316 S.size <$> readTVarIO swarmSessions
317
318-- | Get number of peers the client currently connected to.
319getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
320getPeerCount ClientSession {..} = liftIO $ do
321 unused <- peekAvail activeThreads
322 return (maxActive - unused)
323
324-- | Create a new client session. The data passed to this function are
325-- usually loaded from configuration file.
326newClient :: SessionCount -- ^ Maximum count of active P2P Sessions.
327 -> [Extension] -- ^ Extensions allowed to use.
328 -> IO ClientSession -- ^ Client with unique peer ID.
329
330newClient n exts = do
331 mgr <- Ev.new
332 -- TODO kill this thread when leave client
333 _ <- forkIO $ loop mgr
334
335 ClientSession
336 <$> newPeerId
337 <*> pure exts
338 <*> forkListener (error "listener")
339 <*> MSem.new n
340 <*> pure n
341 <*> newTVarIO S.empty
342 <*> pure mgr
343 <*> newTVarIO (startProgress 0)
344 <*> newTVarIO HM.empty
345
346registerTorrent :: ClientSession -> InfoHash -> TorrentLoc -> STM ()
347registerTorrent ClientSession {..} ih tl = do
348 modifyTVar' torrentMap $ HM.insert ih tl
349
350unregisterTorrent :: ClientSession -> InfoHash -> STM ()
351unregisterTorrent ClientSession {..} ih = do
352 modifyTVar' torrentMap $ HM.delete ih
353
354{-----------------------------------------------------------------------
355 Swarm session
356-----------------------------------------------------------------------}
357
358{- NOTE: If client is a leecher then there is NO particular reason to
359set max sessions count more than the_number_of_unchoke_slots * k:
360
361 * thread slot(activeThread semaphore)
362 * will take but no
363
364So if client is a leecher then max sessions count depends on the
365number of unchoke slots.
366
367However if client is a seeder then the value depends on .
368-}
369
370-- | Used to bound the number of simultaneous connections and, which
371-- is the same, P2P sessions within the swarm session.
372type SessionCount = Int
373
374defSeederConns :: SessionCount
375defSeederConns = defaultUnchokeSlots
376
377defLeacherConns :: SessionCount
378defLeacherConns = defaultNumWant
379
380-- | Swarm session is
381data SwarmSession = SwarmSession {
382 torrentMeta :: !Torrent
383
384 -- |
385 , clientSession :: !ClientSession
386
387 -- | Represent count of peers we _currently_ can connect to in the
388 -- swarm. Used to bound number of concurrent threads.
389 , vacantPeers :: !(MSem SessionCount)
390
391 -- | Modify this carefully updating global progress.
392 , clientBitfield :: !(TVar Bitfield)
393
394 , connectedPeers :: !(TVar (Set PeerSession))
395
396 -- TODO use bounded broadcast chan with priority queue and drop old entries
397 -- | Channel used for replicate messages across all peers in
398 -- swarm. For exsample if we get some piece we should sent to all
399 -- connected (and interested in) peers HAVE message.
400 --
401 , broadcastMessages :: !(TChan Message)
402 }
403
404-- INVARIANT:
405-- max_sessions_count - sizeof connectedPeers = value vacantPeers
406
407instance Eq SwarmSession where
408 (==) = (==) `on` (tInfoHash . torrentMeta)
409
410instance Ord SwarmSession where
411 compare = comparing (tInfoHash . torrentMeta)
412
413newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
414 -> IO SwarmSession
415newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
416 = SwarmSession <$> pure t
417 <*> pure cs
418 <*> MSem.new n
419 <*> newTVarIO bf
420 <*> newTVarIO S.empty
421 <*> newBroadcastTChanIO
422
423-- | New swarm session in which the client allowed to upload only.
424newSeeder :: ClientSession -> Torrent -> IO SwarmSession
425newSeeder cs t @ Torrent {..}
426 = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
427
428-- | New swarm in which the client allowed both download and upload.
429newLeecher :: ClientSession -> Torrent -> IO SwarmSession
430newLeecher cs t @ Torrent {..} = do
431 se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
432 atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
433 return se
434
435--isLeacher :: SwarmSession -> IO Bool
436--isLeacher = undefined
437
438-- | Get the number of connected peers in the given swarm.
439getSessionCount :: SwarmSession -> IO SessionCount
440getSessionCount SwarmSession {..} = do
441 S.size <$> readTVarIO connectedPeers
442
443getClientBitfield :: SwarmSession -> IO Bitfield
444getClientBitfield = readTVarIO . clientBitfield
445
446{-
447haveDone :: MonadIO m => PieceIx -> SwarmSession -> m ()
448haveDone ix =
449 liftIO $ atomically $ do
450 bf <- readTVar clientBitfield
451 writeTVar (have ix bf)
452 currentProgress
453-}
454
455-- acquire/release mechanism: for internal use only
456
457enterSwarm :: SwarmSession -> IO ()
458enterSwarm SwarmSession {..} = do
459 MSem.wait (activeThreads clientSession)
460 MSem.wait vacantPeers
461
462leaveSwarm :: SwarmSession -> IO ()
463leaveSwarm SwarmSession {..} = do
464 MSem.signal vacantPeers
465 MSem.signal (activeThreads clientSession)
466
467waitVacancy :: SwarmSession -> IO () -> IO ()
468waitVacancy se =
469 bracket (enterSwarm se) (const (leaveSwarm se))
470 . const
471
472pieceLength :: SwarmSession -> Int
473pieceLength = ciPieceLength . tInfo . torrentMeta
474{-# INLINE pieceLength #-}
475
476{-----------------------------------------------------------------------
477 Peer session
478-----------------------------------------------------------------------}
479
480-- | Peer session contain all data necessary for peer to peer
481-- communication.
482data PeerSession = PeerSession {
483 -- | Used as unique 'PeerSession' identifier within one
484 -- 'SwarmSession'.
485 connectedPeerAddr :: !PeerAddr
486
487 -- | The swarm to which both end points belong to.
488 , swarmSession :: !SwarmSession
489
490 -- | Extensions such that both peer and client support.
491 , enabledExtensions :: [Extension]
492
493 -- | To dissconnect from died peers appropriately we should check
494 -- if a peer do not sent the KA message within given interval. If
495 -- yes, we should throw an exception in 'TimeoutCallback' and
496 -- close session between peers.
497 --
498 -- We should update timeout if we /receive/ any message within
499 -- timeout interval to keep connection up.
500 , incomingTimeout :: !TimeoutKey
501
502 -- | To send KA message appropriately we should know when was last
503 -- time we sent a message to a peer. To do that we keep registered
504 -- timeout in event manager and if we do not sent any message to
505 -- the peer within given interval then we send KA message in
506 -- 'TimeoutCallback'.
507 --
508 -- We should update timeout if we /send/ any message within timeout
509 -- to avoid reduntant KA messages.
510 --
511 , outcomingTimeout :: !TimeoutKey
512
513 -- TODO use dupChan for broadcasting
514 -- | Broadcast messages waiting to be sent to peer.
515 , pendingMessages :: !(TChan Message)
516
517 -- | Dymanic P2P data.
518 , sessionState :: !(IORef SessionState)
519 }
520
521-- TODO unpack some fields
522
523data SessionState = SessionState {
524 _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
525 , _status :: !SessionStatus -- ^ Status of both peers.
526 } deriving (Show, Eq)
527
528$(makeLenses ''SessionState)
529
530instance Eq PeerSession where
531 (==) = (==) `on` connectedPeerAddr
532
533instance Ord PeerSession where
534 compare = comparing connectedPeerAddr
535
536-- | Exceptions used to interrupt the current P2P session. This
537-- exceptions will NOT affect other P2P sessions, DHT, peer <->
538-- tracker, or any other session.
539--
540data SessionException = PeerDisconnected
541 | ProtocolError Doc
542 deriving (Show, Typeable)
543
544instance Exception SessionException
545
546
547-- | Do nothing with exception, used with 'handle' or 'try'.
548isSessionException :: Monad m => SessionException -> m ()
549isSessionException _ = return ()
550
551-- | The same as 'isSessionException' but output to stdout the catched
552-- exception, for debugging purposes only.
553putSessionException :: SessionException -> IO ()
554putSessionException = print
555
556-- TODO modify such that we can use this in listener loop
557-- TODO check if it connected yet peer
558withPeerSession :: SwarmSession -> PeerAddr
559 -> ((Socket, PeerSession) -> IO ())
560 -> IO ()
561
562withPeerSession ss @ SwarmSession {..} addr
563 = handle isSessionException . bracket openSession closeSession
564 where
565 openSession = do
566 let caps = encodeExts $ allowedExtensions $ clientSession
567 let ihash = tInfoHash torrentMeta
568 let pid = clientPeerId $ clientSession
569 let chs = Handshake defaultBTProtocol caps ihash pid
570
571 sock <- connectToPeer addr
572 phs <- handshake sock chs `onException` close sock
573
574 cbf <- readTVarIO clientBitfield
575 sendAll sock (encode (Bitfield cbf))
576
577 let enabled = decodeExts (enabledCaps caps (handshakeCaps phs))
578 ps <- PeerSession addr ss enabled
579 <$> registerTimeout (eventManager clientSession)
580 maxIncomingTime (return ())
581 <*> registerTimeout (eventManager clientSession)
582 maxOutcomingTime (sendKA sock)
583 <*> atomically (dupTChan broadcastMessages)
584 <*> do {
585 ; tc <- totalCount <$> readTVarIO clientBitfield
586 ; newIORef (SessionState (haveNone tc) def)
587 }
588
589 atomically $ modifyTVar' connectedPeers (S.insert ps)
590
591 return (sock, ps)
592
593 closeSession (sock, ps) = do
594 atomically $ modifyTVar' connectedPeers (S.delete ps)
595 close sock
596
597findPieceCount :: PeerSession -> PieceCount
598findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
599
600{-----------------------------------------------------------------------
601 Broadcasting: Have, Cancel, Bitfield, SuggestPiece
602-----------------------------------------------------------------------}
603
604-- here we should enqueue broadcast messages and keep in mind that:
605--
606-- * We should enqueue broadcast events as they are appear.
607-- * We should yield broadcast messages as fast as we get them.
608--
609-- these 2 phases might differ in time significantly
610
611-- TODO do this; but only when it'll be clean which other broadcast
612-- messages & events we should send
613
614-- 1. Update client have bitfield --\____ in one transaction;
615-- 2. Update downloaded stats --/
616-- 3. Signal to the all other peer about this.
617
618available :: Bitfield -> SwarmSession -> IO ()
619available bf se @ SwarmSession {..} = {-# SCC available #-} do
620 mark >> atomically broadcast
621 where
622 mark = do
623 let bytes = pieceLength se * BF.haveCount bf
624 atomically $ do
625 modifyTVar' clientBitfield (BF.union bf)
626 modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
627
628 broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
629
630
631-- TODO compute size of messages: if it's faster to send Bitfield
632-- instead many Have do that
633--
634-- also if there is single Have message in queue then the
635-- corresponding piece is likely still in memory or disc cache,
636-- when we can send SuggestPiece
637
638-- | Get pending messages queue appeared in result of asynchronously
639-- changed client state. Resulting queue should be sent to a peer
640-- immediately.
641getPending :: PeerSession -> IO [Message]
642getPending PeerSession {..} = {-# SCC getPending #-} do
643 atomically (readAvail pendingMessages)
644
645readAvail :: TChan a -> STM [a]
646readAvail chan = do
647 m <- tryReadTChan chan
648 case m of
649 Just a -> (:) <$> pure a <*> readAvail chan
650 Nothing -> return []
651
652{-----------------------------------------------------------------------
653 Timeouts
654-----------------------------------------------------------------------}
655
656-- for internal use only
657
658sec :: Int
659sec = 1000 * 1000
660
661maxIncomingTime :: Int
662maxIncomingTime = 120 * sec
663
664maxOutcomingTime :: Int
665maxOutcomingTime = 1 * sec
666
667-- | Should be called after we have received any message from a peer.
668updateIncoming :: PeerSession -> IO ()
669updateIncoming PeerSession {..} = do
670 updateTimeout (eventManager (clientSession swarmSession))
671 incomingTimeout maxIncomingTime
672
673-- | Should be called before we have send any message to a peer.
674updateOutcoming :: PeerSession -> IO ()
675updateOutcoming PeerSession {..} =
676 updateTimeout (eventManager (clientSession swarmSession))
677 outcomingTimeout maxOutcomingTime
678
679sendKA :: Socket -> IO ()
680sendKA sock {- SwarmSession {..} -} = do
681 return ()
682-- print "I'm sending keep alive."
683-- sendAll sock (encode BT.KeepAlive)
684-- let mgr = eventManager clientSession
685-- updateTimeout mgr
686-- print "Done.."
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
new file mode 100644
index 00000000..3f8f0371
--- /dev/null
+++ b/src/Network/BitTorrent/Internal.lhs
@@ -0,0 +1,686 @@
1> -- |
2> -- Copyright : (c) Sam T. 2013
3> -- License : MIT
4> -- Maintainer : pxqr.sta@gmail.com
5> -- Stability : experimental
6> -- Portability : portable
7> --
8> -- This module implement opaque broadcast message passing. It
9> -- provides sessions needed by Network.BitTorrent and
10> -- Network.BitTorrent.Exchange and modules. To hide some internals
11> -- of this module we detach it from Exchange.
12> --
13> --
14> -- NOTE: expose only static data in data field lists, all dynamic
15> -- data should be modified through standalone functions.
16> --
17> {-# LANGUAGE OverloadedStrings #-}
18> {-# LANGUAGE RecordWildCards #-}
19> {-# LANGUAGE ViewPatterns #-}
20> {-# LANGUAGE TemplateHaskell #-}
21> {-# LANGUAGE DeriveDataTypeable #-}
22> module Network.BitTorrent.Internal
23> ( Progress(..), startProgress
24> -- * Client
25> , ClientSession (clientPeerId, allowedExtensions, listenerPort)
26>
27> , ThreadCount
28> , defaultThreadCount
29>
30> , TorrentLoc(..)
31> , registerTorrent
32> , unregisterTorrent
33> , newClient
34
35> , getCurrentProgress
36> , getSwarmCount
37> , getPeerCount
38
39
40
41> -- * Swarm
42> , SwarmSession( SwarmSession, torrentMeta, clientSession )
43
44> , SessionCount
45> , getSessionCount
46
47> , newLeecher
48> , newSeeder
49> , getClientBitfield
50
51> , enterSwarm
52> , leaveSwarm
53> , waitVacancy
54
55> , pieceLength
56
57> -- * Peer
58> , PeerSession( PeerSession, connectedPeerAddr
59> , swarmSession, enabledExtensions
60> , sessionState
61> )
62> , SessionState
63> , withPeerSession
64
65> -- ** Broadcasting
66> , available
67> , getPending
68
69> -- ** Exceptions
70> , SessionException(..)
71> , isSessionException
72> , putSessionException
73
74> -- ** Properties
75> , bitfield, status
76> , findPieceCount
77
78> -- * Timeouts
79> , updateIncoming, updateOutcoming
80> ) where
81
82> import Prelude hiding (mapM_)
83
84> import Control.Applicative
85> import Control.Concurrent
86> import Control.Concurrent.STM
87> import Control.Concurrent.MSem as MSem
88> import Control.Lens
89> import Control.Exception
90> import Control.Monad.Trans
91
92> import Data.IORef
93> import Data.Default
94> import Data.Function
95> import Data.Foldable (mapM_)
96> import Data.HashMap.Strict as HM
97> import Data.Ord
98> import Data.Set as S
99> import Data.Typeable
100
101> import Data.Serialize hiding (get)
102> import Text.PrettyPrint
103
104> import Network
105> import Network.Socket
106> import Network.Socket.ByteString
107
108> import GHC.Event as Ev
109
110> import Data.Bitfield as BF
111> import Data.Torrent
112> import Network.BitTorrent.Extension
113> import Network.BitTorrent.Peer
114> import Network.BitTorrent.Exchange.Protocol as BT
115> import Network.BitTorrent.Tracker.Protocol as BT
116
117> {-----------------------------------------------------------------------
118> Progress
119> -----------------------------------------------------------------------}
120
121> -- | 'Progress' contains upload/download/left stats about
122> -- current client state and used to notify the tracker.
123> --
124> -- This data is considered as dynamic within one client
125> -- session. This data also should be shared across client
126> -- application sessions (e.g. files), otherwise use 'startProgress'
127> -- to get initial 'Progress'.
128> --
129> data Progress = Progress {
130> _uploaded :: !Integer -- ^ Total amount of bytes uploaded.
131> , _downloaded :: !Integer -- ^ Total amount of bytes downloaded.
132> , _left :: !Integer -- ^ Total amount of bytes left.
133> } deriving (Show, Read, Eq)
134
135> -- TODO use atomic bits and Word64
136
137> $(makeLenses ''Progress)
138
139> -- | Initial progress is used when there are no session before.
140> --
141> -- Please note that tracker might penalize client some way if the do
142> -- not accumulate progress. If possible and save 'Progress' between
143> -- client sessions to avoid that.
144> --
145> startProgress :: Integer -> Progress
146> startProgress = Progress 0 0
147
148> -- | Used when the client download some data from /any/ peer.
149> downloadedProgress :: Int -> Progress -> Progress
150> downloadedProgress (fromIntegral -> amount)
151> = (left -~ amount)
152> . (downloaded +~ amount)
153> {-# INLINE downloadedProgress #-}
154
155> -- | Used when the client upload some data to /any/ peer.
156> uploadedProgress :: Int -> Progress -> Progress
157> uploadedProgress (fromIntegral -> amount) = uploaded +~ amount
158> {-# INLINE uploadedProgress #-}
159
160> -- | Used when leecher join client session.
161> enqueuedProgress :: Integer -> Progress -> Progress
162> enqueuedProgress amount = left +~ amount
163> {-# INLINE enqueuedProgress #-}
164
165> -- | Used when leecher leave client session.
166> -- (e.g. user deletes not completed torrent)
167> dequeuedProgress :: Integer -> Progress -> Progress
168> dequeuedProgress amount = left -~ amount
169> {-# INLINE dequeuedProgress #-}
170
171
172Thread layout
173-------------
174
175When client session created 2 new threads appear:
176
177 * DHT listener - replies to DHT requests;
178
179 * Peer listener - accept new P2P connection initiated by other
180peers.
181
182When swarn session created 3 new threads appear:
183
184 * DHT request loop asks for new peers;
185
186 * Tracker request loop asks for new peers;
187
188 * controller which fork new avaand manage running P2P sessions.
189
190Peer session is one always forked thread.
191
192When client\/swarm\/peer session gets closed kill the corresponding
193threads, but flush data to disc. (for e.g. storage block map)
194
195So for e.g., in order to obtain our first block we need to run at
196least 7 threads: main thread, 2 client session thread, 3 swarm session
197threads and PeerSession thread.
198
199> {-----------------------------------------------------------------------
200> Client session
201> -----------------------------------------------------------------------}
202
203> {- NOTE: If we will not restrict number of threads we could end up
204> with thousands of connected swarm and make no particular progress.
205>
206> Note also we do not bound number of swarms! This is not optimal
207> strategy because each swarm might have say 1 thread and we could end
208> up bounded by the meaningless limit. Bounding global number of p2p
209> sessions should work better, and simpler.-}
210
211> -- | Each client might have a limited number of threads.
212> type ThreadCount = Int
213
214> -- | The number of threads suitable for a typical BT client.
215> defaultThreadCount :: ThreadCount
216> defaultThreadCount = 1000
217
218> {- PERFORMANCE NOTE: keeping torrent metafiles in memory is a _bad_
219> idea: for 1TB of data we need at least 100MB of metadata. (using 256KB
220> piece size). This solution do not scale further. Solution with
221> TorrentLoc is much better and takes much more less space, moreover it
222> depends on count of torrents but not on count of data itself. To scale
223> further, in future we might add something like database (for
224> e.g. sqlite) for this kind of things.-}
225
226> -- | Identifies location of
227> data TorrentLoc = TorrentLoc {
228> metafilePath :: FilePath
229> , dataPath :: FilePath
230> }
231
232> validateTorrent :: TorrentLoc -> IO ()
233> validateTorrent = error "validateTorrent: not implemented"
234
235> -- | TorrentMap is used to keep track all known torrents for the
236> -- client. When some peer trying to connect to us it's necessary to
237> -- dispatch appropriate 'SwarmSession' (or start new one if there are
238> -- none) in the listener loop: we only know 'InfoHash' from
239> -- 'Handshake' but nothing more. So to accept new 'PeerSession' we
240> -- need to lookup torrent metainfo and content files (if there are
241> -- some) by the 'InfoHash' and only after that enter exchange loop.
242> --
243> type TorrentMap = HashMap InfoHash TorrentLoc
244
245> {- NOTE: basically, client session should contain options which user
246> app store in configuration files. (related to the protocol) Moreover
247> it should contain the all client identification info. (e.g. DHT) -}
248
249> -- | Client session is the basic unit of bittorrent network, it has:
250> --
251> -- * The /peer ID/ used as unique identifier of the client in
252> -- network. Obviously, this value is not changed during client
253> -- session.
254> --
255> -- * The number of /protocol extensions/ it might use. This value
256> -- is static as well, but if you want to dynamically reconfigure
257> -- the client you might kill the end the current session and
258> -- create a new with the fresh required extensions.
259> --
260> -- * The number of /swarms/ to join, each swarm described by the
261> -- 'SwarmSession'.
262> --
263> -- Normally, you would have one client session, however, if we need,
264> -- in one application we could have many clients with different peer
265> -- ID's and different enabled extensions at the same time.
266> --
267> data ClientSession = ClientSession {
268> -- | Used in handshakes and discovery mechanism.
269> clientPeerId :: !PeerId
270
271> -- | Extensions we should try to use. Hovewer some particular peer
272> -- might not support some extension, so we keep enabledExtension in
273> -- 'PeerSession'.
274> , allowedExtensions :: [Extension]
275
276> -- | Port where client listen for other peers
277> , listenerPort :: PortNumber
278> -- TODO restart listener if it fail
279
280> -- | Semaphor used to bound number of active P2P sessions.
281> , activeThreads :: !(MSem ThreadCount)
282
283> -- | Max number of active connections.
284> , maxActive :: !ThreadCount
285
286> -- | Used to traverse the swarm session.
287> , swarmSessions :: !(TVar (Set SwarmSession))
288
289> , eventManager :: !EventManager
290
291> -- | Used to keep track global client progress.
292> , currentProgress :: !(TVar Progress)
293
294> -- | Used to keep track available torrents.
295> , torrentMap :: !(TVar TorrentMap)
296> }
297
298> -- currentProgress field is reduntant: progress depends on the all swarm bitfields
299> -- maybe we can remove the 'currentProgress' and compute it on demand?
300
301
302> instance Eq ClientSession where
303> (==) = (==) `on` clientPeerId
304
305> instance Ord ClientSession where
306> compare = comparing clientPeerId
307
308> -- | Get current global progress of the client. This value is usually
309> -- shown to a user.
310> getCurrentProgress :: MonadIO m => ClientSession -> m Progress
311> getCurrentProgress = liftIO . readTVarIO . currentProgress
312
313> -- | Get number of swarms client aware of.
314> getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
315> getSwarmCount ClientSession {..} = liftIO $
316> S.size <$> readTVarIO swarmSessions
317
318> -- | Get number of peers the client currently connected to.
319> getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
320> getPeerCount ClientSession {..} = liftIO $ do
321> unused <- peekAvail activeThreads
322> return (maxActive - unused)
323
324> -- | Create a new client session. The data passed to this function are
325> -- usually loaded from configuration file.
326> newClient :: SessionCount -- ^ Maximum count of active P2P Sessions.
327> -> [Extension] -- ^ Extensions allowed to use.
328> -> IO ClientSession -- ^ Client with unique peer ID.
329
330> newClient n exts = do
331> mgr <- Ev.new
332> -- TODO kill this thread when leave client
333> _ <- forkIO $ loop mgr
334
335> ClientSession
336> <$> newPeerId
337> <*> pure exts
338> <*> forkListener (error "listener")
339> <*> MSem.new n
340> <*> pure n
341> <*> newTVarIO S.empty
342> <*> pure mgr
343> <*> newTVarIO (startProgress 0)
344> <*> newTVarIO HM.empty
345
346> registerTorrent :: ClientSession -> InfoHash -> TorrentLoc -> STM ()
347> registerTorrent ClientSession {..} ih tl = do
348> modifyTVar' torrentMap $ HM.insert ih tl
349
350> unregisterTorrent :: ClientSession -> InfoHash -> STM ()
351> unregisterTorrent ClientSession {..} ih = do
352> modifyTVar' torrentMap $ HM.delete ih
353
354> {-----------------------------------------------------------------------
355> Swarm session
356> -----------------------------------------------------------------------}
357
358> {- NOTE: If client is a leecher then there is NO particular reason to
359> set max sessions count more than the_number_of_unchoke_slots * k:
360
361> * thread slot(activeThread semaphore)
362> * will take but no
363
364> So if client is a leecher then max sessions count depends on the
365> number of unchoke slots.
366
367> However if client is a seeder then the value depends on .
368> -}
369
370> -- | Used to bound the number of simultaneous connections and, which
371> -- is the same, P2P sessions within the swarm session.
372> type SessionCount = Int
373
374> defSeederConns :: SessionCount
375> defSeederConns = defaultUnchokeSlots
376
377> defLeacherConns :: SessionCount
378> defLeacherConns = defaultNumWant
379
380> -- | Swarm session is
381> data SwarmSession = SwarmSession {
382> torrentMeta :: !Torrent
383
384> -- |
385> , clientSession :: !ClientSession
386
387> -- | Represent count of peers we _currently_ can connect to in the
388> -- swarm. Used to bound number of concurrent threads.
389> , vacantPeers :: !(MSem SessionCount)
390
391> -- | Modify this carefully updating global progress.
392> , clientBitfield :: !(TVar Bitfield)
393
394> , connectedPeers :: !(TVar (Set PeerSession))
395
396> -- TODO use bounded broadcast chan with priority queue and drop old entries
397> -- | Channel used for replicate messages across all peers in
398> -- swarm. For exsample if we get some piece we should sent to all
399> -- connected (and interested in) peers HAVE message.
400> --
401> , broadcastMessages :: !(TChan Message)
402> }
403
404> -- INVARIANT:
405> -- max_sessions_count - sizeof connectedPeers = value vacantPeers
406
407> instance Eq SwarmSession where
408> (==) = (==) `on` (tInfoHash . torrentMeta)
409
410> instance Ord SwarmSession where
411> compare = comparing (tInfoHash . torrentMeta)
412
413> newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
414> -> IO SwarmSession
415> newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
416> = SwarmSession <$> pure t
417> <*> pure cs
418> <*> MSem.new n
419> <*> newTVarIO bf
420> <*> newTVarIO S.empty
421> <*> newBroadcastTChanIO
422
423> -- | New swarm session in which the client allowed to upload only.
424> newSeeder :: ClientSession -> Torrent -> IO SwarmSession
425> newSeeder cs t @ Torrent {..}
426> = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
427
428> -- | New swarm in which the client allowed both download and upload.
429> newLeecher :: ClientSession -> Torrent -> IO SwarmSession
430> newLeecher cs t @ Torrent {..} = do
431> se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
432> atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
433> return se
434
435> --isLeacher :: SwarmSession -> IO Bool
436> --isLeacher = undefined
437
438> -- | Get the number of connected peers in the given swarm.
439> getSessionCount :: SwarmSession -> IO SessionCount
440> getSessionCount SwarmSession {..} = do
441> S.size <$> readTVarIO connectedPeers
442
443> getClientBitfield :: SwarmSession -> IO Bitfield
444> getClientBitfield = readTVarIO . clientBitfield
445
446> {-
447> haveDone :: MonadIO m => PieceIx -> SwarmSession -> m ()
448> haveDone ix =
449> liftIO $ atomically $ do
450> bf <- readTVar clientBitfield
451> writeTVar (have ix bf)
452> currentProgress
453> -}
454
455> -- acquire/release mechanism: for internal use only
456
457> enterSwarm :: SwarmSession -> IO ()
458> enterSwarm SwarmSession {..} = do
459> MSem.wait (activeThreads clientSession)
460> MSem.wait vacantPeers
461
462> leaveSwarm :: SwarmSession -> IO ()
463> leaveSwarm SwarmSession {..} = do
464> MSem.signal vacantPeers
465> MSem.signal (activeThreads clientSession)
466
467> waitVacancy :: SwarmSession -> IO () -> IO ()
468> waitVacancy se =
469> bracket (enterSwarm se) (const (leaveSwarm se))
470> . const
471
472> pieceLength :: SwarmSession -> Int
473> pieceLength = ciPieceLength . tInfo . torrentMeta
474> {-# INLINE pieceLength #-}
475
476> {-----------------------------------------------------------------------
477> Peer session
478> -----------------------------------------------------------------------}
479
480> -- | Peer session contain all data necessary for peer to peer
481> -- communication.
482> data PeerSession = PeerSession {
483> -- | Used as unique 'PeerSession' identifier within one
484> -- 'SwarmSession'.
485> connectedPeerAddr :: !PeerAddr
486
487> -- | The swarm to which both end points belong to.
488> , swarmSession :: !SwarmSession
489
490> -- | Extensions such that both peer and client support.
491> , enabledExtensions :: [Extension]
492
493> -- | To dissconnect from died peers appropriately we should check
494> -- if a peer do not sent the KA message within given interval. If
495> -- yes, we should throw an exception in 'TimeoutCallback' and
496> -- close session between peers.
497> --
498> -- We should update timeout if we /receive/ any message within
499> -- timeout interval to keep connection up.
500> , incomingTimeout :: !TimeoutKey
501
502> -- | To send KA message appropriately we should know when was last
503> -- time we sent a message to a peer. To do that we keep registered
504> -- timeout in event manager and if we do not sent any message to
505> -- the peer within given interval then we send KA message in
506> -- 'TimeoutCallback'.
507> --
508> -- We should update timeout if we /send/ any message within timeout
509> -- to avoid reduntant KA messages.
510> --
511> , outcomingTimeout :: !TimeoutKey
512
513> -- TODO use dupChan for broadcasting
514> -- | Broadcast messages waiting to be sent to peer.
515> , pendingMessages :: !(TChan Message)
516
517> -- | Dymanic P2P data.
518> , sessionState :: !(IORef SessionState)
519> }
520
521> -- TODO unpack some fields
522
523> data SessionState = SessionState {
524> _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
525> , _status :: !SessionStatus -- ^ Status of both peers.
526> } deriving (Show, Eq)
527
528> $(makeLenses ''SessionState)
529
530> instance Eq PeerSession where
531> (==) = (==) `on` connectedPeerAddr
532
533> instance Ord PeerSession where
534> compare = comparing connectedPeerAddr
535
536> -- | Exceptions used to interrupt the current P2P session. This
537> -- exceptions will NOT affect other P2P sessions, DHT, peer <->
538> -- tracker, or any other session.
539> --
540> data SessionException = PeerDisconnected
541> | ProtocolError Doc
542> deriving (Show, Typeable)
543
544> instance Exception SessionException
545
546
547> -- | Do nothing with exception, used with 'handle' or 'try'.
548> isSessionException :: Monad m => SessionException -> m ()
549> isSessionException _ = return ()
550
551> -- | The same as 'isSessionException' but output to stdout the catched
552> -- exception, for debugging purposes only.
553> putSessionException :: SessionException -> IO ()
554> putSessionException = print
555
556> -- TODO modify such that we can use this in listener loop
557> -- TODO check if it connected yet peer
558> withPeerSession :: SwarmSession -> PeerAddr
559> -> ((Socket, PeerSession) -> IO ())
560> -> IO ()
561
562> withPeerSession ss @ SwarmSession {..} addr
563> = handle isSessionException . bracket openSession closeSession
564> where
565> openSession = do
566> let caps = encodeExts $ allowedExtensions $ clientSession
567> let ihash = tInfoHash torrentMeta
568> let pid = clientPeerId $ clientSession
569> let chs = Handshake defaultBTProtocol caps ihash pid
570
571> sock <- connectToPeer addr
572> phs <- handshake sock chs `onException` close sock
573
574> cbf <- readTVarIO clientBitfield
575> sendAll sock (encode (Bitfield cbf))
576
577> let enabled = decodeExts (enabledCaps caps (handshakeCaps phs))
578> ps <- PeerSession addr ss enabled
579> <$> registerTimeout (eventManager clientSession)
580> maxIncomingTime (return ())
581> <*> registerTimeout (eventManager clientSession)
582> maxOutcomingTime (sendKA sock)
583> <*> atomically (dupTChan broadcastMessages)
584> <*> do {
585> ; tc <- totalCount <$> readTVarIO clientBitfield
586> ; newIORef (SessionState (haveNone tc) def)
587> }
588
589> atomically $ modifyTVar' connectedPeers (S.insert ps)
590
591> return (sock, ps)
592
593> closeSession (sock, ps) = do
594> atomically $ modifyTVar' connectedPeers (S.delete ps)
595> close sock
596
597> findPieceCount :: PeerSession -> PieceCount
598> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
599
600> {-----------------------------------------------------------------------
601> Broadcasting: Have, Cancel, Bitfield, SuggestPiece
602> -----------------------------------------------------------------------}
603
604> -- here we should enqueue broadcast messages and keep in mind that:
605> --
606> -- * We should enqueue broadcast events as they are appear.
607> -- * We should yield broadcast messages as fast as we get them.
608> --
609> -- these 2 phases might differ in time significantly
610
611> -- TODO do this; but only when it'll be clean which other broadcast
612> -- messages & events we should send
613
614> -- 1. Update client have bitfield --\____ in one transaction;
615> -- 2. Update downloaded stats --/
616> -- 3. Signal to the all other peer about this.
617
618> available :: Bitfield -> SwarmSession -> IO ()
619> available bf se @ SwarmSession {..} = {-# SCC available #-} do
620> mark >> atomically broadcast
621> where
622> mark = do
623> let bytes = pieceLength se * BF.haveCount bf
624> atomically $ do
625> modifyTVar' clientBitfield (BF.union bf)
626> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
627
628> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
629
630
631> -- TODO compute size of messages: if it's faster to send Bitfield
632> -- instead many Have do that
633> --
634> -- also if there is single Have message in queue then the
635> -- corresponding piece is likely still in memory or disc cache,
636> -- when we can send SuggestPiece
637
638> -- | Get pending messages queue appeared in result of asynchronously
639> -- changed client state. Resulting queue should be sent to a peer
640> -- immediately.
641> getPending :: PeerSession -> IO [Message]
642> getPending PeerSession {..} = {-# SCC getPending #-} do
643> atomically (readAvail pendingMessages)
644
645> readAvail :: TChan a -> STM [a]
646> readAvail chan = do
647> m <- tryReadTChan chan
648> case m of
649> Just a -> (:) <$> pure a <*> readAvail chan
650> Nothing -> return []
651
652> {-----------------------------------------------------------------------
653> Timeouts
654> -----------------------------------------------------------------------}
655
656> -- for internal use only
657
658> sec :: Int
659> sec = 1000 * 1000
660
661> maxIncomingTime :: Int
662> maxIncomingTime = 120 * sec
663
664> maxOutcomingTime :: Int
665> maxOutcomingTime = 1 * sec
666
667> -- | Should be called after we have received any message from a peer.
668> updateIncoming :: PeerSession -> IO ()
669> updateIncoming PeerSession {..} = do
670> updateTimeout (eventManager (clientSession swarmSession))
671> incomingTimeout maxIncomingTime
672
673> -- | Should be called before we have send any message to a peer.
674> updateOutcoming :: PeerSession -> IO ()
675> updateOutcoming PeerSession {..} =
676> updateTimeout (eventManager (clientSession swarmSession))
677> outcomingTimeout maxOutcomingTime
678
679> sendKA :: Socket -> IO ()
680> sendKA sock {- SwarmSession {..} -} = do
681> return ()
682> -- print "I'm sending keep alive."
683> -- sendAll sock (encode BT.KeepAlive)
684> -- let mgr = eventManager clientSession
685> -- updateTimeout mgr
686> -- print "Done.."