diff options
author | Sam T <pxqr.sta@gmail.com> | 2013-07-10 21:59:33 +0400 |
---|---|---|
committer | Sam T <pxqr.sta@gmail.com> | 2013-07-10 21:59:33 +0400 |
commit | 6c1b01ccd8df86a57e6d60e36538a895934fce0a (patch) | |
tree | a9a4b5f7f70502d8429ebbca1ef0dee768143386 | |
parent | b57d508bba214ba59d7af609d4cac8a57d56dfeb (diff) |
~ Peer session establishment.
* Fixed exception handling;
* Add acceptPeerSession needed by peer listener.
* Simplify initiatePeerSession
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.lhs | 219 |
2 files changed, 146 insertions, 75 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index ca82181c..5be9ae73 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -156,7 +156,7 @@ instance MonadState SessionState P2P where | |||
156 | runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () | 156 | runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () |
157 | runSession se addr p2p = | 157 | runSession se addr p2p = |
158 | handle isIOException $ | 158 | handle isIOException $ |
159 | withPeerSession se addr $ \(sock, pses) -> do | 159 | initiatePeerSession se addr $ \(sock, pses) -> do |
160 | runPeerWire sock (runReaderT (unP2P p2p) pses) | 160 | runPeerWire sock (runReaderT (unP2P p2p) pses) |
161 | where | 161 | where |
162 | isIOException :: IOException -> IO () | 162 | isIOException :: IOException -> IO () |
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs index c0562223..15606c57 100644 --- a/src/Network/BitTorrent/Internal.lhs +++ b/src/Network/BitTorrent/Internal.lhs | |||
@@ -24,7 +24,8 @@ | |||
24 | > Progress(..), startProgress | 24 | > Progress(..), startProgress |
25 | > | 25 | > |
26 | > -- * Client | 26 | > -- * Client |
27 | > , ClientSession (clientPeerId, allowedExtensions, listenerPort) | 27 | > , ClientSession (clientPeerId, allowedExtensions) |
28 | > , listenerPort, dhtPort | ||
28 | > | 29 | > |
29 | > , ThreadCount | 30 | > , ThreadCount |
30 | > , defaultThreadCount | 31 | > , defaultThreadCount |
@@ -61,7 +62,8 @@ | |||
61 | > , sessionState | 62 | > , sessionState |
62 | > ) | 63 | > ) |
63 | > , SessionState | 64 | > , SessionState |
64 | > , withPeerSession | 65 | > , initiatePeerSession |
66 | > , acceptPeerSession | ||
65 | > | 67 | > |
66 | > -- ** Broadcasting | 68 | > -- ** Broadcasting |
67 | > , available | 69 | > , available |
@@ -87,6 +89,7 @@ | |||
87 | > import Control.Concurrent.STM | 89 | > import Control.Concurrent.STM |
88 | > import Control.Concurrent.MSem as MSem | 90 | > import Control.Concurrent.MSem as MSem |
89 | > import Control.Lens | 91 | > import Control.Lens |
92 | > import Control.Monad (when) | ||
90 | > import Control.Exception | 93 | > import Control.Exception |
91 | > import Control.Monad.Trans | 94 | > import Control.Monad.Trans |
92 | 95 | ||
@@ -245,6 +248,11 @@ but nothing more. So to accept new 'PeerSession' we need to lookup | |||
245 | torrent metainfo and content files (if there are some) by the | 248 | torrent metainfo and content files (if there are some) by the |
246 | 'InfoHash' and only after that enter exchange loop. | 249 | 'InfoHash' and only after that enter exchange loop. |
247 | 250 | ||
251 | TODO: check content files location; | ||
252 | |||
253 | > validateLocation :: TorrentLoc -> IO Torrent | ||
254 | > validateLocation = fromFile . metafilePath | ||
255 | |||
248 | Solution with TorrentLoc is much better and takes much more less | 256 | Solution with TorrentLoc is much better and takes much more less |
249 | space, moreover it depends on count of torrents but not on count of | 257 | space, moreover it depends on count of torrents but not on count of |
250 | data itself. To scale further, in future we might add something like | 258 | data itself. To scale further, in future we might add something like |
@@ -331,11 +339,8 @@ and different enabled extensions at the same time. | |||
331 | > -- 'PeerSession'. | 339 | > -- 'PeerSession'. |
332 | > , allowedExtensions :: [Extension] | 340 | > , allowedExtensions :: [Extension] |
333 | 341 | ||
334 | -- > , peerListener :: !ClientService | 342 | > , peerListener :: !ClientService |
335 | -- > , nodeListener :: !ClientService | 343 | > , nodeListener :: !ClientService |
336 | |||
337 | > -- | Port where client listen for the other peers. | ||
338 | > , listenerPort :: PortNumber | ||
339 | 344 | ||
340 | > -- | Semaphor used to bound number of active P2P sessions. | 345 | > -- | Semaphor used to bound number of active P2P sessions. |
341 | > , activeThreads :: !(MSem ThreadCount) | 346 | > , activeThreads :: !(MSem ThreadCount) |
@@ -414,7 +419,8 @@ Retrieving client info | |||
414 | > ClientSession | 419 | > ClientSession |
415 | > <$> genPeerId | 420 | > <$> genPeerId |
416 | > <*> pure exts | 421 | > <*> pure exts |
417 | > <*> pure 10 -- forkListener (error "listener") | 422 | > <*> pure (ClientService 10 undefined) -- TODO |
423 | > <*> pure (ClientService 20 undefined) -- TODO | ||
418 | > <*> MSem.new n | 424 | > <*> MSem.new n |
419 | > <*> pure n | 425 | > <*> pure n |
420 | > <*> newTVarIO M.empty | 426 | > <*> newTVarIO M.empty |
@@ -422,16 +428,11 @@ Retrieving client info | |||
422 | > <*> newTVarIO (startProgress 0) | 428 | > <*> newTVarIO (startProgress 0) |
423 | > <*> newTVarIO HM.empty | 429 | > <*> newTVarIO HM.empty |
424 | 430 | ||
425 | > listenerHandler :: ClientSession -> Socket -> IO () | 431 | > listenerPort :: ClientSession -> PortNumber |
426 | > listenerHandler ses sock = do | 432 | > listenerPort = servPort . peerListener |
427 | > Handshake {..} <- recvHandshake sock | 433 | |
428 | > status <- torrentPresence ses hsInfoHash | 434 | > dhtPort :: ClientSession -> PortNumber |
429 | > case status of | 435 | > dhtPort = servPort . nodeListener |
430 | > Unknown -> return () | ||
431 | > Active ses -> error "listener handler" | ||
432 | > -- TODO here we need to lookup local torrent status: BF e.t.c> | ||
433 | > Registered _ -> return () | ||
434 | > return () | ||
435 | 436 | ||
436 | Swarm sessions | 437 | Swarm sessions |
437 | ------------------------------------------------------------------------ | 438 | ------------------------------------------------------------------------ |
@@ -489,10 +490,9 @@ example consider the following very simle and realistic scenario: | |||
489 | simultaneously. | 490 | simultaneously. |
490 | 491 | ||
491 | There some other situation the problem may occur: duplicates in | 492 | There some other situation the problem may occur: duplicates in |
492 | successive tracker responses, tracker and DHT returns. | 493 | successive tracker responses, tracker and DHT returns. So without any |
493 | 494 | protection we end up with two session between the same peers. That's | |
494 | So without any protection we end up with two session between the same | 495 | bad because this could lead: |
495 | peers. That's bad because this could lead: | ||
496 | 496 | ||
497 | * Reduced throughput - multiple sessions between the same peers will | 497 | * Reduced throughput - multiple sessions between the same peers will |
498 | mutiply control overhead (control messages, session state). | 498 | mutiply control overhead (control messages, session state). |
@@ -559,6 +559,14 @@ INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers | |||
559 | > pieceLength = ciPieceLength . tInfo . torrentMeta | 559 | > pieceLength = ciPieceLength . tInfo . torrentMeta |
560 | > {-# INLINE pieceLength #-} | 560 | > {-# INLINE pieceLength #-} |
561 | 561 | ||
562 | > swarmHandshake :: SwarmSession -> Handshake | ||
563 | > swarmHandshake SwarmSession {..} = Handshake { | ||
564 | > hsProtocol = defaultBTProtocol | ||
565 | > , hsReserved = encodeExts $ allowedExtensions $ clientSession | ||
566 | > , hsInfoHash = tInfoHash torrentMeta | ||
567 | > , hsPeerId = clientPeerId $ clientSession | ||
568 | > } | ||
569 | |||
562 | > {- | 570 | > {- |
563 | > haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () | 571 | > haveDone :: MonadIO m => PieceIx -> SwarmSession -> m () |
564 | > haveDone ix = | 572 | > haveDone ix = |
@@ -629,7 +637,14 @@ avoid reduntant KA messages. | |||
629 | > , sessionState :: !(IORef SessionState) | 637 | > , sessionState :: !(IORef SessionState) |
630 | > } | 638 | > } |
631 | 639 | ||
632 | > -- TODO unpack some fields | 640 | > instance Eq PeerSession where |
641 | > (==) = (==) `on` connectedPeerAddr | ||
642 | |||
643 | > instance Ord PeerSession where | ||
644 | > compare = comparing connectedPeerAddr | ||
645 | |||
646 | Peer session state | ||
647 | ------------------------------------------------------------------------ | ||
633 | 648 | ||
634 | > data SessionState = SessionState { | 649 | > data SessionState = SessionState { |
635 | > _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | 650 | > _bitfield :: !Bitfield -- ^ Other peer Have bitfield. |
@@ -638,11 +653,8 @@ avoid reduntant KA messages. | |||
638 | 653 | ||
639 | > $(makeLenses ''SessionState) | 654 | > $(makeLenses ''SessionState) |
640 | 655 | ||
641 | > instance Eq PeerSession where | 656 | > initialSessionState :: PieceCount -> SessionState |
642 | > (==) = (==) `on` connectedPeerAddr | 657 | > initialSessionState pc = SessionState (haveNone pc) def |
643 | |||
644 | > instance Ord PeerSession where | ||
645 | > compare = comparing connectedPeerAddr | ||
646 | 658 | ||
647 | > findPieceCount :: PeerSession -> PieceCount | 659 | > findPieceCount :: PeerSession -> PieceCount |
648 | > findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession | 660 | > findPieceCount = pieceCount . tInfo . torrentMeta . swarmSession |
@@ -655,7 +667,8 @@ Peer session exceptions | |||
655 | > -- tracker, or any other session. | 667 | > -- tracker, or any other session. |
656 | > -- | 668 | > -- |
657 | > data SessionException = PeerDisconnected | 669 | > data SessionException = PeerDisconnected |
658 | > | ProtocolError Doc | 670 | > | ProtocolError Doc |
671 | > | UnknownTorrent InfoHash | ||
659 | > deriving (Show, Typeable) | 672 | > deriving (Show, Typeable) |
660 | 673 | ||
661 | > instance Exception SessionException | 674 | > instance Exception SessionException |
@@ -670,51 +683,108 @@ Peer session exceptions | |||
670 | > putSessionException :: SessionException -> IO () | 683 | > putSessionException :: SessionException -> IO () |
671 | > putSessionException = print | 684 | > putSessionException = print |
672 | 685 | ||
686 | > torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession | ||
687 | > torrentSwarm _ _ (Active sws) = return sws | ||
688 | > torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc | ||
689 | > torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih | ||
690 | |||
691 | > lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
692 | > lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih | ||
693 | |||
673 | Peer session creation | 694 | Peer session creation |
674 | ------------------------------------------------------------------------ | 695 | ------------------------------------------------------------------------ |
675 | 696 | ||
676 | > -- TODO modify such that we can use this in listener loop | 697 | The peer session cycle looks like: |
677 | > -- TODO check if it connected yet peer | 698 | |
678 | > withPeerSession :: SwarmSession -> PeerAddr | 699 | * acquire vacant session and vacant thread slot; |
679 | > -> ((Socket, PeerSession) -> IO ()) | 700 | * (fork could be here, but not necessary) |
680 | > -> IO () | 701 | * establish peer connection; |
681 | 702 | * register peer session; | |
682 | > withPeerSession ss @ SwarmSession {..} addr | 703 | * ... exchange process ... |
683 | > = handle isSessionException . bracket openSession closeSession | 704 | * unregister peer session; |
705 | * close peer connection; | ||
706 | * release acquired session and thread slot. | ||
707 | |||
708 | TODO: explain why this order | ||
709 | TODO: thread throttling | ||
710 | TODO: check if it connected yet peer | ||
711 | TODO: utilize peer Id. | ||
712 | TODO: use STM semaphore | ||
713 | |||
714 | > openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | ||
715 | > openSession ss @ SwarmSession {..} addr Handshake {..} = do | ||
716 | > let clientCaps = encodeExts $ allowedExtensions $ clientSession | ||
717 | > let enabled = decodeExts (enabledCaps clientCaps hsReserved) | ||
718 | > ps <- PeerSession addr ss enabled | ||
719 | > <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ()) | ||
720 | > <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ()) | ||
721 | > <*> atomically (dupTChan broadcastMessages) | ||
722 | > <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) | ||
723 | > -- TODO we could implement more interesting throtling scheme | ||
724 | > -- using connected peer information | ||
725 | > atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
726 | > return ps | ||
727 | |||
728 | > closeSession :: PeerSession -> IO () | ||
729 | > closeSession ps @ PeerSession {..} = do | ||
730 | > atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps) | ||
731 | |||
732 | > type PeerConn = (Socket, PeerSession) | ||
733 | > type Exchange = PeerConn -> IO () | ||
734 | |||
735 | > sendClientStatus :: PeerConn -> IO () | ||
736 | > sendClientStatus (sock, PeerSession {..}) = do | ||
737 | > cbf <- readTVarIO $ clientBitfield $ swarmSession | ||
738 | > sendAll sock $ encode $ Bitfield cbf | ||
739 | > when (ExtDHT `elem` enabledExtensions) $ do | ||
740 | > sendAll sock $ encode $ Port $ dhtPort $ clientSession swarmSession | ||
741 | |||
742 | Exchange action depends on session and socket, whereas session depends | ||
743 | on socket: | ||
744 | |||
745 | socket------>-----exchange | ||
746 | | | | ||
747 | \-->--session-->--/ | ||
748 | |||
749 | To handle exceptions properly we double bracket socket and session | ||
750 | then joining the resources and also ignoring session local exceptions. | ||
751 | |||
752 | > runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO () | ||
753 | > runSession connector opener action = | ||
754 | > handle isSessionException $ | ||
755 | > bracket connector close $ \sock -> | ||
756 | > bracket (opener sock) closeSession $ \ses -> | ||
757 | > action (sock, ses) | ||
758 | |||
759 | Used then the client want to connect to a peer. | ||
760 | |||
761 | > initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | ||
762 | > initiatePeerSession ss @ SwarmSession {..} addr | ||
763 | > = runSession (connectToPeer addr) initiated | ||
684 | > where | 764 | > where |
685 | > openSession = do | 765 | > initiated sock = do |
686 | > let caps = encodeExts $ allowedExtensions $ clientSession | 766 | > phs <- handshake sock (swarmHandshake ss) |
687 | > let ihash = tInfoHash torrentMeta | 767 | > ps <- openSession ss addr phs |
688 | > let pid = clientPeerId $ clientSession | 768 | > sendClientStatus (sock, ps) |
689 | > let chs = Handshake defaultBTProtocol caps ihash pid | 769 | > return ps |
690 | 770 | ||
691 | > sock <- connectToPeer addr | 771 | Used the a peer want to connect to the client. |
692 | > phs <- handshake sock chs `onException` close sock | ||
693 | 772 | ||
694 | > cbf <- readTVarIO clientBitfield | 773 | > acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO () |
695 | > sendAll sock (encode (Bitfield cbf)) | 774 | > acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted |
696 | 775 | > where | |
697 | > let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) | 776 | > accepted sock = do |
698 | > ps <- PeerSession addr ss enabled | 777 | > phs <- recvHandshake sock |
699 | > <$> registerTimeout (eventManager clientSession) | 778 | > swarm <- lookupSwarm cs $ hsInfoHash phs |
700 | > maxIncomingTime (return ()) | 779 | > ps <- openSession swarm addr phs |
701 | > <*> registerTimeout (eventManager clientSession) | 780 | > sendHandshake sock $ Handshake { |
702 | > maxOutcomingTime (sendKA sock) | 781 | > hsProtocol = defaultBTProtocol |
703 | > <*> atomically (dupTChan broadcastMessages) | 782 | > , hsReserved = encodeExts $ enabledExtensions ps |
704 | > <*> do { | 783 | > , hsInfoHash = hsInfoHash phs |
705 | > ; tc <- totalCount <$> readTVarIO clientBitfield | 784 | > , hsPeerId = clientPeerId |
706 | > ; newIORef (SessionState (haveNone tc) def) | 785 | > } |
707 | > } | 786 | > sendClientStatus (sock, ps) |
708 | 787 | > return ps | |
709 | > atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
710 | |||
711 | > return (sock, ps) | ||
712 | |||
713 | > closeSession (sock, ps) = do | ||
714 | > atomically $ modifyTVar' connectedPeers (S.delete ps) | ||
715 | > close sock | ||
716 | |||
717 | TODO: initiatePeerSession, acceptPeerSession | ||
718 | 788 | ||
719 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece | 789 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece |
720 | ------------------------------------------------------------------------ | 790 | ------------------------------------------------------------------------ |
@@ -748,13 +818,14 @@ messages & events we should send. | |||
748 | TODO compute size of messages: if it's faster to send Bitfield | 818 | TODO compute size of messages: if it's faster to send Bitfield |
749 | instead many Have do that | 819 | instead many Have do that |
750 | 820 | ||
751 | also if there is single Have message in queue then the | 821 | Also if there is single Have message in queue then the |
752 | corresponding piece is likely still in memory or disc cache, | 822 | corresponding piece is likely still in memory or disc cache, |
753 | when we can send SuggestPiece | 823 | when we can send SuggestPiece. |
824 | |||
825 | Get pending messages queue appeared in result of asynchronously | ||
826 | changed client state. Resulting queue should be sent to a peer | ||
827 | immediately. | ||
754 | 828 | ||
755 | > -- | Get pending messages queue appeared in result of asynchronously | ||
756 | > -- changed client state. Resulting queue should be sent to a peer | ||
757 | > -- immediately. | ||
758 | > getPending :: PeerSession -> IO [Message] | 829 | > getPending :: PeerSession -> IO [Message] |
759 | > getPending PeerSession {..} = {-# SCC getPending #-} do | 830 | > getPending PeerSession {..} = {-# SCC getPending #-} do |
760 | > atomically (readAvail pendingMessages) | 831 | > atomically (readAvail pendingMessages) |
@@ -769,7 +840,7 @@ when we can send SuggestPiece | |||
769 | Timeouts | 840 | Timeouts |
770 | ----------------------------------------------------------------------- | 841 | ----------------------------------------------------------------------- |
771 | 842 | ||
772 | > -- for internal use only | 843 | for internal use only |
773 | 844 | ||
774 | > sec :: Int | 845 | > sec :: Int |
775 | > sec = 1000 * 1000 | 846 | > sec = 1000 * 1000 |