diff options
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r-- | src/Network/BitTorrent/Core/PeerAddr.hs | 8 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 1 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Bus.hs | 66 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 29 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 205 |
5 files changed, 210 insertions, 99 deletions
diff --git a/src/Network/BitTorrent/Core/PeerAddr.hs b/src/Network/BitTorrent/Core/PeerAddr.hs index ed2dc672..81754e5e 100644 --- a/src/Network/BitTorrent/Core/PeerAddr.hs +++ b/src/Network/BitTorrent/Core/PeerAddr.hs | |||
@@ -18,7 +18,6 @@ module Network.BitTorrent.Core.PeerAddr | |||
18 | PeerAddr(..) | 18 | PeerAddr(..) |
19 | , defaultPorts | 19 | , defaultPorts |
20 | , peerSockAddr | 20 | , peerSockAddr |
21 | , connectToPeer | ||
22 | ) where | 21 | ) where |
23 | 22 | ||
24 | import Control.Applicative | 23 | import Control.Applicative |
@@ -116,10 +115,3 @@ peerSockAddr = SockAddrInet <$> (g . peerPort) <*> (htonl . peerIP) | |||
116 | 115 | ||
117 | g :: PortNumber -> PortNumber | 116 | g :: PortNumber -> PortNumber |
118 | g = id | 117 | g = id |
119 | |||
120 | -- | Tries to connect to peer using reasonable default parameters. | ||
121 | connectToPeer :: PeerAddr -> IO Socket | ||
122 | connectToPeer p = do | ||
123 | sock <- socket AF_INET Stream Network.Socket.defaultProtocol | ||
124 | connect sock (peerSockAddr p) | ||
125 | return sock | ||
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 57b2c81f..c1377449 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -84,7 +84,6 @@ import Control.Monad.Trans.Resource | |||
84 | import Data.IORef | 84 | import Data.IORef |
85 | import Data.Conduit as C | 85 | import Data.Conduit as C |
86 | import Data.Conduit.Cereal as S | 86 | import Data.Conduit.Cereal as S |
87 | --import Data.Conduit.Serialization.Binary as B | ||
88 | import Data.Conduit.Network | 87 | import Data.Conduit.Network |
89 | import Data.Serialize as S | 88 | import Data.Serialize as S |
90 | import Text.PrettyPrint as PP hiding (($$)) | 89 | import Text.PrettyPrint as PP hiding (($$)) |
diff --git a/src/Network/BitTorrent/Exchange/Bus.hs b/src/Network/BitTorrent/Exchange/Bus.hs deleted file mode 100644 index 7de91180..00000000 --- a/src/Network/BitTorrent/Exchange/Bus.hs +++ /dev/null | |||
@@ -1,66 +0,0 @@ | |||
1 | module Network.BitTorrent.Exchange.Bus ( ) where | ||
2 | |||
3 | type PeerWire = ConduitM Message Message IO | ||
4 | |||
5 | runPeerWire :: Socket -> PeerWire () -> IO () | ||
6 | runPeerWire sock action = | ||
7 | sourceSocket sock $= | ||
8 | S.conduitGet S.get $= | ||
9 | -- B.conduitDecode $= | ||
10 | action $= | ||
11 | S.conduitPut S.put $$ | ||
12 | -- B.conduitEncode $$ | ||
13 | sinkSocket sock | ||
14 | |||
15 | awaitMessage :: P2P Message | ||
16 | awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go | ||
17 | where | ||
18 | go = await >>= maybe (monadThrow PeerDisconnected) return | ||
19 | {-# INLINE awaitMessage #-} | ||
20 | |||
21 | yieldMessage :: Message -> P2P () | ||
22 | yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do | ||
23 | C.yield msg | ||
24 | {-# INLINE yieldMessage #-} | ||
25 | |||
26 | -- TODO send vectored | ||
27 | flushPending :: P2P () | ||
28 | flushPending = {-# SCC flushPending #-} do | ||
29 | session <- ask | ||
30 | queue <- liftIO (getPending session) | ||
31 | mapM_ yieldMessage queue | ||
32 | |||
33 | {----------------------------------------------------------------------- | ||
34 | P2P monad | ||
35 | -----------------------------------------------------------------------} | ||
36 | |||
37 | filterMeaninless :: P2P Message Message | ||
38 | filterMeaninless = undefined | ||
39 | |||
40 | -- | | ||
41 | -- Exceptions: | ||
42 | -- | ||
43 | -- * SessionException: is visible only within one peer | ||
44 | -- session. Use this exception to terminate P2P session, but not | ||
45 | -- the swarm session. | ||
46 | -- | ||
47 | newtype P2P a = P2P { | ||
48 | unP2P :: ReaderT PeerSession PeerWire a | ||
49 | } deriving ( Functor, Applicative, Monad | ||
50 | , MonadIO, MonadThrow, MonadActive | ||
51 | , MonadReader PeerSession | ||
52 | ) | ||
53 | |||
54 | instance MonadState SessionState P2P where | ||
55 | get = asks sessionState >>= liftIO . readIORef | ||
56 | {-# INLINE get #-} | ||
57 | put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s | ||
58 | {-# INLINE put #-} | ||
59 | |||
60 | runP2P :: (Socket, PeerSession) -> P2P () -> IO () | ||
61 | runP2P (sock, ses) action = | ||
62 | handle isIOException $ | ||
63 | runPeerWire sock (runReaderT (unP2P action) ses) | ||
64 | where | ||
65 | isIOException :: IOException -> IO () | ||
66 | isIOException _ = return () | ||
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index ffc7816e..d2a9aaaf 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -22,6 +22,11 @@ import Network.BitTorrent.Exchange.Status | |||
22 | 22 | ||
23 | type Extension = () | 23 | type Extension = () |
24 | 24 | ||
25 | data ExchangeError | ||
26 | = InvalidPieceIx PieceIx | ||
27 | | InvalidBlock BlockIx | ||
28 | | CorruptedPiece PieceIx | ||
29 | |||
25 | -- | Peer session contain all data necessary for peer to peer | 30 | -- | Peer session contain all data necessary for peer to peer |
26 | -- communication. | 31 | -- communication. |
27 | data ExchangeSession = ExchangeSession | 32 | data ExchangeSession = ExchangeSession |
@@ -69,30 +74,6 @@ $(makeLenses ''SessionState) | |||
69 | --getSessionState :: PeerSession -> IO SessionState | 74 | --getSessionState :: PeerSession -> IO SessionState |
70 | --getSessionState PeerSession {..} = readIORef sessionState | 75 | --getSessionState PeerSession {..} = readIORef sessionState |
71 | 76 | ||
72 | {----------------------------------------------------------------------- | ||
73 | -- Exceptions | ||
74 | -----------------------------------------------------------------------} | ||
75 | |||
76 | -- | Exceptions used to interrupt the current P2P session. This | ||
77 | -- exceptions will NOT affect other P2P sessions, DHT, peer <-> | ||
78 | -- tracker, or any other session. | ||
79 | -- | ||
80 | data ExchangeFailure | ||
81 | = PeerDisconnected | ||
82 | | ProtocolError Doc | ||
83 | | UnknownTorrent InfoHash | ||
84 | deriving (Show, Typeable) | ||
85 | |||
86 | instance Exception ExchangeFailure | ||
87 | |||
88 | -- | Do nothing with exception, used with 'handle' or 'try'. | ||
89 | isSessionException :: Monad m => ExchangeFailure -> m () | ||
90 | isSessionException _ = return () | ||
91 | |||
92 | -- | The same as 'isSessionException' but output to stdout the catched | ||
93 | -- exception, for debugging purposes only. | ||
94 | putSessionException :: ExchangeFailure -> IO () | ||
95 | putSessionException = print | ||
96 | {- | 77 | {- |
97 | {----------------------------------------------------------------------- | 78 | {----------------------------------------------------------------------- |
98 | -- Broadcasting: Have, Cancel, Bitfield, SuggestPiece | 79 | -- Broadcasting: Have, Cancel, Bitfield, SuggestPiece |
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs new file mode 100644 index 00000000..dd77a915 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -0,0 +1,205 @@ | |||
1 | -- | | ||
2 | -- | ||
3 | -- Message flow | ||
4 | -- Duplex channell | ||
5 | -- This module control /integrity/ of data send and received. | ||
6 | -- | ||
7 | |||
8 | {-# LANGUAGE DeriveDataTypeable #-} | ||
9 | module Network.BitTorrent.Exchange.Wire | ||
10 | ( -- * Exception | ||
11 | ProtocolError (..) | ||
12 | , WireFailure (..) | ||
13 | , isWireFailure | ||
14 | |||
15 | -- * Wire | ||
16 | , Connection (..) | ||
17 | , Wire | ||
18 | , runWire | ||
19 | , connectWire | ||
20 | , acceptWire | ||
21 | ) where | ||
22 | |||
23 | import Control.Exception | ||
24 | import Control.Monad.Reader | ||
25 | import Data.ByteString as BS | ||
26 | import Data.Conduit | ||
27 | import Data.Conduit.Cereal as S | ||
28 | import Data.Conduit.Network | ||
29 | import Data.Default | ||
30 | import Data.Maybe | ||
31 | import Data.Monoid | ||
32 | import Data.Serialize as S | ||
33 | import Data.Typeable | ||
34 | import Network | ||
35 | import Network.Socket | ||
36 | import Network.Socket.ByteString as BS | ||
37 | import Text.PrettyPrint as PP hiding (($$), (<>)) | ||
38 | import Text.PrettyPrint.Class | ||
39 | |||
40 | import Data.Torrent.InfoHash | ||
41 | import Network.BitTorrent.Core.PeerId | ||
42 | import Network.BitTorrent.Core.PeerAddr | ||
43 | import Network.BitTorrent.Exchange.Message | ||
44 | |||
45 | |||
46 | {----------------------------------------------------------------------- | ||
47 | -- Exceptions | ||
48 | -----------------------------------------------------------------------} | ||
49 | |||
50 | data ChannelSide | ||
51 | = ThisPeer | ||
52 | | RemotePeer | ||
53 | deriving (Show, Eq, Enum) | ||
54 | |||
55 | -- | Errors occur when a remote peer violates protocol specification. | ||
56 | data ProtocolError | ||
57 | = UnexpectedTopic InfoHash -- ^ peer replied with unexpected infohash. | ||
58 | | UnexpectedPeerId PeerId -- ^ peer replied with unexpected peer id. | ||
59 | | UnknownTopic InfoHash -- ^ peer requested unknown torrent. | ||
60 | | InvalidMessage | ||
61 | { violentSender :: ChannelSide -- ^ endpoint sent invalid message | ||
62 | , extensionRequired :: Extension -- ^ | ||
63 | } | ||
64 | deriving Show | ||
65 | |||
66 | instance Pretty ProtocolError where | ||
67 | pretty = PP.text . show | ||
68 | |||
69 | -- | Exceptions used to interrupt the current P2P session. | ||
70 | data WireFailure | ||
71 | = PeerDisconnected -- ^ A peer not responding. | ||
72 | | DisconnectPeer -- ^ | ||
73 | | ProtocolError ProtocolError | ||
74 | deriving (Show, Typeable) | ||
75 | |||
76 | instance Exception WireFailure | ||
77 | |||
78 | instance Pretty WireFailure where | ||
79 | pretty = PP.text . show | ||
80 | |||
81 | -- | Do nothing with exception, used with 'handle' or 'try'. | ||
82 | isWireFailure :: Monad m => WireFailure -> m () | ||
83 | isWireFailure _ = return () | ||
84 | |||
85 | {----------------------------------------------------------------------- | ||
86 | -- Connection | ||
87 | -----------------------------------------------------------------------} | ||
88 | |||
89 | data Connection = Connection | ||
90 | { connCaps :: !Caps | ||
91 | , connExtCaps :: !ExtendedCaps -- TODO caps can be enabled during communication | ||
92 | , connTopic :: !InfoHash | ||
93 | , connRemotePeerId :: !PeerId | ||
94 | , connThisPeerId :: !PeerId | ||
95 | } deriving Show | ||
96 | |||
97 | instance Pretty Connection where | ||
98 | pretty Connection {..} = "Connection" | ||
99 | |||
100 | isAllowed :: Connection -> Message -> Bool | ||
101 | isAllowed Connection {..} msg | ||
102 | | Just ext <- requires msg = allowed connCaps ext | ||
103 | | otherwise = True | ||
104 | |||
105 | {----------------------------------------------------------------------- | ||
106 | -- Hanshaking | ||
107 | -----------------------------------------------------------------------} | ||
108 | |||
109 | -- | TODO remove socket stuff to corresponding module | ||
110 | sendHandshake :: Socket -> Handshake -> IO () | ||
111 | sendHandshake sock hs = sendAll sock (S.encode hs) | ||
112 | |||
113 | recvHandshake :: Socket -> IO Handshake | ||
114 | recvHandshake sock = do | ||
115 | header <- BS.recv sock 1 | ||
116 | unless (BS.length header == 1) $ | ||
117 | throw $ userError "Unable to receive handshake header." | ||
118 | |||
119 | let protocolLen = BS.head header | ||
120 | let restLen = handshakeSize protocolLen - 1 | ||
121 | |||
122 | body <- BS.recv sock restLen | ||
123 | let resp = BS.cons protocolLen body | ||
124 | either (throwIO . userError) return $ S.decode resp | ||
125 | |||
126 | -- | Handshaking with a peer specified by the second argument. | ||
127 | -- | ||
128 | -- It's important to send handshake first because /accepting/ peer | ||
129 | -- do not know handshake topic and will wait until /connecting/ peer | ||
130 | -- will send handshake. | ||
131 | -- | ||
132 | initiateHandshake :: Socket -> Handshake -> IO Handshake | ||
133 | initiateHandshake sock hs = do | ||
134 | sendHandshake sock hs | ||
135 | recvHandshake sock | ||
136 | |||
137 | -- | Tries to connect to peer using reasonable default parameters. | ||
138 | connectToPeer :: PeerAddr -> IO Socket | ||
139 | connectToPeer p = do | ||
140 | sock <- socket AF_INET Stream Network.Socket.defaultProtocol | ||
141 | connect sock (peerSockAddr p) | ||
142 | return sock | ||
143 | |||
144 | {----------------------------------------------------------------------- | ||
145 | -- Wire | ||
146 | -----------------------------------------------------------------------} | ||
147 | |||
148 | type Wire = ConduitM Message Message (ReaderT Connection IO) | ||
149 | |||
150 | validate :: Wire () | ||
151 | validate = do | ||
152 | mmsg <- await | ||
153 | case mmsg of | ||
154 | Nothing -> return () | ||
155 | Just msg -> do | ||
156 | valid <- lift $ asks (`isAllowed` msg) | ||
157 | if valid then yield msg else error "TODO" | ||
158 | |||
159 | |||
160 | runWire :: Wire () -> Socket -> Connection -> IO () | ||
161 | runWire action sock = runReaderT $ | ||
162 | sourceSocket sock $= | ||
163 | S.conduitGet S.get $= | ||
164 | action $= | ||
165 | S.conduitPut S.put $$ | ||
166 | sinkSocket sock | ||
167 | |||
168 | sendMessage :: PeerMessage msg => msg -> Wire () | ||
169 | sendMessage msg = do | ||
170 | ecaps <- lift $ asks connExtCaps | ||
171 | yield $ envelop ecaps msg | ||
172 | |||
173 | recvMessage :: Wire Message | ||
174 | recvMessage = undefined | ||
175 | |||
176 | |||
177 | |||
178 | extendedHandshake :: Wire () | ||
179 | extendedHandshake = undefined | ||
180 | |||
181 | connectWire :: Handshake -> PeerAddr -> ExtendedCaps -> Wire () -> IO () | ||
182 | connectWire hs addr caps wire = | ||
183 | bracket (connectToPeer addr) close $ \ sock -> do | ||
184 | hs' <- initiateHandshake sock hs | ||
185 | |||
186 | unless (hsInfoHash hs == hsInfoHash hs') $ do | ||
187 | throwIO $ ProtocolError $ UnexpectedTopic (hsInfoHash hs') | ||
188 | |||
189 | unless (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerID addr)) $ do | ||
190 | throwIO $ ProtocolError $ UnexpectedPeerId (hsPeerId hs') | ||
191 | |||
192 | let caps = hsReserved hs <> hsReserved hs' | ||
193 | if allowed caps ExtExtended | ||
194 | then return () else return () | ||
195 | |||
196 | runWire wire sock $ Connection | ||
197 | { connCaps = caps | ||
198 | , connExtCaps = def | ||
199 | , connTopic = hsInfoHash hs | ||
200 | , connRemotePeerId = hsPeerId hs' | ||
201 | , connThisPeerId = hsPeerId hs | ||
202 | } | ||
203 | |||
204 | acceptWire :: Wire () -> Socket -> IO () | ||
205 | acceptWire = undefined | ||