summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Internal.lhs
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-07-14 00:50:42 +0400
committerSam T <pxqr.sta@gmail.com>2013-07-14 00:50:42 +0400
commit7ddf7cd76d6f545c4dfbb5c6741024c097375bf1 (patch)
treec7e781b6dc1cdbe133191256fc0453ce65d139bf /src/Network/BitTorrent/Internal.lhs
parent83b9af0674f2d4713be1d9540abe7ce09ed33257 (diff)
~ Unliterate sessions module.
Diffstat (limited to 'src/Network/BitTorrent/Internal.lhs')
-rw-r--r--src/Network/BitTorrent/Internal.lhs579
1 files changed, 0 insertions, 579 deletions
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
deleted file mode 100644
index ef4165e3..00000000
--- a/src/Network/BitTorrent/Internal.lhs
+++ /dev/null
@@ -1,579 +0,0 @@
1> -- |
2> -- Copyright : (c) Sam T. 2013
3> -- License : MIT
4> -- Maintainer : pxqr.sta@gmail.com
5> -- Stability : experimental
6> -- Portability : portable
7> --
8> -- This module implement opaque broadcast message passing. It
9> -- provides sessions needed by Network.BitTorrent and
10> -- Network.BitTorrent.Exchange and modules. To hide some internals
11> -- of this module we detach it from Exchange.
12> -- NOTE: Expose only static data in data field lists, all dynamic
13> -- data should be modified through standalone functions.
14> --
15>
16> {-# LANGUAGE OverloadedStrings #-}
17> {-# LANGUAGE RecordWildCards #-}
18> {-# LANGUAGE ViewPatterns #-}
19>
20> module Network.BitTorrent.Internal
21> ( -- * Progress
22> Progress(..), startProgress
23>
24> , ClientService(..)
25> , startService
26> , withRunning
27>
28> -- * Client
29> , ClientSession ( ClientSession
30> , clientPeerId, allowedExtensions
31> , nodeListener, peerListener
32> )
33> , withClientSession
34> , listenerPort, dhtPort
35>
36> , ThreadCount
37> , defaultThreadCount
38>
39> , TorrentLoc(..)
40> , registerTorrent
41> , unregisterTorrent
42>
43> , getCurrentProgress
44> , getSwarmCount
45> , getPeerCount
46>
47> -- * Swarm
48> , SwarmSession( SwarmSession, torrentMeta, clientSession )
49>
50> , SessionCount
51> , getSessionCount
52>
53> , newLeecher
54> , newSeeder
55> , getClientBitfield
56>
57> , waitVacancy
58> , forkThrottle
59>
60> -- * Peer
61> , PeerSession( PeerSession, connectedPeerAddr
62> , swarmSession, enabledExtensions
63> , sessionState
64> )
65> , SessionState
66> , initiatePeerSession
67> , acceptPeerSession
68> , listener
69>
70> -- ** Broadcasting
71> , available
72> , getPending
73>
74> -- ** Exceptions
75> , SessionException(..)
76> , isSessionException
77> , putSessionException
78>
79> -- ** Properties
80> , bitfield, status
81> , findPieceCount
82>
83> -- * Timeouts
84> , updateIncoming, updateOutcoming
85> ) where
86
87> import Prelude hiding (mapM_)
88
89> import Control.Applicative
90> import Control.Concurrent
91> import Control.Concurrent.STM
92> import Control.Concurrent.MSem as MSem
93> import Control.Lens
94> import Control.Monad (when, forever, (>=>))
95> import Control.Exception
96> import Control.Monad.Trans
97
98> import Data.IORef
99> import Data.Foldable (mapM_)
100> import Data.Map as M
101> import Data.HashMap.Strict as HM
102> import Data.Set as S
103
104> import Data.Serialize hiding (get)
105
106> import Network hiding (accept)
107> import Network.Socket
108> import Network.Socket.ByteString
109
110> import GHC.Event as Ev
111
112> import Data.Bitfield as BF
113> import Data.Torrent
114> import Network.BitTorrent.Extension
115> import Network.BitTorrent.Peer
116> import Network.BitTorrent.Exchange.Protocol as BT
117> import Network.BitTorrent.Tracker.Protocol as BT
118> import System.Torrent.Storage
119> import Network.BitTorrent.Sessions.Types
120
121
122> -- | Initial progress is used when there are no session before.
123> startProgress :: Integer -> Progress
124> startProgress = Progress 0 0
125
126> -- | Used when the client download some data from /any/ peer.
127> downloadedProgress :: Int -> Progress -> Progress
128> downloadedProgress (fromIntegral -> amount)
129> = (left -~ amount)
130> . (downloaded +~ amount)
131> {-# INLINE downloadedProgress #-}
132
133> -- | Used when the client upload some data to /any/ peer.
134> uploadedProgress :: Int -> Progress -> Progress
135> uploadedProgress (fromIntegral -> amount) = uploaded +~ amount
136> {-# INLINE uploadedProgress #-}
137
138> -- | Used when leecher join client session.
139> enqueuedProgress :: Integer -> Progress -> Progress
140> enqueuedProgress amount = left +~ amount
141> {-# INLINE enqueuedProgress #-}
142
143> -- | Used when leecher leave client session.
144> -- (e.g. user deletes not completed torrent)
145> dequeuedProgress :: Integer -> Progress -> Progress
146> dequeuedProgress amount = left -~ amount
147> {-# INLINE dequeuedProgress #-}
148
149> startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
150> startService s port m = do
151> stopService s
152> putMVar s =<< spawn
153> where
154> spawn = ClientService port <$> forkIO (m port)
155
156> stopService :: MVar ClientService -> IO ()
157> stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread)
158
159Service A might depend on service B.
160
161> withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
162> withRunning dep failure action = tryTakeMVar dep >>= maybe failure action
163
164Torrent presence
165------------------------------------------------------------------------
166
167> data TorrentPresence = Active SwarmSession
168> | Registered TorrentLoc
169> | Unknown
170
171> torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence
172> torrentPresence ClientSession {..} ih = do
173> sws <- readTVarIO swarmSessions
174> case M.lookup ih sws of
175> Just ss -> return $ Active ss
176> Nothing -> do
177> tm <- readTVarIO torrentMap
178> return $ maybe Unknown Registered $ HM.lookup ih tm
179
180Retrieving client info
181------------------------------------------------------------------------
182
183> -- | Get current global progress of the client. This value is usually
184> -- shown to a user.
185> getCurrentProgress :: MonadIO m => ClientSession -> m Progress
186> getCurrentProgress = liftIO . readTVarIO . currentProgress
187
188> -- | Get number of swarms client aware of.
189> getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
190> getSwarmCount ClientSession {..} = liftIO $
191> M.size <$> readTVarIO swarmSessions
192
193> -- | Get number of peers the client currently connected to.
194> getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
195> getPeerCount ClientSession {..} = liftIO $ do
196> unused <- peekAvail activeThreads
197> return (maxActive - unused)
198
199> -- | Create a new client session. The data passed to this function are
200> -- usually loaded from configuration file.
201> openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions.
202> -> [Extension] -- ^ Extensions allowed to use.
203> -> IO ClientSession -- ^ Client with unique peer ID.
204
205> openClientSession n exts = do
206> mgr <- Ev.new
207> -- TODO kill this thread when leave client
208> _ <- forkIO $ loop mgr
209>
210> ClientSession
211> <$> genPeerId
212> <*> pure exts
213> <*> newEmptyMVar
214> <*> newEmptyMVar
215> <*> MSem.new n
216> <*> pure n
217> <*> newTVarIO M.empty
218> <*> pure mgr
219> <*> newTVarIO (startProgress 0)
220> <*> newTVarIO HM.empty
221
222> closeClientSession :: ClientSession -> IO ()
223> closeClientSession ClientSession {..} =
224> stopService nodeListener `finally` stopService peerListener
225
226> withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO ()
227> withClientSession c es = bracket (openClientSession c es) closeClientSession
228
229> listenerPort :: ClientSession -> IO PortNumber
230> listenerPort ClientSession {..} = servPort <$> readMVar peerListener
231
232> dhtPort :: ClientSession -> IO PortNumber
233> dhtPort ClientSession {..} = servPort <$> readMVar nodeListener
234
235
236> defSeederConns :: SessionCount
237> defSeederConns = defaultUnchokeSlots
238
239> defLeacherConns :: SessionCount
240> defLeacherConns = defaultNumWant
241
242> newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
243> -> IO SwarmSession
244> newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
245> = SwarmSession t cs
246> <$> MSem.new n
247> <*> newTVarIO bf
248> <*> undefined
249> <*> newTVarIO S.empty
250> <*> newBroadcastTChanIO
251
252-- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession
253-- > openSwarmSession ClientSession {..} ih = do
254-- > loc <- HM.lookup <$> readTVarIO torrentMap
255-- > torrent <- validateLocation loc
256-- > return undefined
257
258> closeSwarmSession :: SwarmSession -> IO ()
259> closeSwarmSession se @ SwarmSession {..} = do
260> unregisterSwarmSession se
261> -- TODO stop discovery
262> -- TODO killall peer sessions
263> -- TODO the order is important!
264> closeStorage storage
265
266
267
268> unregisterSwarmSession :: SwarmSession -> IO ()
269> unregisterSwarmSession SwarmSession {..} =
270> atomically $ modifyTVar (swarmSessions clientSession) $
271> M.delete $ tInfoHash torrentMeta
272
273getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
274getSwarm cs @ ClientSession {..} ih = do
275 ss <- readTVarIO $ swarmSessions
276 case HM.lookup ih ss of
277 Just sw -> return sw
278 Nothing -> openSwarm cs
279
280> newSeeder :: ClientSession -> Torrent -> IO SwarmSession
281> newSeeder cs t @ Torrent {..}
282> = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
283
284> -- | New swarm in which the client allowed both download and upload.
285> newLeecher :: ClientSession -> Torrent -> IO SwarmSession
286> newLeecher cs t @ Torrent {..} = do
287> se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
288> atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
289> return se
290
291> --isLeacher :: SwarmSession -> IO Bool
292> --isLeacher = undefined
293
294> -- | Get the number of connected peers in the given swarm.
295> getSessionCount :: SwarmSession -> IO SessionCount
296> getSessionCount SwarmSession {..} = do
297> S.size <$> readTVarIO connectedPeers
298
299> getClientBitfield :: SwarmSession -> IO Bitfield
300> getClientBitfield = readTVarIO . clientBitfield
301
302> swarmHandshake :: SwarmSession -> Handshake
303> swarmHandshake SwarmSession {..} = Handshake {
304> hsProtocol = defaultBTProtocol
305> , hsReserved = encodeExts $ allowedExtensions $ clientSession
306> , hsInfoHash = tInfoHash torrentMeta
307> , hsPeerId = clientPeerId $ clientSession
308> }
309
310> {-
311> haveDone :: MonadIO m => PieceIx -> SwarmSession -> m ()
312> haveDone ix =
313> liftIO $ atomically $ do
314> bf <- readTVar clientBitfield
315> writeTVar (have ix bf)
316> currentProgress
317> -}
318
319Peer sessions throttling
320------------------------------------------------------------------------
321
322> -- | The number of threads suitable for a typical BT client.
323> defaultThreadCount :: ThreadCount
324> defaultThreadCount = 1000
325
326> enterSwarm :: SwarmSession -> IO ()
327> enterSwarm SwarmSession {..} = do
328> MSem.wait (activeThreads clientSession)
329> MSem.wait vacantPeers
330
331> leaveSwarm :: SwarmSession -> IO ()
332> leaveSwarm SwarmSession {..} = do
333> MSem.signal vacantPeers
334> MSem.signal (activeThreads clientSession)
335
336> waitVacancy :: SwarmSession -> IO () -> IO ()
337> waitVacancy se =
338> bracket (enterSwarm se) (const (leaveSwarm se))
339> . const
340
341> forkThrottle :: SwarmSession -> IO () -> IO ThreadId
342> forkThrottle se action = do
343> enterSwarm se
344> (forkIO $ do
345> action `finally` leaveSwarm se)
346> `onException` leaveSwarm se
347
348
349> findPieceCount :: PeerSession -> PieceCount
350> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
351
352TODO: check content files location;
353
354> validateLocation :: TorrentLoc -> IO Torrent
355> validateLocation = fromFile . metafilePath
356
357> registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO ()
358> registerTorrent = error "registerTorrent"
359> {-
360> Torrent {..} <- validateTorrent tl
361> atomically $ modifyTVar' torrentMap $ HM.insert tInfoHash tl
362> return (Just t)
363> -}
364
365> unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
366> unregisterTorrent = error "unregisterTorrent"
367> -- modifyTVar' torrentMap $ HM.delete ih
368
369> torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
370> torrentSwarm _ _ (Active sws) = return sws
371> torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc
372> torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih
373
374> lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession
375> lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih
376
377Peer session creation
378------------------------------------------------------------------------
379
380The peer session cycle looks like:
381
382 * acquire vacant session and vacant thread slot;
383 * (fork could be here, but not necessary)
384 * establish peer connection;
385 * register peer session;
386 * ... exchange process ...
387 * unregister peer session;
388 * close peer connection;
389 * release acquired session and thread slot.
390
391TODO: explain why this order
392TODO: thread throttling
393TODO: check if it connected yet peer
394TODO: utilize peer Id.
395TODO: use STM semaphore
396
397> openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
398> openSession ss @ SwarmSession {..} addr Handshake {..} = do
399> let clientCaps = encodeExts $ allowedExtensions $ clientSession
400> let enabled = decodeExts (enabledCaps clientCaps hsReserved)
401> ps <- PeerSession addr ss enabled
402> <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ())
403> <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ())
404> <*> atomically (dupTChan broadcastMessages)
405> <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield)
406> -- TODO we could implement more interesting throtling scheme
407> -- using connected peer information
408> atomically $ modifyTVar' connectedPeers (S.insert ps)
409> return ps
410
411> closeSession :: PeerSession -> IO ()
412> closeSession ps @ PeerSession {..} = do
413> atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps)
414
415> type PeerConn = (Socket, PeerSession)
416> type Exchange = PeerConn -> IO ()
417
418> sendClientStatus :: PeerConn -> IO ()
419> sendClientStatus (sock, PeerSession {..}) = do
420> cbf <- readTVarIO $ clientBitfield $ swarmSession
421> sendAll sock $ encode $ Bitfield cbf
422>
423> port <- dhtPort $ clientSession swarmSession
424> when (ExtDHT `elem` enabledExtensions) $ do
425> sendAll sock $ encode $ Port port
426
427Exchange action depends on session and socket, whereas session depends
428on socket:
429
430 socket------>-----exchange
431 | |
432 \-->--session-->--/
433
434To handle exceptions properly we double bracket socket and session
435then joining the resources and also ignoring session local exceptions.
436
437> runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
438> runSession connector opener action =
439> handle isSessionException $
440> bracket connector close $ \sock ->
441> bracket (opener sock) closeSession $ \ses ->
442> action (sock, ses)
443
444Used then the client want to connect to a peer.
445
446> initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
447> initiatePeerSession ss @ SwarmSession {..} addr
448> = runSession (connectToPeer addr) initiated
449> where
450> initiated sock = do
451> phs <- handshake sock (swarmHandshake ss)
452> ps <- openSession ss addr phs
453> sendClientStatus (sock, ps)
454> return ps
455
456Used the a peer want to connect to the client.
457
458> acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
459> acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
460> where
461> accepted sock = do
462> phs <- recvHandshake sock
463> swarm <- lookupSwarm cs $ hsInfoHash phs
464> ps <- openSession swarm addr phs
465> sendHandshake sock $ Handshake {
466> hsProtocol = defaultBTProtocol
467> , hsReserved = encodeExts $ enabledExtensions ps
468> , hsInfoHash = hsInfoHash phs
469> , hsPeerId = clientPeerId
470> }
471> sendClientStatus (sock, ps)
472> return ps
473
474
475> listener :: ClientSession -> Exchange -> PortNumber -> IO ()
476> listener cs action serverPort = bracket openListener close loop
477> where
478> loop sock = forever $ handle isIOError $ do
479> (conn, addr) <- accept sock
480> case addr of
481> SockAddrInet port host -> do
482> acceptPeerSession cs (PeerAddr Nothing host port) conn action
483> _ -> return ()
484>
485> isIOError :: IOError -> IO ()
486> isIOError _ = return ()
487>
488> openListener = do
489> sock <- socket AF_INET Stream defaultProtocol
490> bindSocket sock (SockAddrInet serverPort 0)
491> listen sock 1
492> return sock
493
494Broadcasting: Have, Cancel, Bitfield, SuggestPiece
495------------------------------------------------------------------------
496
497Here we should enqueue broadcast messages and keep in mind that:
498 * We should enqueue broadcast events as they are appear.
499 * We should yield broadcast messages as fast as we get them.
500
501these 2 phases might differ in time significantly
502
503**TODO**: do this; but only when it'll be clean which other broadcast
504messages & events we should send.
505
5061. Update client have bitfield --\____ in one transaction;
5072. Update downloaded stats --/
5083. Signal to the all other peer about this.
509
510> available :: Bitfield -> SwarmSession -> IO ()
511> available bf se @ SwarmSession {..} = {-# SCC available #-} do
512> mark >> atomically broadcast
513> where
514> mark = do
515> let piLen = ciPieceLength $ tInfo $ torrentMeta
516> let bytes = piLen * BF.haveCount bf
517> atomically $ do
518> modifyTVar' clientBitfield (BF.union bf)
519> modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
520>
521> broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
522
523
524TODO compute size of messages: if it's faster to send Bitfield
525instead many Have do that
526
527Also if there is single Have message in queue then the
528corresponding piece is likely still in memory or disc cache,
529when we can send SuggestPiece.
530
531Get pending messages queue appeared in result of asynchronously
532changed client state. Resulting queue should be sent to a peer
533immediately.
534
535> getPending :: PeerSession -> IO [Message]
536> getPending PeerSession {..} = {-# SCC getPending #-} do
537> atomically (readAvail pendingMessages)
538
539> readAvail :: TChan a -> STM [a]
540> readAvail chan = do
541> m <- tryReadTChan chan
542> case m of
543> Just a -> (:) <$> pure a <*> readAvail chan
544> Nothing -> return []
545
546Timeouts
547-----------------------------------------------------------------------
548
549for internal use only
550
551> sec :: Int
552> sec = 1000 * 1000
553
554> maxIncomingTime :: Int
555> maxIncomingTime = 120 * sec
556
557> maxOutcomingTime :: Int
558> maxOutcomingTime = 1 * sec
559
560> -- | Should be called after we have received any message from a peer.
561> updateIncoming :: PeerSession -> IO ()
562> updateIncoming PeerSession {..} = do
563> updateTimeout (eventManager (clientSession swarmSession))
564> incomingTimeout maxIncomingTime
565
566> -- | Should be called before we have send any message to a peer.
567> updateOutcoming :: PeerSession -> IO ()
568> updateOutcoming PeerSession {..} =
569> updateTimeout (eventManager (clientSession swarmSession))
570> outcomingTimeout maxOutcomingTime
571
572> sendKA :: Socket -> IO ()
573> sendKA sock {- SwarmSession {..} -} = do
574> return ()
575> -- print "I'm sending keep alive."
576> -- sendAll sock (encode BT.KeepAlive)
577> -- let mgr = eventManager clientSession
578> -- updateTimeout mgr
579> -- print "Done.."