summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-07-10 21:59:33 +0400
committerSam T <pxqr.sta@gmail.com>2013-07-10 21:59:33 +0400
commit6c1b01ccd8df86a57e6d60e36538a895934fce0a (patch)
treea9a4b5f7f70502d8429ebbca1ef0dee768143386
parentb57d508bba214ba59d7af609d4cac8a57d56dfeb (diff)
~ Peer session establishment.
* Fixed exception handling; * Add acceptPeerSession needed by peer listener. * Simplify initiatePeerSession
-rw-r--r--src/Network/BitTorrent/Exchange.hs2
-rw-r--r--src/Network/BitTorrent/Internal.lhs219
2 files changed, 146 insertions, 75 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index ca82181c..5be9ae73 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -156,7 +156,7 @@ instance MonadState SessionState P2P where
156runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () 156runSession :: SwarmSession -> PeerAddr -> P2P () -> IO ()
157runSession se addr p2p = 157runSession se addr p2p =
158 handle isIOException $ 158 handle isIOException $
159 withPeerSession se addr $ \(sock, pses) -> do 159 initiatePeerSession se addr $ \(sock, pses) -> do
160 runPeerWire sock (runReaderT (unP2P p2p) pses) 160 runPeerWire sock (runReaderT (unP2P p2p) pses)
161 where 161 where
162 isIOException :: IOException -> IO () 162 isIOException :: IOException -> IO ()
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
index c0562223..15606c57 100644
--- a/src/Network/BitTorrent/Internal.lhs
+++ b/src/Network/BitTorrent/Internal.lhs
@@ -24,7 +24,8 @@
24> Progress(..), startProgress 24> Progress(..), startProgress
25> 25>
26> -- * Client 26> -- * Client
27> , ClientSession (clientPeerId, allowedExtensions, listenerPort) 27> , ClientSession (clientPeerId, allowedExtensions)
28> , listenerPort, dhtPort
28> 29>
29> , ThreadCount 30> , ThreadCount
30> , defaultThreadCount 31> , defaultThreadCount
@@ -61,7 +62,8 @@
61> , sessionState 62> , sessionState
62> ) 63> )
63> , SessionState 64> , SessionState
64> , withPeerSession 65> , initiatePeerSession
66> , acceptPeerSession
65> 67>
66> -- ** Broadcasting 68> -- ** Broadcasting
67> , available 69> , available
@@ -87,6 +89,7 @@
87> import Control.Concurrent.STM 89> import Control.Concurrent.STM
88> import Control.Concurrent.MSem as MSem 90> import Control.Concurrent.MSem as MSem
89> import Control.Lens 91> import Control.Lens
92> import Control.Monad (when)
90> import Control.Exception 93> import Control.Exception
91> import Control.Monad.Trans 94> import Control.Monad.Trans
92 95
@@ -245,6 +248,11 @@ but nothing more. So to accept new 'PeerSession' we need to lookup
245torrent metainfo and content files (if there are some) by the 248torrent metainfo and content files (if there are some) by the
246'InfoHash' and only after that enter exchange loop. 249'InfoHash' and only after that enter exchange loop.
247 250
251TODO: check content files location;
252
253> validateLocation :: TorrentLoc -> IO Torrent
254> validateLocation = fromFile . metafilePath
255
248Solution with TorrentLoc is much better and takes much more less 256Solution with TorrentLoc is much better and takes much more less
249space, moreover it depends on count of torrents but not on count of 257space, moreover it depends on count of torrents but not on count of
250data itself. To scale further, in future we might add something like 258data itself. To scale further, in future we might add something like
@@ -331,11 +339,8 @@ and different enabled extensions at the same time.
331> -- 'PeerSession'. 339> -- 'PeerSession'.
332> , allowedExtensions :: [Extension] 340> , allowedExtensions :: [Extension]
333 341
334-- > , peerListener :: !ClientService 342> , peerListener :: !ClientService
335-- > , nodeListener :: !ClientService 343> , nodeListener :: !ClientService
336
337> -- | Port where client listen for the other peers.
338> , listenerPort :: PortNumber
339 344
340> -- | Semaphor used to bound number of active P2P sessions. 345> -- | Semaphor used to bound number of active P2P sessions.
341> , activeThreads :: !(MSem ThreadCount) 346> , activeThreads :: !(MSem ThreadCount)
@@ -414,7 +419,8 @@ Retrieving client info
414> ClientSession 419> ClientSession
415> <$> genPeerId 420> <$> genPeerId
416> <*> pure exts 421> <*> pure exts
417> <*> pure 10 -- forkListener (error "listener") 422> <*> pure (ClientService 10 undefined) -- TODO
423> <*> pure (ClientService 20 undefined) -- TODO
418> <*> MSem.new n 424> <*> MSem.new n
419> <*> pure n 425> <*> pure n
420> <*> newTVarIO M.empty 426> <*> newTVarIO M.empty
@@ -422,16 +428,11 @@ Retrieving client info
422> <*> newTVarIO (startProgress 0) 428> <*> newTVarIO (startProgress 0)
423> <*> newTVarIO HM.empty 429> <*> newTVarIO HM.empty
424 430
425> listenerHandler :: ClientSession -> Socket -> IO () 431> listenerPort :: ClientSession -> PortNumber
426> listenerHandler ses sock = do 432> listenerPort = servPort . peerListener
427> Handshake {..} <- recvHandshake sock 433
428> status <- torrentPresence ses hsInfoHash 434> dhtPort :: ClientSession -> PortNumber
429> case status of 435> dhtPort = servPort . nodeListener
430> Unknown -> return ()
431> Active ses -> error "listener handler"
432> -- TODO here we need to lookup local torrent status: BF e.t.c>
433> Registered _ -> return ()
434> return ()
435 436
436Swarm sessions 437Swarm sessions
437------------------------------------------------------------------------ 438------------------------------------------------------------------------
@@ -489,10 +490,9 @@ example consider the following very simle and realistic scenario:
489simultaneously. 490simultaneously.
490 491
491There some other situation the problem may occur: duplicates in 492There some other situation the problem may occur: duplicates in
492successive tracker responses, tracker and DHT returns. 493successive tracker responses, tracker and DHT returns. So without any
493 494protection we end up with two session between the same peers. That's
494So without any protection we end up with two session between the same 495bad because this could lead:
495peers. That's bad because this could lead:
496 496
497 * Reduced throughput - multiple sessions between the same peers will 497 * Reduced throughput - multiple sessions between the same peers will
498mutiply control overhead (control messages, session state). 498mutiply control overhead (control messages, session state).
@@ -559,6 +559,14 @@ INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers
559> pieceLength = ciPieceLength . tInfo . torrentMeta 559> pieceLength = ciPieceLength . tInfo . torrentMeta
560> {-# INLINE pieceLength #-} 560> {-# INLINE pieceLength #-}
561 561
562> swarmHandshake :: SwarmSession -> Handshake
563> swarmHandshake SwarmSession {..} = Handshake {
564> hsProtocol = defaultBTProtocol
565> , hsReserved = encodeExts $ allowedExtensions $ clientSession
566> , hsInfoHash = tInfoHash torrentMeta
567> , hsPeerId = clientPeerId $ clientSession
568> }
569
562> {- 570> {-
563> haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () 571> haveDone :: MonadIO m => PieceIx -> SwarmSession -> m ()
564> haveDone ix = 572> haveDone ix =
@@ -629,7 +637,14 @@ avoid reduntant KA messages.
629> , sessionState :: !(IORef SessionState) 637> , sessionState :: !(IORef SessionState)
630> } 638> }
631 639
632> -- TODO unpack some fields 640> instance Eq PeerSession where
641> (==) = (==) `on` connectedPeerAddr
642
643> instance Ord PeerSession where
644> compare = comparing connectedPeerAddr
645
646Peer session state
647------------------------------------------------------------------------
633 648
634> data SessionState = SessionState { 649> data SessionState = SessionState {
635> _bitfield :: !Bitfield -- ^ Other peer Have bitfield. 650> _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
@@ -638,11 +653,8 @@ avoid reduntant KA messages.
638 653
639> $(makeLenses ''SessionState) 654> $(makeLenses ''SessionState)
640 655
641> instance Eq PeerSession where 656> initialSessionState :: PieceCount -> SessionState
642> (==) = (==) `on` connectedPeerAddr 657> initialSessionState pc = SessionState (haveNone pc) def
643
644> instance Ord PeerSession where
645> compare = comparing connectedPeerAddr
646 658
647> findPieceCount :: PeerSession -> PieceCount 659> findPieceCount :: PeerSession -> PieceCount
648> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession 660> findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession
@@ -655,7 +667,8 @@ Peer session exceptions
655> -- tracker, or any other session. 667> -- tracker, or any other session.
656> -- 668> --
657> data SessionException = PeerDisconnected 669> data SessionException = PeerDisconnected
658> | ProtocolError Doc 670> | ProtocolError Doc
671> | UnknownTorrent InfoHash
659> deriving (Show, Typeable) 672> deriving (Show, Typeable)
660 673
661> instance Exception SessionException 674> instance Exception SessionException
@@ -670,51 +683,108 @@ Peer session exceptions
670> putSessionException :: SessionException -> IO () 683> putSessionException :: SessionException -> IO ()
671> putSessionException = print 684> putSessionException = print
672 685
686> torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
687> torrentSwarm _ _ (Active sws) = return sws
688> torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc
689> torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih
690
691> lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession
692> lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih
693
673Peer session creation 694Peer session creation
674------------------------------------------------------------------------ 695------------------------------------------------------------------------
675 696
676> -- TODO modify such that we can use this in listener loop 697The peer session cycle looks like:
677> -- TODO check if it connected yet peer 698
678> withPeerSession :: SwarmSession -> PeerAddr 699 * acquire vacant session and vacant thread slot;
679> -> ((Socket, PeerSession) -> IO ()) 700 * (fork could be here, but not necessary)
680> -> IO () 701 * establish peer connection;
681 702 * register peer session;
682> withPeerSession ss @ SwarmSession {..} addr 703 * ... exchange process ...
683> = handle isSessionException . bracket openSession closeSession 704 * unregister peer session;
705 * close peer connection;
706 * release acquired session and thread slot.
707
708TODO: explain why this order
709TODO: thread throttling
710TODO: check if it connected yet peer
711TODO: utilize peer Id.
712TODO: use STM semaphore
713
714> openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
715> openSession ss @ SwarmSession {..} addr Handshake {..} = do
716> let clientCaps = encodeExts $ allowedExtensions $ clientSession
717> let enabled = decodeExts (enabledCaps clientCaps hsReserved)
718> ps <- PeerSession addr ss enabled
719> <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ())
720> <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ())
721> <*> atomically (dupTChan broadcastMessages)
722> <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield)
723> -- TODO we could implement more interesting throtling scheme
724> -- using connected peer information
725> atomically $ modifyTVar' connectedPeers (S.insert ps)
726> return ps
727
728> closeSession :: PeerSession -> IO ()
729> closeSession ps @ PeerSession {..} = do
730> atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps)
731
732> type PeerConn = (Socket, PeerSession)
733> type Exchange = PeerConn -> IO ()
734
735> sendClientStatus :: PeerConn -> IO ()
736> sendClientStatus (sock, PeerSession {..}) = do
737> cbf <- readTVarIO $ clientBitfield $ swarmSession
738> sendAll sock $ encode $ Bitfield cbf
739> when (ExtDHT `elem` enabledExtensions) $ do
740> sendAll sock $ encode $ Port $ dhtPort $ clientSession swarmSession
741
742Exchange action depends on session and socket, whereas session depends
743on socket:
744
745 socket------>-----exchange
746 | |
747 \-->--session-->--/
748
749To handle exceptions properly we double bracket socket and session
750then joining the resources and also ignoring session local exceptions.
751
752> runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
753> runSession connector opener action =
754> handle isSessionException $
755> bracket connector close $ \sock ->
756> bracket (opener sock) closeSession $ \ses ->
757> action (sock, ses)
758
759Used then the client want to connect to a peer.
760
761> initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
762> initiatePeerSession ss @ SwarmSession {..} addr
763> = runSession (connectToPeer addr) initiated
684> where 764> where
685> openSession = do 765> initiated sock = do
686> let caps = encodeExts $ allowedExtensions $ clientSession 766> phs <- handshake sock (swarmHandshake ss)
687> let ihash = tInfoHash torrentMeta 767> ps <- openSession ss addr phs
688> let pid = clientPeerId $ clientSession 768> sendClientStatus (sock, ps)
689> let chs = Handshake defaultBTProtocol caps ihash pid 769> return ps
690 770
691> sock <- connectToPeer addr 771Used the a peer want to connect to the client.
692> phs <- handshake sock chs `onException` close sock
693 772
694> cbf <- readTVarIO clientBitfield 773> acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
695> sendAll sock (encode (Bitfield cbf)) 774> acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
696 775> where
697> let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) 776> accepted sock = do
698> ps <- PeerSession addr ss enabled 777> phs <- recvHandshake sock
699> <$> registerTimeout (eventManager clientSession) 778> swarm <- lookupSwarm cs $ hsInfoHash phs
700> maxIncomingTime (return ()) 779> ps <- openSession swarm addr phs
701> <*> registerTimeout (eventManager clientSession) 780> sendHandshake sock $ Handshake {
702> maxOutcomingTime (sendKA sock) 781> hsProtocol = defaultBTProtocol
703> <*> atomically (dupTChan broadcastMessages) 782> , hsReserved = encodeExts $ enabledExtensions ps
704> <*> do { 783> , hsInfoHash = hsInfoHash phs
705> ; tc <- totalCount <$> readTVarIO clientBitfield 784> , hsPeerId = clientPeerId
706> ; newIORef (SessionState (haveNone tc) def) 785> }
707> } 786> sendClientStatus (sock, ps)
708 787> return ps
709> atomically $ modifyTVar' connectedPeers (S.insert ps)
710
711> return (sock, ps)
712
713> closeSession (sock, ps) = do
714> atomically $ modifyTVar' connectedPeers (S.delete ps)
715> close sock
716
717TODO: initiatePeerSession, acceptPeerSession
718 788
719Broadcasting: Have, Cancel, Bitfield, SuggestPiece 789Broadcasting: Have, Cancel, Bitfield, SuggestPiece
720------------------------------------------------------------------------ 790------------------------------------------------------------------------
@@ -748,13 +818,14 @@ messages & events we should send.
748TODO compute size of messages: if it's faster to send Bitfield 818TODO compute size of messages: if it's faster to send Bitfield
749instead many Have do that 819instead many Have do that
750 820
751also if there is single Have message in queue then the 821Also if there is single Have message in queue then the
752corresponding piece is likely still in memory or disc cache, 822corresponding piece is likely still in memory or disc cache,
753when we can send SuggestPiece 823when we can send SuggestPiece.
824
825Get pending messages queue appeared in result of asynchronously
826changed client state. Resulting queue should be sent to a peer
827immediately.
754 828
755> -- | Get pending messages queue appeared in result of asynchronously
756> -- changed client state. Resulting queue should be sent to a peer
757> -- immediately.
758> getPending :: PeerSession -> IO [Message] 829> getPending :: PeerSession -> IO [Message]
759> getPending PeerSession {..} = {-# SCC getPending #-} do 830> getPending PeerSession {..} = {-# SCC getPending #-} do
760> atomically (readAvail pendingMessages) 831> atomically (readAvail pendingMessages)
@@ -769,7 +840,7 @@ when we can send SuggestPiece
769Timeouts 840Timeouts
770----------------------------------------------------------------------- 841-----------------------------------------------------------------------
771 842
772> -- for internal use only 843for internal use only
773 844
774> sec :: Int 845> sec :: Int
775> sec = 1000 * 1000 846> sec = 1000 * 1000