summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Internal.lhs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Internal.lhs')
-rw-r--r--src/Network/BitTorrent/Internal.lhs686
1 files changed, 686 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
new file mode 100644
index 00000000..3f8f0371
--- /dev/null
+++ b/src/Network/BitTorrent/Internal.lhs
@@ -0,0 +1,686 @@
1> -- |
2> -- Copyright : (c) Sam T. 2013
3> -- License : MIT
4> -- Maintainer : pxqr.sta@gmail.com
5> -- Stability : experimental
6> -- Portability : portable
7> --
8> -- This module implements opaque broadcast message passing. It
9> -- provides the sessions needed by the Network.BitTorrent and
10> -- Network.BitTorrent.Exchange modules. To hide some internals
11> -- of this module we detach it from Exchange.
12> --
13> --
14> -- NOTE: expose only static data in data field lists, all dynamic
15> -- data should be modified through standalone functions.
16> --
17> {-# LANGUAGE OverloadedStrings #-}
18> {-# LANGUAGE RecordWildCards #-}
19> {-# LANGUAGE ViewPatterns #-}
20> {-# LANGUAGE TemplateHaskell #-}
21> {-# LANGUAGE DeriveDataTypeable #-}
22> module Network.BitTorrent.Internal
23> ( Progress(..), startProgress
24> -- * Client
25> , ClientSession (clientPeerId, allowedExtensions, listenerPort)
26>
27> , ThreadCount
28> , defaultThreadCount
29>
30> , TorrentLoc(..)
31> , registerTorrent
32> , unregisterTorrent
33> , newClient
34
35> , getCurrentProgress
36> , getSwarmCount
37> , getPeerCount
38
39
40
41> -- * Swarm
42> , SwarmSession( SwarmSession, torrentMeta, clientSession )
43
44> , SessionCount
45> , getSessionCount
46
47> , newLeecher
48> , newSeeder
49> , getClientBitfield
50
51> , enterSwarm
52> , leaveSwarm
53> , waitVacancy
54
55> , pieceLength
56
57> -- * Peer
58> , PeerSession( PeerSession, connectedPeerAddr
59> , swarmSession, enabledExtensions
60> , sessionState
61> )
62> , SessionState
63> , withPeerSession
64
65> -- ** Broadcasting
66> , available
67> , getPending
68
69> -- ** Exceptions
70> , SessionException(..)
71> , isSessionException
72> , putSessionException
73
74> -- ** Properties
75> , bitfield, status
76> , findPieceCount
77
78> -- * Timeouts
79> , updateIncoming, updateOutcoming
80> ) where
81
82> import Prelude hiding (mapM_)
83
84> import Control.Applicative
85> import Control.Concurrent
86> import Control.Concurrent.STM
87> import Control.Concurrent.MSem as MSem
88> import Control.Lens
89> import Control.Exception
90> import Control.Monad.Trans
91
92> import Data.IORef
93> import Data.Default
94> import Data.Function
95> import Data.Foldable (mapM_)
96> import Data.HashMap.Strict as HM
97> import Data.Ord
98> import Data.Set as S
99> import Data.Typeable
100
101> import Data.Serialize hiding (get)
102> import Text.PrettyPrint
103
104> import Network
105> import Network.Socket
106> import Network.Socket.ByteString
107
108> import GHC.Event as Ev
109
110> import Data.Bitfield as BF
111> import Data.Torrent
112> import Network.BitTorrent.Extension
113> import Network.BitTorrent.Peer
114> import Network.BitTorrent.Exchange.Protocol as BT
115> import Network.BitTorrent.Tracker.Protocol as BT
116
117> {-----------------------------------------------------------------------
118> Progress
119> -----------------------------------------------------------------------}
120
121> -- | 'Progress' contains upload/download/left stats about the
122> -- current client state and is used to notify the tracker.
123> --
124> -- This data is considered dynamic within one client
125> -- session. It should also be shared across client
126> -- application sessions (e.g. via files); otherwise use 'startProgress'
127> -- to get the initial 'Progress'.
128> --
129> data Progress = Progress {
130> _uploaded :: !Integer -- ^ Total amount of bytes uploaded.
131> , _downloaded :: !Integer -- ^ Total amount of bytes downloaded.
132> , _left :: !Integer -- ^ Total amount of bytes left to download.
133> } deriving (Show, Read, Eq)
134
135> -- TODO use atomic bits and Word64
136
137> $(makeLenses ''Progress)
138
139> -- | The initial progress, for use when there is no previous session
140> -- to resume from: nothing uploaded, nothing downloaded and the given
141> -- number of bytes left.
142> --
143> -- Note that a tracker might penalize a client that does not accumulate
144> -- progress, so save the 'Progress' between client sessions if possible.
145> startProgress :: Integer -> Progress
146> startProgress remaining = Progress { _uploaded = 0, _downloaded = 0, _left = remaining }
147
148> -- | Account for bytes downloaded from /any/ peer: they are added to
149> -- 'downloaded' and subtracted from 'left'.
150> downloadedProgress :: Int -> Progress -> Progress
151> downloadedProgress bytes = over left (subtract amount) . over downloaded (+ amount)
152>   where amount = fromIntegral bytes
153> {-# INLINE downloadedProgress #-}
154
155> -- | Account for bytes the client has uploaded to /any/ peer.
156> uploadedProgress :: Int -> Progress -> Progress
157> uploadedProgress bytes = over uploaded (+ fromIntegral bytes)
158> {-# INLINE uploadedProgress #-}
159
160> -- | Add the given amount of bytes to 'left'; used when a leecher joins the client session.
161> enqueuedProgress :: Integer -> Progress -> Progress
162> enqueuedProgress amount = over left (+ amount)
163> {-# INLINE enqueuedProgress #-}
164
165> -- | Remove the given amount of bytes from 'left'; used when a leecher
166> -- leaves the client session (e.g. the user deletes an incomplete torrent).
167> dequeuedProgress :: Integer -> Progress -> Progress
168> dequeuedProgress amount = over left (subtract amount)
169> {-# INLINE dequeuedProgress #-}
170
171
172Thread layout
173-------------
174
175When a client session is created, 2 new threads appear:
176
177 * DHT listener - replies to DHT requests;
178
179 * Peer listener - accept new P2P connection initiated by other
180peers.
181
182When a swarm session is created, 3 new threads appear:
183
184 * DHT request loop asks for new peers;
185
186 * Tracker request loop asks for new peers;
187
188 * controller which forks new P2P sessions and manages the running ones.
189
190Peer session is one always forked thread.
191
192When a client\/swarm\/peer session gets closed, kill the corresponding
193threads, but flush data to disk first (e.g. the storage block map).
194
195So, for example, in order to obtain our first block we need to run at
196least 7 threads: the main thread, 2 client session threads, 3 swarm
197session threads and a PeerSession thread.
198
199> {-----------------------------------------------------------------------
200> Client session
201> -----------------------------------------------------------------------}
202
203> {- NOTE: If we will not restrict number of threads we could end up
204> with thousands of connected swarm and make no particular progress.
205>
206> Note also we do not bound number of swarms! This is not optimal
207> strategy because each swarm might have say 1 thread and we could end
208> up bounded by the meaningless limit. Bounding global number of p2p
209> sessions should work better, and simpler.-}
210
211> -- | A number of threads; each client has a limited number of them (see 'activeThreads').
212> type ThreadCount = Int
213
214> -- | A thread limit considered suitable for a typical BT client.
215> defaultThreadCount :: ThreadCount
216> defaultThreadCount = 1000
217
218> {- PERFORMANCE NOTE: keeping torrent metafiles in memory is a _bad_
219> idea: for 1TB of data we need at least 100MB of metadata (using a 256KB
220> piece size). That solution does not scale further. The solution with
221> TorrentLoc is much better and takes much less space; moreover, it
222> depends on the number of torrents but not on the amount of data itself. To
223> scale further, in future we might add something like a database
224> (e.g. sqlite) for this kind of thing. -}
225
226> -- | Identifies the location of a torrent; the values stored in a 'TorrentMap'.
227> data TorrentLoc = TorrentLoc {
228> metafilePath :: FilePath -- ^ presumably the path of the torrent metafile -- TODO confirm
229> , dataPath :: FilePath -- ^ presumably the path of the torrent content -- TODO confirm
230> }
231
232> validateTorrent :: TorrentLoc -> IO () -- stub: not implemented yet
233> validateTorrent = error "validateTorrent: not implemented" -- fails with 'error' as soon as it is evaluated
234
235> -- | TorrentMap is used to keep track of all torrents known to the
236> -- client. When some peer tries to connect to us it is necessary to
237> -- dispatch to the appropriate 'SwarmSession' (or start a new one if there
238> -- is none) in the listener loop: we only know the 'InfoHash' from the
239> -- 'Handshake' but nothing more. So to accept a new 'PeerSession' we
240> -- need to look up the torrent metainfo and content files (if there are
241> -- some) by the 'InfoHash' and only after that enter the exchange loop.
242> --
243> type TorrentMap = HashMap InfoHash TorrentLoc
244
245> {- NOTE: basically, client session should contain options which user
246> app store in configuration files. (related to the protocol) Moreover
247> it should contain the all client identification info. (e.g. DHT) -}
248
249> -- | Client session is the basic unit of the bittorrent network, it has:
250> --
251> -- * The /peer ID/ used as the unique identifier of the client in the
252> -- network. Obviously, this value does not change during a client
253> -- session.
254> --
255> -- * The /protocol extensions/ it might use. This value
256> -- is static as well, but if you want to dynamically reconfigure
257> -- the client you might end the current session and
258> -- create a new one with the freshly required extensions.
259> --
260> -- * The /swarms/ it has joined, each swarm described by a
261> -- 'SwarmSession'.
262> --
263> -- Normally you would have one client session; however, if needed,
264> -- one application could have many clients with different peer
265> -- IDs and different enabled extensions at the same time.
266> --
267> data ClientSession = ClientSession {
268> -- | Used in handshakes and discovery mechanism.
269> clientPeerId :: !PeerId
270
271> -- | Extensions we should try to use. However some particular peer
272> -- might not support some extension, so we keep 'enabledExtensions' in
273> -- 'PeerSession'.
274> , allowedExtensions :: [Extension]
275
276> -- | Port where the client listens for other peers.
277> , listenerPort :: PortNumber
278> -- TODO restart listener if it fails
279
280> -- | Semaphore used to bound the number of active P2P sessions.
281> , activeThreads :: !(MSem ThreadCount)
282
283> -- | Max number of active connections.
284> , maxActive :: !ThreadCount
285
286> -- | Used to traverse the swarm sessions.
287> , swarmSessions :: !(TVar (Set SwarmSession))
288> -- | Event manager with which the peer session timeouts are registered.
289> , eventManager :: !EventManager
290
291> -- | Used to keep track of the global client progress.
292> , currentProgress :: !(TVar Progress)
293
294> -- | Used to keep track of the available torrents.
295> , torrentMap :: !(TVar TorrentMap)
296> }
297
298> -- The currentProgress field is redundant: progress depends on all the swarm bitfields;
299> -- maybe we can remove 'currentProgress' and compute it on demand?
300
301
302> instance Eq ClientSession where -- sessions are identified by their peer ID
303>   a == b = clientPeerId a == clientPeerId b
304
305> instance Ord ClientSession where -- ordered by peer ID, consistent with 'Eq'
306>   compare a b = compare (clientPeerId a) (clientPeerId b)
307
308> -- | Read the current global progress of the client. This value is
309> -- usually shown to a user.
310> getCurrentProgress :: MonadIO m => ClientSession -> m Progress
311> getCurrentProgress cs = liftIO (readTVarIO (currentProgress cs))
312
313> -- | Number of swarm sessions currently stored in the client session.
314> getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
315> getSwarmCount cs =
316>   liftIO (fmap S.size (readTVarIO (swarmSessions cs)))
317
318> -- | Number of thread slots currently taken: the configured maximum
319> -- minus what the 'activeThreads' semaphore reports as available.
320> getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
321> getPeerCount cs =
322>   liftIO ((maxActive cs -) <$> peekAvail (activeThreads cs))
323
324> -- | Create a new client session. The data passed to this function is
325> -- usually loaded from a configuration file.
326> newClient :: SessionCount -- ^ Maximum count of active P2P Sessions.
327> -> [Extension] -- ^ Extensions allowed to use.
328> -> IO ClientSession -- ^ Client with a peer ID obtained from 'newPeerId'.
329
330> newClient n exts = do
331> mgr <- Ev.new
332> -- TODO kill this thread when leaving the client (the event manager loop runs in its own thread)
333> _ <- forkIO $ loop mgr
334
335> ClientSession
336> <$> newPeerId
337> <*> pure exts
338> <*> forkListener (error "listener") -- NOTE(review): the argument is an 'error' placeholder
339> <*> MSem.new n -- semaphore starts with n available slots
340> <*> pure n -- the same n is remembered as 'maxActive'
341> <*> newTVarIO S.empty
342> <*> pure mgr
343> <*> newTVarIO (startProgress 0)
344> <*> newTVarIO HM.empty
345
346> -- | Insert the location of the torrent with the given 'InfoHash' into the client's torrent map.
347> registerTorrent :: ClientSession -> InfoHash -> TorrentLoc -> STM ()
348> registerTorrent cs ih tl = modifyTVar' (torrentMap cs) (HM.insert ih tl)
349
350> -- | Delete the entry for the given 'InfoHash' from the client's torrent map.
351> unregisterTorrent :: ClientSession -> InfoHash -> STM ()
352> unregisterTorrent cs ih = modifyTVar' (torrentMap cs) (HM.delete ih)
353
354> {-----------------------------------------------------------------------
355> Swarm session
356> -----------------------------------------------------------------------}
357
358> {- NOTE: If client is a leecher then there is NO particular reason to
359> set max sessions count more than the_number_of_unchoke_slots * k:
360
361> * thread slot(activeThread semaphore)
362> * will take but no
363
364> So if client is a leecher then max sessions count depends on the
365> number of unchoke slots.
366
367> However if client is a seeder then the value depends on .
368> -}
369
370> -- | Used to bound the number of simultaneous connections or, which
371> -- is the same, of P2P sessions within a swarm session.
372> type SessionCount = Int
373
374> defSeederConns :: SessionCount -- session limit passed to 'newSwarmSession' by 'newSeeder'
375> defSeederConns = defaultUnchokeSlots
376
377> defLeacherConns :: SessionCount -- session limit passed to 'newSwarmSession' by 'newLeecher'
378> defLeacherConns = defaultNumWant
379
380> -- | Swarm session holds the client state shared by all peer sessions of one torrent.
381> data SwarmSession = SwarmSession {
382> torrentMeta :: !Torrent
383
384> -- | The client session this swarm session was created for.
385> , clientSession :: !ClientSession
386
387> -- | Represents the count of peers we _currently_ can connect to in the
388> -- swarm. Used to bound the number of concurrent threads.
389> , vacantPeers :: !(MSem SessionCount)
390
391> -- | Modify this carefully, updating the global progress as well.
392> , clientBitfield :: !(TVar Bitfield)
393
394> , connectedPeers :: !(TVar (Set PeerSession))
395
396> -- TODO use bounded broadcast chan with priority queue and drop old entries
397> -- | Channel used to replicate messages across all peers in the
398> -- swarm. For example, if we get some piece we should send a HAVE
399> -- message to all connected (and interested) peers.
400> --
401> , broadcastMessages :: !(TChan Message)
402> }
403
404> -- INVARIANT:
405> -- max_sessions_count - sizeof connectedPeers = value vacantPeers
406
407> instance Eq SwarmSession where -- swarms are identified by the torrent's info hash
408>   a == b = tInfoHash (torrentMeta a) == tInfoHash (torrentMeta b)
409
410> instance Ord SwarmSession where -- ordered by info hash, consistent with 'Eq'
411>   compare a b = compare (tInfoHash (torrentMeta a)) (tInfoHash (torrentMeta b))
412
413> -- Shared constructor behind 'newSeeder' and 'newLeecher'.
414> newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
415>                 -> IO SwarmSession
416> newSwarmSession n bf cs@ClientSession {} t@Torrent {}
417>   = SwarmSession t cs
418>       <$> MSem.new n            -- n vacant peer slots
419>       <*> newTVarIO bf          -- initial client bitfield
420>       <*> newTVarIO S.empty     -- no connected peers yet
421>       <*> newBroadcastTChanIO
422
423> -- | New swarm session in which the client is allowed to upload only.
424> newSeeder :: ClientSession -> Torrent -> IO SwarmSession
425> newSeeder cs t = newSwarmSession defSeederConns everyPiece cs t
426>   where everyPiece = haveAll (pieceCount (tInfo t))
427
428> -- | New swarm session in which the client is allowed to both download and upload.
429> newLeecher :: ClientSession -> Torrent -> IO SwarmSession
430> newLeecher cs t@Torrent {tInfo = info} = do
431>   session <- newSwarmSession defLeacherConns (haveNone (pieceCount info)) cs t
432>   atomically (modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength info)))
433>   return session
434
435> --isLeacher :: SwarmSession -> IO Bool
436> --isLeacher = undefined
437
438> -- | Size of the set of peer sessions currently registered in the
439> -- given swarm.
440> getSessionCount :: SwarmSession -> IO SessionCount
441> getSessionCount = fmap S.size . readTVarIO . connectedPeers
442
443> getClientBitfield :: SwarmSession -> IO Bitfield
444> getClientBitfield se = readTVarIO (clientBitfield se) -- snapshot of the client's bitfield
445
446> {-
447> haveDone :: MonadIO m => PieceIx -> SwarmSession -> m ()
448> haveDone ix =
449> liftIO $ atomically $ do
450> bf <- readTVar clientBitfield
451> writeTVar (have ix bf)
452> currentProgress
453> -}
454
455> -- acquire/release mechanism: for internal use only
456
457> -- | Take a global thread slot and then a slot in this swarm, blocking until both are free.
458> enterSwarm :: SwarmSession -> IO ()
459> enterSwarm SwarmSession {..} = MSem.wait global >> (MSem.wait vacantPeers `onException` MSem.signal global)
460>   where global = activeThreads clientSession -- given back if the second wait is interrupted, so no slot leaks
461
462> -- | Give back the swarm slot and then the global thread slot.
463> leaveSwarm :: SwarmSession -> IO ()
464> leaveSwarm se = MSem.signal (vacantPeers se)
465>              >> MSem.signal (activeThreads (clientSession se))
466
467> -- | Run the action between 'enterSwarm' and 'leaveSwarm'; the slots
468> -- are given back even if the action throws.
469> waitVacancy :: SwarmSession -> IO () -> IO ()
470> waitVacancy se action = bracket_ (enterSwarm se) (leaveSwarm se) action
471
472> pieceLength :: SwarmSession -> Int -- piece length recorded in the swarm's torrent info
473> pieceLength se = ciPieceLength (tInfo (torrentMeta se))
474> {-# INLINE pieceLength #-}
475
476> {-----------------------------------------------------------------------
477> Peer session
478> -----------------------------------------------------------------------}
479
480> -- | Peer session contains all data necessary for peer to peer
481> -- communication.
482> data PeerSession = PeerSession {
483> -- | Used as the unique 'PeerSession' identifier within one
484> -- 'SwarmSession'.
485> connectedPeerAddr :: !PeerAddr
486
487> -- | The swarm to which both end points belong.
488> , swarmSession :: !SwarmSession
489
490> -- | Extensions that both the peer and the client support.
491> , enabledExtensions :: [Extension]
492
493> -- | To disconnect from dead peers appropriately we should check
494> -- whether a peer has not sent the KA message within a given interval. If
495> -- so, we should throw an exception in the 'TimeoutCallback' and
496> -- close the session between the peers.
497> --
498> -- We should update the timeout if we /receive/ any message within the
499> -- timeout interval to keep the connection up.
500> , incomingTimeout :: !TimeoutKey
501
502> -- | To send the KA message appropriately we should know when we
503> -- last sent a message to a peer. To do that we keep a registered
504> -- timeout in the event manager and if we do not send any message to
505> -- the peer within the given interval then we send a KA message in the
506> -- 'TimeoutCallback'.
507> --
508> -- We should update the timeout if we /send/ any message within the
509> -- timeout to avoid redundant KA messages.
510> --
511> , outcomingTimeout :: !TimeoutKey
512
513> -- TODO use dupChan for broadcasting
514> -- | Broadcast messages waiting to be sent to the peer.
515> , pendingMessages :: !(TChan Message)
516
517> -- | Dynamic P2P data.
518> , sessionState :: !(IORef SessionState)
519> }
520
521> -- TODO unpack some fields
522
523> data SessionState = SessionState {
524> _bitfield :: !Bitfield -- ^ The other peer's Have bitfield.
525> , _status :: !SessionStatus -- ^ Status of both peers.
526> } deriving (Show, Eq)
527
528> $(makeLenses ''SessionState)
529
530> instance Eq PeerSession where -- peer sessions are identified by the peer address
531>   a == b = connectedPeerAddr a == connectedPeerAddr b
532
533> instance Ord PeerSession where -- ordered by peer address, consistent with 'Eq'
534>   compare a b = compare (connectedPeerAddr a) (connectedPeerAddr b)
535
536> -- | Exceptions used to interrupt the current P2P session. These
537> -- exceptions will NOT affect other P2P sessions, DHT, peer <->
538> -- tracker, or any other session.
539> --
540> data SessionException = PeerDisconnected
541> | ProtocolError Doc
542> deriving (Show, Typeable)
543
544> instance Exception SessionException
545
546
547> -- | Ignore the exception and do nothing; meant for 'handle' or 'try'.
548> isSessionException :: Monad m => SessionException -> m ()
549> isSessionException = const (return ())
550
551> -- | Like 'isSessionException', but also prints the caught exception
552> -- to stdout; for debugging purposes only.
553> putSessionException :: SessionException -> IO ()
554> putSessionException e = putStrLn (show e)
555
556> -- | Connect to the peer, perform the handshake, send our bitfield and
557> -- run the action with the socket and a fresh 'PeerSession'. Afterwards the
558> -- session is removed from the swarm, its timeouts are unregistered and the
559> -- socket is closed. A 'SessionException' raised meanwhile is discarded.
560> --
561> -- TODO modify such that we can use this in listener loop
562> -- TODO check if it connected yet peer
563> withPeerSession :: SwarmSession -> PeerAddr
564>                 -> ((Socket, PeerSession) -> IO ())
565>                 -> IO ()
566> withPeerSession ss@SwarmSession {..} addr
567>   = handle isSessionException . bracket openSession closeSession
568>   where
569>     mgr = eventManager clientSession
570>
571>     openSession = do
572>       let caps = encodeExts (allowedExtensions clientSession)
573>       let chs  = Handshake defaultBTProtocol caps (tInfoHash torrentMeta)
574>                            (clientPeerId clientSession)
575>       sock <- connectToPeer addr
576>       -- 'bracket' releases only what openSession returns: close the socket on any later failure
577>       flip onException (close sock) $ do
578>         phs <- handshake sock chs
579>         cbf <- readTVarIO clientBitfield
580>         sendAll sock (encode (Bitfield cbf))
581>         let enabled = decodeExts (enabledCaps caps (handshakeCaps phs))
582>         ps <- PeerSession addr ss enabled
583>           <$> registerTimeout mgr maxIncomingTime (return ())
584>           <*> registerTimeout mgr maxOutcomingTime (sendKA sock)
585>           <*> atomically (dupTChan broadcastMessages)
586>           <*> do tc <- totalCount <$> readTVarIO clientBitfield
587>                  newIORef (SessionState (haveNone tc) def)
588>         atomically $ modifyTVar' connectedPeers (S.insert ps)
589>         return (sock, ps)
590>
591>     closeSession (sock, ps) = do
592>       atomically $ modifyTVar' connectedPeers (S.delete ps)
593>       unregisterTimeout mgr (incomingTimeout ps)  -- stop the timers so their callbacks
594>       unregisterTimeout mgr (outcomingTimeout ps) -- do not outlive the socket
595>       close sock
596
597> findPieceCount :: PeerSession -> PieceCount -- piece count of the torrent of the session's swarm
598> findPieceCount ps = pieceCount (tInfo (torrentMeta (swarmSession ps)))
599
600> {-----------------------------------------------------------------------
601> Broadcasting: Have, Cancel, Bitfield, SuggestPiece
602> -----------------------------------------------------------------------}
603
604> -- here we should enqueue broadcast messages and keep in mind that:
605> --
606> -- * We should enqueue broadcast events as they are appear.
607> -- * We should yield broadcast messages as fast as we get them.
608> --
609> -- these 2 phases might differ in time significantly
610
611> -- TODO do this; but only when it'll be clean which other broadcast
612> -- messages & events we should send
613
614> -- 1. Update client have bitfield --\____ in one transaction;
615> -- 2. Update downloaded stats --/
616> -- 3. Signal to the all other peer about this.
617
618> -- Merge @bf@ into the client bitfield, account the bytes as downloaded and broadcast a Have per piece.
619> available :: Bitfield -> SwarmSession -> IO ()
620> available bf se@SwarmSession {..} = {-# SCC available #-} do
621>   -- bitfield and global progress are updated together in one transaction
622>   atomically $ do
623>     modifyTVar' clientBitfield (BF.union bf)
624>     modifyTVar' (currentProgress clientSession) (downloadedProgress gained)
625>   -- the Have messages are enqueued in a second, separate transaction
626>   atomically $ mapM_ (\ix -> writeTChan broadcastMessages (Have ix)) (BF.toList bf)
627>   where
628>     gained = pieceLength se * BF.haveCount bf
629
630
631> -- TODO compute size of messages: if it's faster to send Bitfield
632> -- instead many Have do that
633> --
634> -- also if there is single Have message in queue then the
635> -- corresponding piece is likely still in memory or disc cache,
636> -- when we can send SuggestPiece
637
638> -- | Take all broadcast messages pending for this peer as a result of
639> -- asynchronous changes of the client state. The resulting queue
640> -- should be sent to the peer immediately.
641> getPending :: PeerSession -> IO [Message]
642> getPending ps = {-# SCC getPending #-}
643>   atomically (readAvail (pendingMessages ps))
644
645> -- Drain everything that can be read from the channel right now, in
646> -- order, without blocking.
647> readAvail :: TChan a -> STM [a]
648> readAvail chan = tryReadTChan chan >>= drain
649>   where drain Nothing  = return []
650>         drain (Just a) = (a :) <$> readAvail chan
651
652> {-----------------------------------------------------------------------
653> Timeouts
654> -----------------------------------------------------------------------}
655
656> -- for internal use only
657
658> sec :: Int -- one second; presumably in microseconds, the unit of GHC.Event timeouts -- TODO confirm
659> sec = 1000 * 1000
660
661> maxIncomingTime :: Int -- interval used for the incoming timeout of a peer session
662> maxIncomingTime = 120 * sec
663
664> maxOutcomingTime :: Int -- interval used for the outgoing (keep-alive) timeout of a peer session
665> maxOutcomingTime = 1 * sec
666
667> -- | Update the incoming timeout of the session; should be called after
668> -- any message has been received from the peer.
669> updateIncoming :: PeerSession -> IO ()
670> updateIncoming ps = updateTimeout mgr (incomingTimeout ps) maxIncomingTime
671>   where mgr = eventManager (clientSession (swarmSession ps))
672
673> -- | Update the outgoing timeout of the session; should be called before
674> -- any message is sent to the peer.
675> updateOutcoming :: PeerSession -> IO ()
676> updateOutcoming ps = updateTimeout mgr (outcomingTimeout ps) maxOutcomingTime
677>   where mgr = eventManager (clientSession (swarmSession ps))
678
679> sendKA :: Socket -> IO () -- currently a no-op; the intended keep-alive code is commented out below
680> sendKA sock {- SwarmSession {..} -} = do
681> return ()
682> -- print "I'm sending keep alive."
683> -- sendAll sock (encode BT.KeepAlive)
684> -- let mgr = eventManager clientSession
685> -- updateTimeout mgr
686> -- print "Done.."