summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2013-12-05 03:26:56 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2013-12-05 03:26:56 +0400
commit32b0f3570237e4d4742fc8874980f2b479c1ae75 (patch)
treeace6425ecb6170679c36327bbf3694d80d472574 /src/Network/BitTorrent/Exchange
parent1e8a6a7d5267811d035afda764e90092eb0e994c (diff)
Add Wire module
Diffstat (limited to 'src/Network/BitTorrent/Exchange')
-rw-r--r--src/Network/BitTorrent/Exchange/Bus.hs66
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs29
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs205
3 files changed, 210 insertions, 90 deletions
diff --git a/src/Network/BitTorrent/Exchange/Bus.hs b/src/Network/BitTorrent/Exchange/Bus.hs
deleted file mode 100644
index 7de91180..00000000
--- a/src/Network/BitTorrent/Exchange/Bus.hs
+++ /dev/null
@@ -1,66 +0,0 @@
1module Network.BitTorrent.Exchange.Bus ( ) where
2
3type PeerWire = ConduitM Message Message IO
4
5runPeerWire :: Socket -> PeerWire () -> IO ()
6runPeerWire sock action =
7 sourceSocket sock $=
8 S.conduitGet S.get $=
9-- B.conduitDecode $=
10 action $=
11 S.conduitPut S.put $$
12-- B.conduitEncode $$
13 sinkSocket sock
14
15awaitMessage :: P2P Message
16awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go
17 where
18 go = await >>= maybe (monadThrow PeerDisconnected) return
19{-# INLINE awaitMessage #-}
20
21yieldMessage :: Message -> P2P ()
22yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do
23 C.yield msg
24{-# INLINE yieldMessage #-}
25
26-- TODO send vectored
27flushPending :: P2P ()
28flushPending = {-# SCC flushPending #-} do
29 session <- ask
30 queue <- liftIO (getPending session)
31 mapM_ yieldMessage queue
32
33{-----------------------------------------------------------------------
34 P2P monad
35-----------------------------------------------------------------------}
36
37filterMeaninless :: P2P Message Message
38filterMeaninless = undefined
39
40-- |
41-- Exceptions:
42--
43-- * SessionException: is visible only within one peer
44-- session. Use this exception to terminate P2P session, but not
45-- the swarm session.
46--
47newtype P2P a = P2P {
48 unP2P :: ReaderT PeerSession PeerWire a
49 } deriving ( Functor, Applicative, Monad
50 , MonadIO, MonadThrow, MonadActive
51 , MonadReader PeerSession
52 )
53
54instance MonadState SessionState P2P where
55 get = asks sessionState >>= liftIO . readIORef
56 {-# INLINE get #-}
57 put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s
58 {-# INLINE put #-}
59
60runP2P :: (Socket, PeerSession) -> P2P () -> IO ()
61runP2P (sock, ses) action =
62 handle isIOException $
63 runPeerWire sock (runReaderT (unP2P action) ses)
64 where
65 isIOException :: IOException -> IO ()
66 isIOException _ = return ()
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index ffc7816e..d2a9aaaf 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -22,6 +22,11 @@ import Network.BitTorrent.Exchange.Status
22 22
23type Extension = () 23type Extension = ()
24 24
25data ExchangeError
26 = InvalidPieceIx PieceIx
27 | InvalidBlock BlockIx
28 | CorruptedPiece PieceIx
29
25-- | Peer session contain all data necessary for peer to peer 30-- | Peer session contain all data necessary for peer to peer
26-- communication. 31-- communication.
27data ExchangeSession = ExchangeSession 32data ExchangeSession = ExchangeSession
@@ -69,30 +74,6 @@ $(makeLenses ''SessionState)
69--getSessionState :: PeerSession -> IO SessionState 74--getSessionState :: PeerSession -> IO SessionState
70--getSessionState PeerSession {..} = readIORef sessionState 75--getSessionState PeerSession {..} = readIORef sessionState
71 76
72{-----------------------------------------------------------------------
73-- Exceptions
74-----------------------------------------------------------------------}
75
76-- | Exceptions used to interrupt the current P2P session. This
77-- exceptions will NOT affect other P2P sessions, DHT, peer <->
78-- tracker, or any other session.
79--
80data ExchangeFailure
81 = PeerDisconnected
82 | ProtocolError Doc
83 | UnknownTorrent InfoHash
84 deriving (Show, Typeable)
85
86instance Exception ExchangeFailure
87
88-- | Do nothing with exception, used with 'handle' or 'try'.
89isSessionException :: Monad m => ExchangeFailure -> m ()
90isSessionException _ = return ()
91
92-- | The same as 'isSessionException' but output to stdout the catched
93-- exception, for debugging purposes only.
94putSessionException :: ExchangeFailure -> IO ()
95putSessionException = print
96{- 77{-
97{----------------------------------------------------------------------- 78{-----------------------------------------------------------------------
98-- Broadcasting: Have, Cancel, Bitfield, SuggestPiece 79-- Broadcasting: Have, Cancel, Bitfield, SuggestPiece
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
new file mode 100644
index 00000000..dd77a915
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -0,0 +1,205 @@
1-- |
2--
3-- Message flow
4-- Duplex channell
5-- This module control /integrity/ of data send and received.
6--
7
8{-# LANGUAGE DeriveDataTypeable #-}
9module Network.BitTorrent.Exchange.Wire
10 ( -- * Exception
11 ProtocolError (..)
12 , WireFailure (..)
13 , isWireFailure
14
15 -- * Wire
16 , Connection (..)
17 , Wire
18 , runWire
19 , connectWire
20 , acceptWire
21 ) where
22
23import Control.Exception
24import Control.Monad.Reader
25import Data.ByteString as BS
26import Data.Conduit
27import Data.Conduit.Cereal as S
28import Data.Conduit.Network
29import Data.Default
30import Data.Maybe
31import Data.Monoid
32import Data.Serialize as S
33import Data.Typeable
34import Network
35import Network.Socket
36import Network.Socket.ByteString as BS
37import Text.PrettyPrint as PP hiding (($$), (<>))
38import Text.PrettyPrint.Class
39
40import Data.Torrent.InfoHash
41import Network.BitTorrent.Core.PeerId
42import Network.BitTorrent.Core.PeerAddr
43import Network.BitTorrent.Exchange.Message
44
45
46{-----------------------------------------------------------------------
47-- Exceptions
48-----------------------------------------------------------------------}
49
50data ChannelSide
51 = ThisPeer
52 | RemotePeer
53 deriving (Show, Eq, Enum)
54
55-- | Errors occur when a remote peer violates protocol specification.
56data ProtocolError
57 = UnexpectedTopic InfoHash -- ^ peer replied with unexpected infohash.
58 | UnexpectedPeerId PeerId -- ^ peer replied with unexpected peer id.
59 | UnknownTopic InfoHash -- ^ peer requested unknown torrent.
60 | InvalidMessage
61 { violentSender :: ChannelSide -- ^ endpoint sent invalid message
62 , extensionRequired :: Extension -- ^
63 }
64 deriving Show
65
66instance Pretty ProtocolError where
67 pretty = PP.text . show
68
69-- | Exceptions used to interrupt the current P2P session.
70data WireFailure
71 = PeerDisconnected -- ^ A peer not responding.
72 | DisconnectPeer -- ^
73 | ProtocolError ProtocolError
74 deriving (Show, Typeable)
75
76instance Exception WireFailure
77
78instance Pretty WireFailure where
79 pretty = PP.text . show
80
81-- | Do nothing with exception, used with 'handle' or 'try'.
82isWireFailure :: Monad m => WireFailure -> m ()
83isWireFailure _ = return ()
84
85{-----------------------------------------------------------------------
86-- Connection
87-----------------------------------------------------------------------}
88
89data Connection = Connection
90 { connCaps :: !Caps
91 , connExtCaps :: !ExtendedCaps -- TODO caps can be enabled during communication
92 , connTopic :: !InfoHash
93 , connRemotePeerId :: !PeerId
94 , connThisPeerId :: !PeerId
95 } deriving Show
96
97instance Pretty Connection where
98 pretty Connection {..} = "Connection"
99
100isAllowed :: Connection -> Message -> Bool
101isAllowed Connection {..} msg
102 | Just ext <- requires msg = allowed connCaps ext
103 | otherwise = True
104
105{-----------------------------------------------------------------------
106-- Hanshaking
107-----------------------------------------------------------------------}
108
109-- | TODO remove socket stuff to corresponding module
110sendHandshake :: Socket -> Handshake -> IO ()
111sendHandshake sock hs = sendAll sock (S.encode hs)
112
113recvHandshake :: Socket -> IO Handshake
114recvHandshake sock = do
115 header <- BS.recv sock 1
116 unless (BS.length header == 1) $
117 throw $ userError "Unable to receive handshake header."
118
119 let protocolLen = BS.head header
120 let restLen = handshakeSize protocolLen - 1
121
122 body <- BS.recv sock restLen
123 let resp = BS.cons protocolLen body
124 either (throwIO . userError) return $ S.decode resp
125
126-- | Handshaking with a peer specified by the second argument.
127--
128-- It's important to send handshake first because /accepting/ peer
129-- do not know handshake topic and will wait until /connecting/ peer
130-- will send handshake.
131--
132initiateHandshake :: Socket -> Handshake -> IO Handshake
133initiateHandshake sock hs = do
134 sendHandshake sock hs
135 recvHandshake sock
136
137-- | Tries to connect to peer using reasonable default parameters.
138connectToPeer :: PeerAddr -> IO Socket
139connectToPeer p = do
140 sock <- socket AF_INET Stream Network.Socket.defaultProtocol
141 connect sock (peerSockAddr p)
142 return sock
143
144{-----------------------------------------------------------------------
145-- Wire
146-----------------------------------------------------------------------}
147
148type Wire = ConduitM Message Message (ReaderT Connection IO)
149
150validate :: Wire ()
151validate = do
152 mmsg <- await
153 case mmsg of
154 Nothing -> return ()
155 Just msg -> do
156 valid <- lift $ asks (`isAllowed` msg)
157 if valid then yield msg else error "TODO"
158
159
160runWire :: Wire () -> Socket -> Connection -> IO ()
161runWire action sock = runReaderT $
162 sourceSocket sock $=
163 S.conduitGet S.get $=
164 action $=
165 S.conduitPut S.put $$
166 sinkSocket sock
167
168sendMessage :: PeerMessage msg => msg -> Wire ()
169sendMessage msg = do
170 ecaps <- lift $ asks connExtCaps
171 yield $ envelop ecaps msg
172
173recvMessage :: Wire Message
174recvMessage = undefined
175
176
177
178extendedHandshake :: Wire ()
179extendedHandshake = undefined
180
181connectWire :: Handshake -> PeerAddr -> ExtendedCaps -> Wire () -> IO ()
182connectWire hs addr caps wire =
183 bracket (connectToPeer addr) close $ \ sock -> do
184 hs' <- initiateHandshake sock hs
185
186 unless (hsInfoHash hs == hsInfoHash hs') $ do
187 throwIO $ ProtocolError $ UnexpectedTopic (hsInfoHash hs')
188
189 unless (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerID addr)) $ do
190 throwIO $ ProtocolError $ UnexpectedPeerId (hsPeerId hs')
191
192 let caps = hsReserved hs <> hsReserved hs'
193 if allowed caps ExtExtended
194 then return () else return ()
195
196 runWire wire sock $ Connection
197 { connCaps = caps
198 , connExtCaps = def
199 , connTopic = hsInfoHash hs
200 , connRemotePeerId = hsPeerId hs'
201 , connThisPeerId = hsPeerId hs
202 }
203
204acceptWire :: Wire () -> Socket -> IO ()
205acceptWire = undefined