summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Exchange')
-rw-r--r--src/Network/BitTorrent/Exchange/Bus.hs63
-rw-r--r--src/Network/BitTorrent/Exchange/Message.hs2
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs147
3 files changed, 211 insertions, 1 deletions
diff --git a/src/Network/BitTorrent/Exchange/Bus.hs b/src/Network/BitTorrent/Exchange/Bus.hs
new file mode 100644
index 00000000..4800c4a0
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Bus.hs
@@ -0,0 +1,63 @@
1module Network.BitTorrent.Exchange.Bus ( ) where
2
3type PeerWire = ConduitM Message Message IO
4
5runPeerWire :: Socket -> PeerWire () -> IO ()
6runPeerWire sock action =
7 sourceSocket sock $=
8 S.conduitGet S.get $=
9-- B.conduitDecode $=
10 action $=
11 S.conduitPut S.put $$
12-- B.conduitEncode $$
13 sinkSocket sock
14
15awaitMessage :: P2P Message
16awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go
17 where
18 go = await >>= maybe (monadThrow PeerDisconnected) return
19{-# INLINE awaitMessage #-}
20
21yieldMessage :: Message -> P2P ()
22yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do
23 C.yield msg
24{-# INLINE yieldMessage #-}
25
26-- TODO send vectored
27flushPending :: P2P ()
28flushPending = {-# SCC flushPending #-} do
29 session <- ask
30 queue <- liftIO (getPending session)
31 mapM_ yieldMessage queue
32
33{-----------------------------------------------------------------------
34 P2P monad
35-----------------------------------------------------------------------}
36
37-- |
38-- Exceptions:
39--
40-- * SessionException: is visible only within one peer
41-- session. Use this exception to terminate P2P session, but not
42-- the swarm session.
43--
44newtype P2P a = P2P {
45 unP2P :: ReaderT PeerSession PeerWire a
46 } deriving ( Functor, Applicative, Monad
47 , MonadIO, MonadThrow, MonadActive
48 , MonadReader PeerSession
49 )
50
51instance MonadState SessionState P2P where
52 get = asks sessionState >>= liftIO . readIORef
53 {-# INLINE get #-}
54 put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s
55 {-# INLINE put #-}
56
57runP2P :: (Socket, PeerSession) -> P2P () -> IO ()
58runP2P (sock, ses) action =
59 handle isIOException $
60 runPeerWire sock (runReaderT (unP2P action) ses)
61 where
62 isIOException :: IOException -> IO ()
63 isIOException _ = return ()
diff --git a/src/Network/BitTorrent/Exchange/Message.hs b/src/Network/BitTorrent/Exchange/Message.hs
index 4ef7baf3..4d4a97e2 100644
--- a/src/Network/BitTorrent/Exchange/Message.hs
+++ b/src/Network/BitTorrent/Exchange/Message.hs
@@ -27,7 +27,7 @@
27-- 27--
28{-# LANGUAGE TemplateHaskell #-} 28{-# LANGUAGE TemplateHaskell #-}
29{-# OPTIONS -fno-warn-orphans #-} 29{-# OPTIONS -fno-warn-orphans #-}
30module Network.BitTorrent.Exchange.Protocol 30module Network.BitTorrent.Exchange.Message
31 ( -- * Initial handshake 31 ( -- * Initial handshake
32 Handshake(..) 32 Handshake(..)
33 , handshake 33 , handshake
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
new file mode 100644
index 00000000..ffc7816e
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -0,0 +1,147 @@
1{-# LANGUAGE TemplateHaskell #-}
2{-# LANGUAGE DeriveDataTypeable #-}
3module Network.BitTorrent.Exchange.Session
4 (
5 ) where
6
7import Control.Concurrent.STM
8import Control.Exception
9import Control.Lens
10import Data.Function
11import Data.IORef
12import Data.Ord
13import Data.Typeable
14import Text.PrettyPrint
15
16import Data.Torrent.Bitfield
17import Data.Torrent.InfoHash
18import Network.BitTorrent.Core.PeerAddr
19import Network.BitTorrent.Exchange.Message
20import Network.BitTorrent.Exchange.Status
21
22
23type Extension = ()
24
25-- | Peer session contain all data necessary for peer to peer
26-- communication.
27data ExchangeSession = ExchangeSession
28 { -- | Used as unique identifier of the session.
29 connectedPeerAddr :: !PeerAddr
30
31 -- | Extensions such that both peer and client support.
32 , enabledExtensions :: [Extension]
33
34 -- | Broadcast messages waiting to be sent to peer.
35 , pendingMessages :: !(TChan Message)
36
37 -- | Dymanic P2P data.
38 , sessionState :: !(IORef SessionState)
39 }
40
41instance Eq ExchangeSession where
42 (==) = (==) `on` connectedPeerAddr
43 {-# INLINE (==) #-}
44
45instance Ord ExchangeSession where
46 compare = comparing connectedPeerAddr
47 {-# INLINE compare #-}
48
49enqueueBroadcast :: ExchangeSession -> Message -> IO ()
50enqueueBroadcast = undefined
51
52dequeueBroadcast :: ExchangeSession -> IO Message
53dequeueBroadcast = undefined
54
55{-----------------------------------------------------------------------
56-- Session state
57-----------------------------------------------------------------------}
58
59data SessionState = SessionState
60 { _bitfield :: !Bitfield -- ^ Other peer Have bitfield.
61 , _status :: !SessionStatus -- ^ Status of both peers.
62 } deriving (Show, Eq)
63
64$(makeLenses ''SessionState)
65
66--initialSessionState :: PieceCount -> SessionState
67--initialSessionState pc = SessionState (haveNone pc) def
68
69--getSessionState :: PeerSession -> IO SessionState
70--getSessionState PeerSession {..} = readIORef sessionState
71
72{-----------------------------------------------------------------------
73-- Exceptions
74-----------------------------------------------------------------------}
75
76-- | Exceptions used to interrupt the current P2P session. This
77-- exceptions will NOT affect other P2P sessions, DHT, peer <->
78-- tracker, or any other session.
79--
80data ExchangeFailure
81 = PeerDisconnected
82 | ProtocolError Doc
83 | UnknownTorrent InfoHash
84 deriving (Show, Typeable)
85
86instance Exception ExchangeFailure
87
88-- | Do nothing with exception, used with 'handle' or 'try'.
89isSessionException :: Monad m => ExchangeFailure -> m ()
90isSessionException _ = return ()
91
92-- | The same as 'isSessionException' but output to stdout the catched
93-- exception, for debugging purposes only.
94putSessionException :: ExchangeFailure -> IO ()
95putSessionException = print
96{-
97{-----------------------------------------------------------------------
98-- Broadcasting: Have, Cancel, Bitfield, SuggestPiece
99-----------------------------------------------------------------------}
100{-
101Here we should enqueue broadcast messages and keep in mind that:
102 * We should enqueue broadcast events as they are appear.
103 * We should yield broadcast messages as fast as we get them.
104
105these 2 phases might differ in time significantly
106
107**TODO**: do this; but only when it'll be clean which other broadcast
108messages & events we should send.
109
1101. Update client have bitfield --\____ in one transaction;
1112. Update downloaded stats --/
1123. Signal to the all other peer about this.
113-}
114
115available :: Bitfield -> SwarmSession -> STM ()
116available bf SwarmSession {..} = {-# SCC available #-} do
117 updateProgress >> broadcast
118 where
119 updateProgress = do
120 let piLen = ciPieceLength $ tInfo $ torrentMeta
121 let bytes = piLen * BF.haveCount bf
122 modifyTVar' (currentProgress clientSession) (downloadedProgress bytes)
123
124 broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf)
125
126-- TODO compute size of messages: if it's faster to send Bitfield
127-- instead many Have do that
128
129-- Also if there is single Have message in queue then the
130-- corresponding piece is likely still in memory or disc cache,
131-- when we can send SuggestPiece.
132
133readAvail :: TChan a -> STM [a]
134readAvail chan = do
135 m <- tryReadTChan chan
136 case m of
137 Just a -> (:) <$> pure a <*> readAvail chan
138 Nothing -> return []
139
140-- | Get pending messages queue appeared in result of asynchronously
141-- changed client state. Resulting queue should be sent to a peer
142-- immediately.
143--
144getPending :: PeerSession -> IO [Message]
145getPending PeerSession {..} = {-# SCC getPending #-} do
146 atomically (readAvail pendingMessages)
147-} \ No newline at end of file