summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs111
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs263
2 files changed, 296 insertions, 78 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 88554807..1537efe1 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -7,11 +7,16 @@ module Network.BitTorrent.Exchange.Session
7 , newSession 7 , newSession
8 , closeSession 8 , closeSession
9 9
10 -- * Connections
10 , Network.BitTorrent.Exchange.Session.insert 11 , Network.BitTorrent.Exchange.Session.insert
12 , Network.BitTorrent.Exchange.Session.attach
13 , Network.BitTorrent.Exchange.Session.delete
14 , Network.BitTorrent.Exchange.Session.deleteAll
11 ) where 15 ) where
12 16
13import Control.Applicative 17import Control.Applicative
14import Control.Concurrent 18import Control.Concurrent
19import Control.Concurrent.STM
15import Control.Exception hiding (Handler) 20import Control.Exception hiding (Handler)
16import Control.Lens 21import Control.Lens
17import Control.Monad.Logger 22import Control.Monad.Logger
@@ -23,6 +28,7 @@ import Data.Conduit.List as CL (iterM)
23import Data.Maybe 28import Data.Maybe
24import Data.Map as M 29import Data.Map as M
25import Data.Monoid 30import Data.Monoid
31import Data.Set as S
26import Data.Text as T 32import Data.Text as T
27import Data.Typeable 33import Data.Typeable
28import Text.PrettyPrint hiding ((<>)) 34import Text.PrettyPrint hiding ((<>))
@@ -70,11 +76,6 @@ data Cached a = Cached
70cache :: BEncode a => a -> Cached a 76cache :: BEncode a => a -> Cached a
71cache s = Cached s (BE.encode s) 77cache s = Cached s (BE.encode s)
72 78
73data ConnectionEntry = ConnectionEntry
74 { initiatedBy :: !ChannelSide
75 , connection :: !(Connection Session)
76 }
77
78data Session = Session 79data Session = Session
79 { tpeerId :: PeerId 80 { tpeerId :: PeerId
80 , infohash :: InfoHash 81 , infohash :: InfoHash
@@ -82,12 +83,15 @@ data Session = Session
82 , storage :: Storage 83 , storage :: Storage
83 , status :: MVar SessionStatus 84 , status :: MVar SessionStatus
84 , unchoked :: [PeerAddr IP] 85 , unchoked :: [PeerAddr IP]
85 , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) 86 , pendingConnections :: TVar (Set (PeerAddr IP))
87 , establishedConnections :: TVar (Map (PeerAddr IP) (Connection Session))
86 , broadcast :: Chan Message 88 , broadcast :: Chan Message
87 , logger :: LogFun 89 , logger :: LogFun
88 , infodict :: MVar (Cached InfoDict) 90 , infodict :: MVar (Cached InfoDict)
89 } 91 }
90 92
93instance Ord IP
94
91-- | Logger function. 95-- | Logger function.
92type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () 96type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
93 97
@@ -97,18 +101,21 @@ newSession :: LogFun
97 -> InfoDict -- ^ torrent info dictionary; 101 -> InfoDict -- ^ torrent info dictionary;
98 -> IO Session -- ^ 102 -> IO Session -- ^
99newSession logFun addr rootPath dict = do 103newSession logFun addr rootPath dict = do
100 connVar <- newMVar M.empty 104 pconnVar <- newTVarIO S.empty
105 econnVar <- newTVarIO M.empty
101 store <- openInfoDict ReadWriteEx rootPath dict 106 store <- openInfoDict ReadWriteEx rootPath dict
102 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) 107 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store))
103 (piPieceLength (idPieceInfo dict)) 108 (piPieceLength (idPieceInfo dict))
104 chan <- newChan 109 chan <- newChan
105 return Session 110 return Session
106 { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) 111 { tpeerId = fromMaybe (error "newSession: impossible")
107 , infohash = idInfoHash dict 112 (peerId addr)
108 , status = statusVar 113 , infohash = idInfoHash dict
109 , storage = store 114 , status = statusVar
110 , unchoked = [] 115 , storage = store
111 , connections = connVar 116 , unchoked = []
117 , pendingConnections = pconnVar
118 , establishedConnections = econnVar
112 , broadcast = chan 119 , broadcast = chan
113 , logger = logFun 120 , logger = logFun
114 } 121 }
@@ -137,31 +144,79 @@ logEvent :: MonadLogger m => Text -> m ()
137logEvent = logInfoN 144logEvent = logInfoN
138 145
139{----------------------------------------------------------------------- 146{-----------------------------------------------------------------------
147-- Connection slots
148-----------------------------------------------------------------------}
149--- pending -> established -> closed
150--- | /|\
151--- \-------------------------|
152
153pendingConnection :: PeerAddr IP -> Session -> IO Bool
154pendingConnection addr Session {..} = atomically $ do
155 pSet <- readTVar pendingConnections
156 eSet <- readTVar establishedConnections
157 if (addr `S.member` pSet) || (addr `M.member` eSet)
158 then return False
159 else do
160 modifyTVar' pendingConnections (S.insert addr)
161 return True
162
163establishedConnection :: Connected Session ()
164establishedConnection = undefined --atomically $ do
165-- pSet <- readTVar pendingConnections
166-- eSet <- readTVar
167 undefined
168
169finishedConnection :: Connected Session ()
170finishedConnection = return ()
171
172-- | There are no state for this connection, remove it.
173closedConnection :: PeerAddr IP -> Session -> IO ()
174closedConnection addr Session {..} = atomically $ do
175 modifyTVar pendingConnections $ S.delete addr
176 modifyTVar establishedConnections $ M.delete addr
177
178{-----------------------------------------------------------------------
140-- Connections 179-- Connections
141-----------------------------------------------------------------------} 180-----------------------------------------------------------------------}
142-- TODO unmap storage on zero connections 181-- TODO unmap storage on zero connections
143 182
144insert :: PeerAddr IP 183mainWire :: Wire Session ()
145 -> {- Maybe Socket 184mainWire = do
146 -> -} Session -> IO () 185 lift establishedConnection
186 Session {..} <- asks connSession
187 lift $ resizeBitfield (totalPieces storage)
188 logEvent "Connection established"
189 iterM logMessage =$= exchange =$= iterM logMessage
190 lift finishedConnection
191
192getConnectionConfig :: Session -> IO (ConnectionConfig Session)
193getConnectionConfig s @ Session {..} = undefined --ConnectionConfig
194-- let caps = def
195-- let ecaps = def
196-- let hs = Handshake def caps infohash tpeerId
197-- chan <- dupChan broadcast
198
199-- { cfgPrefs = undefined
200-- , cfgSession = ConnectionSession undefined undefined s
201-- , cfgWire = mainWire
202-- }
203
204insert :: PeerAddr IP -> Session -> IO ()
147insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) 205insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup)
148 where 206 where
207 action = do
208 pendingConnection addr ses
209 cfg <- getConnectionConfig ses
210 connectWire addr cfg
211
149 cleanup = do 212 cleanup = do
150 runStatusUpdates status (SS.resetPending addr) 213 runStatusUpdates status (SS.resetPending addr)
151 -- TODO Metata.resetPending addr 214 -- TODO Metata.resetPending addr
215 closedConnection addr ses
152 216
153 action = do 217-- TODO closePending on error
154 let caps = def 218attach :: PendingConnection -> Session -> IO ()
155 let ecaps = def 219attach = undefined
156 let hs = Handshake def caps infohash tpeerId
157 chan <- dupChan broadcast
158 connectWire ses hs addr ecaps chan $ do
159 conn <- ask
160-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn
161 lift $ resizeBitfield (totalPieces storage)
162 logEvent "Connection established"
163 iterM logMessage =$= exchange =$= iterM logMessage
164-- liftIO $ modifyMVar_ connections $ pure . M.delete addr
165 220
166delete :: PeerAddr IP -> Session -> IO () 221delete :: PeerAddr IP -> Session -> IO ()
167delete = undefined 222delete = undefined
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index 27915724..d1bed146 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -7,6 +7,11 @@
7-- Stability : experimental 7-- Stability : experimental
8-- Portability : portable 8-- Portability : portable
9-- 9--
10-- Each peer wire connection is identified by triple @(topic,
11-- remote_addr, this_addr)@. This means that connections are the
12-- same if and only if their 'ConnectionId' are the same. Of course,
13-- you /must/ avoid duplicated connections.
14--
10-- This module control /integrity/ of data send and received. 15-- This module control /integrity/ of data send and received.
11-- 16--
12{-# LANGUAGE DeriveDataTypeable #-} 17{-# LANGUAGE DeriveDataTypeable #-}
@@ -21,6 +26,7 @@ module Network.BitTorrent.Exchange.Wire
21 26
22 -- * Connection 27 -- * Connection
23 , Connection 28 , Connection
29 , connInitiatedBy
24 30
25 -- ** Identity 31 -- ** Identity
26 , connRemoteAddr 32 , connRemoteAddr
@@ -44,9 +50,23 @@ module Network.BitTorrent.Exchange.Wire
44 , connStats 50 , connStats
45 51
46 -- * Setup 52 -- * Setup
47 , runWire 53 , ConnectionPrefs (..)
54 , ConnectionSession (..)
55 , ConnectionConfig (..)
56
57 -- ** Initiate
48 , connectWire 58 , connectWire
59
60 -- ** Accept
61 , PendingConnection
62 , newPendingConnection
63 , pendingPeer
64 , pendingCaps
65 , pendingTopic
66 , closePending
49 , acceptWire 67 , acceptWire
68
69 -- ** Post setup actions
50 , resizeBitfield 70 , resizeBitfield
51 71
52 -- * Messaging 72 -- * Messaging
@@ -143,7 +163,7 @@ data ProtocolError
143 163
144 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not 164 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not
145 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in 165 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in
146 -- 'connectWire' only. 166 -- 'connectWire' or 'acceptWire' only.
147 | UnexpectedTopic InfoHash 167 | UnexpectedTopic InfoHash
148 168
149 -- | Some trackers or DHT can return 'PeerId' of a peer. If a 169 -- | Some trackers or DHT can return 'PeerId' of a peer. If a
@@ -470,9 +490,20 @@ data ConnectionState = ConnectionState {
470 490
471makeLenses ''ConnectionState 491makeLenses ''ConnectionState
472 492
493instance Default ConnectionState where
494 def = ConnectionState
495 { _connExtCaps = def
496 , _connRemoteEhs = def
497 , _connStats = def
498 , _connStatus = def
499 , _connBitfield = BF.haveNone 0
500 }
501
473-- | Connection keep various info about both peers. 502-- | Connection keep various info about both peers.
474data Connection s = Connection 503data Connection s = Connection
475 { connRemoteAddr :: !(PeerAddr IP) 504 { connInitiatedBy :: !ChannelSide
505
506 , connRemoteAddr :: !(PeerAddr IP)
476 507
477 -- | /Both/ peers handshaked with this protocol string. The only 508 -- | /Both/ peers handshaked with this protocol string. The only
478 -- value is \"Bittorrent Protocol\" but this can be changed in 509 -- value is \"Bittorrent Protocol\" but this can be changed in
@@ -561,6 +592,29 @@ initiateHandshake sock hs = do
561 sendHandshake sock hs 592 sendHandshake sock hs
562 recvHandshake sock 593 recvHandshake sock
563 594
595data HandshakePair = HandshakePair
596 { handshakeSent :: !Handshake
597 , handshakeRecv :: !Handshake
598 } deriving (Show, Eq)
599
600validatePair :: HandshakePair -> PeerAddr IP -> IO ()
601validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp
602 [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs')
603 , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs')
604 , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs')
605 , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr)
606 , UnexpectedPeerId $ hsPeerId hs')
607 ]
608 where
609 checkProp (t, e) = unless t $ throwIO $ ProtocolError e
610
611-- | Connection state /right/ after handshaking.
612establishedStats :: HandshakePair -> ConnectionStats
613establishedStats HandshakePair {..} = ConnectionStats
614 { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent
615 , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv
616 }
617
564{----------------------------------------------------------------------- 618{-----------------------------------------------------------------------
565-- Wire 619-- Wire
566-----------------------------------------------------------------------} 620-----------------------------------------------------------------------}
@@ -670,53 +724,129 @@ rehandshake caps = undefined
670reconnect :: Wire s () 724reconnect :: Wire s ()
671reconnect = undefined 725reconnect = undefined
672 726
673-- | Initiate 'Wire' connection and handshake with a peer. This function will 727data ConnectionId = ConnectionId
674-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on 728 { topic :: !InfoHash
675-- both sides. 729 , remoteAddr :: !(PeerAddr IP)
730 , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node.
731 }
732
733-- | /Preffered/ settings of wire. To get the real use 'ask'.
734data ConnectionPrefs = ConnectionPrefs
735 { prefOptions :: !Options
736 , prefProtocol :: !ProtocolName
737 , prefCaps :: !Caps
738 , prefExtCaps :: !ExtendedCaps
739 }
740
741instance Default ConnectionPrefs where
742 def = ConnectionPrefs
743 { prefOptions = def
744 , prefProtocol = def
745 , prefCaps = def
746 , prefExtCaps = def
747 }
748
749normalize :: ConnectionPrefs -> ConnectionPrefs
750normalize = undefined
751
752data ConnectionSession s = ConnectionSession
753 { sessionTopic :: !(InfoHash)
754 , sessionPeerId :: !(PeerId)
755 , metadataSize :: !(Maybe Int)
756 , outputChan :: !(Maybe (Chan Message))
757 , connectionSession :: !(s)
758 }
759
760data ConnectionConfig s = ConnectionConfig
761 { cfgPrefs :: !(ConnectionPrefs)
762 , cfgSession :: !(ConnectionSession s)
763 , cfgWire :: !(Wire s ())
764 }
765
766configHandshake :: ConnectionConfig s -> Handshake
767configHandshake ConnectionConfig {..} = Handshake
768 { hsProtocol = prefProtocol cfgPrefs
769 , hsReserved = prefCaps cfgPrefs
770 , hsInfoHash = sessionTopic cfgSession
771 , hsPeerId = sessionPeerId cfgSession
772 }
773
774{-----------------------------------------------------------------------
775-- Pending connections
776-----------------------------------------------------------------------}
777
778-- | Connection in half opened state. A normal usage scenario:
676-- 779--
677-- This function can throw 'WireFailure' exception. 780-- * Opened using 'newPendingConnection', usually in the listener
781-- loop;
678-- 782--
679connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message 783-- * Closed using 'closePending' if 'pendingPeer' is banned,
680 -> Wire s () -> IO () 784-- 'pendingCaps' is prohibited or pendingTopic is unknown;
681connectWire session hs addr extCaps chan wire = do 785--
682 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return 786-- * Accepted using 'acceptWire' otherwise.
683 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do 787--
684 hs' <- initiateHandshake sock hs 788data PendingConnection = PendingConnection
789 { pendingSock :: Socket
790 , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty;
791 , pendingCaps :: Caps -- ^ advertised by the peer;
792 , pendingTopic :: InfoHash -- ^ possible non-existent topic.
793 }
685 794
686 Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [ 795-- | Reconstruct handshake sent by the remote peer.
687 (def == hsProtocol hs' 796pendingHandshake :: PendingConnection -> Handshake
688 , InvalidProtocol $ hsProtocol hs'), 797pendingHandshake PendingConnection {..} = Handshake
689 (hsProtocol hs == hsProtocol hs' 798 { hsProtocol = def
690 , UnexpectedProtocol $ hsProtocol hs'), 799 , hsReserved = pendingCaps
691 (hsInfoHash hs == hsInfoHash hs' 800 , hsInfoHash = pendingTopic
692 , UnexpectedTopic $ hsInfoHash hs'), 801 , hsPeerId = fromMaybe (error "pendingHandshake: impossible")
693 (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) 802 (peerId pendingPeer)
694 , UnexpectedPeerId $ hsPeerId hs') 803 }
695 ]
696
697 let caps = hsReserved hs <> hsReserved hs'
698 wire' = if ExtExtended `allowed` caps
699 then extendedHandshake extCaps >> wire
700 else wire
701 804
702 cstate <- newIORef $ ConnectionState { 805-- |
703 _connExtCaps = def 806--
704 , _connRemoteEhs = def 807-- This function can throw 'WireFailure' exception.
705 , _connStats = ConnectionStats { 808--
706 outcomingFlow = FlowStats 1 $ handshakeStats hs 809newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection
707 , incomingFlow = FlowStats 1 $ handshakeStats hs' 810newPendingConnection sock addr = do
708 } 811 Handshake {..} <- recvHandshake sock
709 , _connStatus = def 812 unless (hsProtocol == def) $ do
710 , _connBitfield = BF.haveNone 0 813 throwIO $ ProtocolError $ InvalidProtocol hsProtocol
711 } 814 return PendingConnection
815 { pendingSock = sock
816 , pendingPeer = addr { peerId = Just hsPeerId }
817 , pendingCaps = hsReserved
818 , pendingTopic = hsInfoHash
819 }
712 820
713 -- TODO make KA interval configurable 821-- | Release all resources associated with the given connection. Note
714 let kaInterval = defaultKeepAliveInterval 822-- that you /must not/ 'closePending' if you 'acceptWire'.
715 bracket 823closePending :: PendingConnection -> IO ()
716 (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) 824closePending PendingConnection {..} = do
717 (killThread) $ \ _ -> 825 close pendingSock
718 runWire wire' sock chan $ Connection 826
719 { connRemoteAddr = addr 827{-----------------------------------------------------------------------
828-- Connection setup
829-----------------------------------------------------------------------}
830
831chanToSock :: Int -> Chan Message -> Socket -> IO ()
832chanToSock ka chan sock =
833 sourceChan ka chan $= conduitPut S.put $$ sinkSocket sock
834
835afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair
836 -> ConnectionConfig s -> IO ()
837afterHandshaking initiator addr sock
838 hpair @ (HandshakePair hs hs')
839 (ConnectionConfig
840 { cfgPrefs = ConnectionPrefs {..}
841 , cfgSession = ConnectionSession {..}
842 , cfgWire = wire
843 }) = do
844 let caps = hsReserved hs <> hsReserved hs'
845 cstate <- newIORef def { _connStats = establishedStats hpair }
846 chan <- maybe newChan return outputChan
847 let conn = Connection {
848 connInitiatedBy = initiator
849 , connRemoteAddr = addr
720 , connProtocol = hsProtocol hs 850 , connProtocol = hsProtocol hs
721 , connCaps = caps 851 , connCaps = caps
722 , connTopic = hsInfoHash hs 852 , connTopic = hsInfoHash hs
@@ -724,20 +854,53 @@ connectWire session hs addr extCaps chan wire = do
724 , connThisPeerId = hsPeerId hs 854 , connThisPeerId = hsPeerId hs
725 , connOptions = def 855 , connOptions = def
726 , connState = cstate 856 , connState = cstate
727 , connSession = session 857 , connSession = connectionSession
728 , connChan = chan 858 , connChan = chan
729 } 859 }
730 860
861 -- TODO make KA interval configurable
862 let kaInterval = defaultKeepAliveInterval
863 wire' = if ExtExtended `allowed` caps
864 then extendedHandshake prefExtCaps >> wire
865 else wire
866
867 bracket (forkIO (chanToSock kaInterval chan sock))
868 (killThread)
869 (\ _ -> runWire wire' sock chan conn)
870
871-- | Initiate 'Wire' connection and handshake with a peer. This function will
872-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on
873-- both sides.
874--
875-- This function can throw 'WireFailure' exception.
876--
877connectWire :: PeerAddr IP -> ConnectionConfig s -> IO ()
878connectWire addr cfg = do
879 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return
880 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do
881 let hs = configHandshake cfg
882 hs' <- initiateHandshake sock hs
883 let hpair = HandshakePair hs hs'
884 validatePair hpair addr
885 afterHandshaking ThisPeer addr sock hpair cfg
886
731-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed 887-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
732-- socket. For peer listener loop the 'acceptSafe' should be 888-- socket. For peer listener loop the 'acceptSafe' should be
733-- prefered against 'accept'. The socket will be closed at exit. 889-- prefered against 'accept'. The socket will be closed at exit.
734-- 890--
735-- This function can throw 'WireFailure' exception. 891-- This function can throw 'WireFailure' exception.
736-- 892--
737acceptWire :: Socket -> PeerAddr IP -> Wire s () -> IO () 893acceptWire :: PendingConnection -> ConnectionConfig s -> IO ()
738acceptWire sock peerAddr wire = do 894acceptWire pc @ PendingConnection {..} cfg = do
739 bracket (return sock) close $ \ _ -> do 895 bracket (return pendingSock) close $ \ _ -> do
740 error "acceptWire: not implemented" 896 unless (sessionTopic (cfgSession cfg) == pendingTopic) $ do
897 throwIO (ProtocolError (UnexpectedTopic pendingTopic))
898
899 let hs = configHandshake cfg
900 sendHandshake pendingSock hs
901 let hpair = HandshakePair hs (pendingHandshake pc)
902
903 afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg
741 904
742-- | Used when size of bitfield becomes known. 905-- | Used when size of bitfield becomes known.
743resizeBitfield :: Int -> Connected s () 906resizeBitfield :: Int -> Connected s ()