summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Internal.lhs
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-07-13 05:22:31 +0400
committerSam T <pxqr.sta@gmail.com>2013-07-13 05:22:31 +0400
commiteecd91150b33d6363419daa8e0461984061ed06c (patch)
treef69c7c9231f717f6755dc9e59618f4afa4ba3c1a /src/Network/BitTorrent/Internal.lhs
parent9c9924831ccd975b359ea20101663502b467c99f (diff)
+ Add listener service.
Diffstat (limited to 'src/Network/BitTorrent/Internal.lhs')
-rw-r--r--src/Network/BitTorrent/Internal.lhs82
1 files changed, 60 insertions, 22 deletions
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
index 15606c57..8461a841 100644
--- a/src/Network/BitTorrent/Internal.lhs
+++ b/src/Network/BitTorrent/Internal.lhs
@@ -23,10 +23,18 @@
23> ( -- * Progress 23> ( -- * Progress
24> Progress(..), startProgress 24> Progress(..), startProgress
25> 25>
26> , ClientService(..)
27>
26> -- * Client 28> -- * Client
27> , ClientSession (clientPeerId, allowedExtensions) 29> , ClientSession ( ClientSession
30> , clientPeerId, allowedExtensions
31> , nodeListener, peerListener
32> )
33> , withClientSession
28> , listenerPort, dhtPort 34> , listenerPort, dhtPort
29> 35>
36> , startService
37>
30> , ThreadCount 38> , ThreadCount
31> , defaultThreadCount 39> , defaultThreadCount
32> 40>
@@ -34,8 +42,6 @@
34> , registerTorrent 42> , registerTorrent
35> , unregisterTorrent 43> , unregisterTorrent
36> 44>
37> , newClient
38>
39> , getCurrentProgress 45> , getCurrentProgress
40> , getSwarmCount 46> , getSwarmCount
41> , getPeerCount 47> , getPeerCount
@@ -64,6 +70,7 @@
64> , SessionState 70> , SessionState
65> , initiatePeerSession 71> , initiatePeerSession
66> , acceptPeerSession 72> , acceptPeerSession
73> , listener
67> 74>
68> -- ** Broadcasting 75> -- ** Broadcasting
69> , available 76> , available
@@ -89,7 +96,7 @@
89> import Control.Concurrent.STM 96> import Control.Concurrent.STM
90> import Control.Concurrent.MSem as MSem 97> import Control.Concurrent.MSem as MSem
91> import Control.Lens 98> import Control.Lens
92> import Control.Monad (when) 99> import Control.Monad (when, forever)
93> import Control.Exception 100> import Control.Exception
94> import Control.Monad.Trans 101> import Control.Monad.Trans
95 102
@@ -106,7 +113,7 @@
106> import Data.Serialize hiding (get) 113> import Data.Serialize hiding (get)
107> import Text.PrettyPrint 114> import Text.PrettyPrint
108 115
109> import Network 116> import Network hiding (accept)
110> import Network.Socket 117> import Network.Socket
111> import Network.Socket.ByteString 118> import Network.Socket.ByteString
112 119
@@ -118,6 +125,7 @@
118> import Network.BitTorrent.Peer 125> import Network.BitTorrent.Peer
119> import Network.BitTorrent.Exchange.Protocol as BT 126> import Network.BitTorrent.Exchange.Protocol as BT
120> import Network.BitTorrent.Tracker.Protocol as BT 127> import Network.BitTorrent.Tracker.Protocol as BT
128> import Network.BitTorrent.DHT.Protocol as BT
121 129
122Progress 130Progress
123------------------------------------------------------------------------ 131------------------------------------------------------------------------
@@ -195,7 +203,7 @@ Peer session is one always forked thread.
195When client\/swarm\/peer session gets closed kill the corresponding 203When client\/swarm\/peer session gets closed kill the corresponding
196threads, but flush data to disc. (for e.g. storage block map) 204threads, but flush data to disc. (for e.g. storage block map)
197 205
198So for e.g., in order to obtain our first block we need to run at 206So for e.g., in order to obtain our first block we need to spawn at
199least 7 threads: main thread, 2 client session threads, 3 swarm session 207least 7 threads: main thread, 2 client session threads, 3 swarm session
200threads and PeerSession thread. 208threads and PeerSession thread.
201 209
@@ -296,10 +304,8 @@ so we can abstract out into ClientService:
296> , servThread :: !ThreadId 304> , servThread :: !ThreadId
297> } deriving Show 305> } deriving Show
298 306
299startService :: PortNumber -> IO a -> IO ClientService 307> startService :: PortNumber -> (PortNumber -> IO ()) -> IO ClientService
300startService p m = forkIO $ handle $ m p 308> startService port m = ClientService port <$> forkIO (m port)
301 where
302 handle :: IOError -> IO ()
303 309
304> stopService :: ClientService -> IO () 310> stopService :: ClientService -> IO ()
305> stopService ClientService {..} = killThread servThread 311> stopService ClientService {..} = killThread servThread
@@ -339,8 +345,8 @@ and different enabled extensions at the same time.
339> -- 'PeerSession'. 345> -- 'PeerSession'.
340> , allowedExtensions :: [Extension] 346> , allowedExtensions :: [Extension]
341 347
342> , peerListener :: !ClientService 348> , peerListener :: !(MVar ClientService)
343> , nodeListener :: !ClientService 349> , nodeListener :: !(MVar ClientService)
344 350
345> -- | Semaphor used to bound number of active P2P sessions. 351> -- | Semaphor used to bound number of active P2P sessions.
346> , activeThreads :: !(MSem ThreadCount) 352> , activeThreads :: !(MSem ThreadCount)
@@ -407,20 +413,20 @@ Retrieving client info
407 413
408> -- | Create a new client session. The data passed to this function are 414> -- | Create a new client session. The data passed to this function are
409> -- usually loaded from configuration file. 415> -- usually loaded from configuration file.
410> newClient :: SessionCount -- ^ Maximum count of active P2P Sessions. 416> openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions.
411> -> [Extension] -- ^ Extensions allowed to use. 417> -> [Extension] -- ^ Extensions allowed to use.
412> -> IO ClientSession -- ^ Client with unique peer ID. 418> -> IO ClientSession -- ^ Client with unique peer ID.
413 419
414> newClient n exts = do 420> openClientSession n exts = do
415> mgr <- Ev.new 421> mgr <- Ev.new
416> -- TODO kill this thread when leave client 422> -- TODO kill this thread when leave client
417> _ <- forkIO $ loop mgr 423> _ <- forkIO $ loop mgr
418 424>
419> ClientSession 425> ClientSession
420> <$> genPeerId 426> <$> genPeerId
421> <*> pure exts 427> <*> pure exts
422> <*> pure (ClientService 10 undefined) -- TODO 428> <*> newEmptyMVar
423> <*> pure (ClientService 20 undefined) -- TODO 429> <*> newEmptyMVar
424> <*> MSem.new n 430> <*> MSem.new n
425> <*> pure n 431> <*> pure n
426> <*> newTVarIO M.empty 432> <*> newTVarIO M.empty
@@ -428,11 +434,21 @@ Retrieving client info
428> <*> newTVarIO (startProgress 0) 434> <*> newTVarIO (startProgress 0)
429> <*> newTVarIO HM.empty 435> <*> newTVarIO HM.empty
430 436
431> listenerPort :: ClientSession -> PortNumber 437> closeClientSession :: ClientSession -> IO ()
432> listenerPort = servPort . peerListener 438> closeClientSession ClientSession {..} =
439> maybeStop (tryTakeMVar peerListener) `finally`
440> maybeStop (tryTakeMVar nodeListener)
441> where
442> maybeStop m = maybe (return ()) stopService =<< m
443
444> withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO ()
445> withClientSession c es = bracket (openClientSession c es) closeClientSession
433 446
434> dhtPort :: ClientSession -> PortNumber 447> listenerPort :: ClientSession -> IO PortNumber
435> dhtPort = servPort . nodeListener 448> listenerPort ClientSession {..} = servPort <$> readMVar peerListener
449
450> dhtPort :: ClientSession -> IO PortNumber
451> dhtPort ClientSession {..} = servPort <$> readMVar nodeListener
436 452
437Swarm sessions 453Swarm sessions
438------------------------------------------------------------------------ 454------------------------------------------------------------------------
@@ -736,8 +752,10 @@ TODO: use STM semaphore
736> sendClientStatus (sock, PeerSession {..}) = do 752> sendClientStatus (sock, PeerSession {..}) = do
737> cbf <- readTVarIO $ clientBitfield $ swarmSession 753> cbf <- readTVarIO $ clientBitfield $ swarmSession
738> sendAll sock $ encode $ Bitfield cbf 754> sendAll sock $ encode $ Bitfield cbf
755>
756> port <- dhtPort $ clientSession swarmSession
739> when (ExtDHT `elem` enabledExtensions) $ do 757> when (ExtDHT `elem` enabledExtensions) $ do
740> sendAll sock $ encode $ Port $ dhtPort $ clientSession swarmSession 758> sendAll sock $ encode $ Port port
741 759
742Exchange action depends on session and socket, whereas session depends 760Exchange action depends on session and socket, whereas session depends
743on socket: 761on socket:
@@ -786,6 +804,26 @@ Used the a peer want to connect to the client.
786> sendClientStatus (sock, ps) 804> sendClientStatus (sock, ps)
787> return ps 805> return ps
788 806
807
808> listener :: ClientSession -> Exchange -> PortNumber -> IO ()
809> listener cs action serverPort = bracket openListener close loop
810> where
811> loop sock = forever $ handle isIOError $ do
812> (conn, addr) <- accept sock
813> case addr of
814> SockAddrInet port host -> do
815> acceptPeerSession cs (PeerAddr Nothing host port) conn action
816> _ -> return ()
817>
818> isIOError :: IOError -> IO ()
819> isIOError _ = return ()
820>
821> openListener = do
822> sock <- socket AF_INET Stream defaultProtocol
823> bindSocket sock (SockAddrInet serverPort 0)
824> listen sock 1
825> return sock
826
789Broadcasting: Have, Cancel, Bitfield, SuggestPiece 827Broadcasting: Have, Cancel, Bitfield, SuggestPiece
790------------------------------------------------------------------------ 828------------------------------------------------------------------------
791 829