diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Bus.hs | 63 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Message.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 147 |
3 files changed, 211 insertions, 1 deletions
diff --git a/src/Network/BitTorrent/Exchange/Bus.hs b/src/Network/BitTorrent/Exchange/Bus.hs new file mode 100644 index 00000000..4800c4a0 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Bus.hs | |||
@@ -0,0 +1,63 @@ | |||
1 | module Network.BitTorrent.Exchange.Bus ( ) where | ||
2 | |||
3 | type PeerWire = ConduitM Message Message IO | ||
4 | |||
5 | runPeerWire :: Socket -> PeerWire () -> IO () | ||
6 | runPeerWire sock action = | ||
7 | sourceSocket sock $= | ||
8 | S.conduitGet S.get $= | ||
9 | -- B.conduitDecode $= | ||
10 | action $= | ||
11 | S.conduitPut S.put $$ | ||
12 | -- B.conduitEncode $$ | ||
13 | sinkSocket sock | ||
14 | |||
15 | awaitMessage :: P2P Message | ||
16 | awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go | ||
17 | where | ||
18 | go = await >>= maybe (monadThrow PeerDisconnected) return | ||
19 | {-# INLINE awaitMessage #-} | ||
20 | |||
21 | yieldMessage :: Message -> P2P () | ||
22 | yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do | ||
23 | C.yield msg | ||
24 | {-# INLINE yieldMessage #-} | ||
25 | |||
26 | -- TODO send vectored | ||
27 | flushPending :: P2P () | ||
28 | flushPending = {-# SCC flushPending #-} do | ||
29 | session <- ask | ||
30 | queue <- liftIO (getPending session) | ||
31 | mapM_ yieldMessage queue | ||
32 | |||
33 | {----------------------------------------------------------------------- | ||
34 | P2P monad | ||
35 | -----------------------------------------------------------------------} | ||
36 | |||
37 | -- | | ||
38 | -- Exceptions: | ||
39 | -- | ||
40 | -- * SessionException: is visible only within one peer | ||
41 | -- session. Use this exception to terminate P2P session, but not | ||
42 | -- the swarm session. | ||
43 | -- | ||
44 | newtype P2P a = P2P { | ||
45 | unP2P :: ReaderT PeerSession PeerWire a | ||
46 | } deriving ( Functor, Applicative, Monad | ||
47 | , MonadIO, MonadThrow, MonadActive | ||
48 | , MonadReader PeerSession | ||
49 | ) | ||
50 | |||
51 | instance MonadState SessionState P2P where | ||
52 | get = asks sessionState >>= liftIO . readIORef | ||
53 | {-# INLINE get #-} | ||
54 | put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s | ||
55 | {-# INLINE put #-} | ||
56 | |||
57 | runP2P :: (Socket, PeerSession) -> P2P () -> IO () | ||
58 | runP2P (sock, ses) action = | ||
59 | handle isIOException $ | ||
60 | runPeerWire sock (runReaderT (unP2P action) ses) | ||
61 | where | ||
62 | isIOException :: IOException -> IO () | ||
63 | isIOException _ = return () | ||
diff --git a/src/Network/BitTorrent/Exchange/Message.hs b/src/Network/BitTorrent/Exchange/Message.hs index 4ef7baf3..4d4a97e2 100644 --- a/src/Network/BitTorrent/Exchange/Message.hs +++ b/src/Network/BitTorrent/Exchange/Message.hs | |||
@@ -27,7 +27,7 @@ | |||
27 | -- | 27 | -- |
28 | {-# LANGUAGE TemplateHaskell #-} | 28 | {-# LANGUAGE TemplateHaskell #-} |
29 | {-# OPTIONS -fno-warn-orphans #-} | 29 | {-# OPTIONS -fno-warn-orphans #-} |
30 | module Network.BitTorrent.Exchange.Protocol | 30 | module Network.BitTorrent.Exchange.Message |
31 | ( -- * Initial handshake | 31 | ( -- * Initial handshake |
32 | Handshake(..) | 32 | Handshake(..) |
33 | , handshake | 33 | , handshake |
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs new file mode 100644 index 00000000..ffc7816e --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -0,0 +1,147 @@ | |||
1 | {-# LANGUAGE TemplateHaskell #-} | ||
2 | {-# LANGUAGE DeriveDataTypeable #-} | ||
3 | module Network.BitTorrent.Exchange.Session | ||
4 | ( | ||
5 | ) where | ||
6 | |||
7 | import Control.Concurrent.STM | ||
8 | import Control.Exception | ||
9 | import Control.Lens | ||
10 | import Data.Function | ||
11 | import Data.IORef | ||
12 | import Data.Ord | ||
13 | import Data.Typeable | ||
14 | import Text.PrettyPrint | ||
15 | |||
16 | import Data.Torrent.Bitfield | ||
17 | import Data.Torrent.InfoHash | ||
18 | import Network.BitTorrent.Core.PeerAddr | ||
19 | import Network.BitTorrent.Exchange.Message | ||
20 | import Network.BitTorrent.Exchange.Status | ||
21 | |||
22 | |||
23 | type Extension = () | ||
24 | |||
25 | -- | Peer session contain all data necessary for peer to peer | ||
26 | -- communication. | ||
27 | data ExchangeSession = ExchangeSession | ||
28 | { -- | Used as unique identifier of the session. | ||
29 | connectedPeerAddr :: !PeerAddr | ||
30 | |||
31 | -- | Extensions such that both peer and client support. | ||
32 | , enabledExtensions :: [Extension] | ||
33 | |||
34 | -- | Broadcast messages waiting to be sent to peer. | ||
35 | , pendingMessages :: !(TChan Message) | ||
36 | |||
37 | -- | Dymanic P2P data. | ||
38 | , sessionState :: !(IORef SessionState) | ||
39 | } | ||
40 | |||
41 | instance Eq ExchangeSession where | ||
42 | (==) = (==) `on` connectedPeerAddr | ||
43 | {-# INLINE (==) #-} | ||
44 | |||
45 | instance Ord ExchangeSession where | ||
46 | compare = comparing connectedPeerAddr | ||
47 | {-# INLINE compare #-} | ||
48 | |||
49 | enqueueBroadcast :: ExchangeSession -> Message -> IO () | ||
50 | enqueueBroadcast = undefined | ||
51 | |||
52 | dequeueBroadcast :: ExchangeSession -> IO Message | ||
53 | dequeueBroadcast = undefined | ||
54 | |||
55 | {----------------------------------------------------------------------- | ||
56 | -- Session state | ||
57 | -----------------------------------------------------------------------} | ||
58 | |||
59 | data SessionState = SessionState | ||
60 | { _bitfield :: !Bitfield -- ^ Other peer Have bitfield. | ||
61 | , _status :: !SessionStatus -- ^ Status of both peers. | ||
62 | } deriving (Show, Eq) | ||
63 | |||
64 | $(makeLenses ''SessionState) | ||
65 | |||
66 | --initialSessionState :: PieceCount -> SessionState | ||
67 | --initialSessionState pc = SessionState (haveNone pc) def | ||
68 | |||
69 | --getSessionState :: PeerSession -> IO SessionState | ||
70 | --getSessionState PeerSession {..} = readIORef sessionState | ||
71 | |||
72 | {----------------------------------------------------------------------- | ||
73 | -- Exceptions | ||
74 | -----------------------------------------------------------------------} | ||
75 | |||
76 | -- | Exceptions used to interrupt the current P2P session. This | ||
77 | -- exceptions will NOT affect other P2P sessions, DHT, peer <-> | ||
78 | -- tracker, or any other session. | ||
79 | -- | ||
80 | data ExchangeFailure | ||
81 | = PeerDisconnected | ||
82 | | ProtocolError Doc | ||
83 | | UnknownTorrent InfoHash | ||
84 | deriving (Show, Typeable) | ||
85 | |||
86 | instance Exception ExchangeFailure | ||
87 | |||
88 | -- | Do nothing with exception, used with 'handle' or 'try'. | ||
89 | isSessionException :: Monad m => ExchangeFailure -> m () | ||
90 | isSessionException _ = return () | ||
91 | |||
92 | -- | The same as 'isSessionException' but output to stdout the catched | ||
93 | -- exception, for debugging purposes only. | ||
94 | putSessionException :: ExchangeFailure -> IO () | ||
95 | putSessionException = print | ||
96 | {- | ||
97 | {----------------------------------------------------------------------- | ||
98 | -- Broadcasting: Have, Cancel, Bitfield, SuggestPiece | ||
99 | -----------------------------------------------------------------------} | ||
100 | {- | ||
101 | Here we should enqueue broadcast messages and keep in mind that: | ||
102 | * We should enqueue broadcast events as they are appear. | ||
103 | * We should yield broadcast messages as fast as we get them. | ||
104 | |||
105 | these 2 phases might differ in time significantly | ||
106 | |||
107 | **TODO**: do this; but only when it'll be clean which other broadcast | ||
108 | messages & events we should send. | ||
109 | |||
110 | 1. Update client have bitfield --\____ in one transaction; | ||
111 | 2. Update downloaded stats --/ | ||
112 | 3. Signal to the all other peer about this. | ||
113 | -} | ||
114 | |||
115 | available :: Bitfield -> SwarmSession -> STM () | ||
116 | available bf SwarmSession {..} = {-# SCC available #-} do | ||
117 | updateProgress >> broadcast | ||
118 | where | ||
119 | updateProgress = do | ||
120 | let piLen = ciPieceLength $ tInfo $ torrentMeta | ||
121 | let bytes = piLen * BF.haveCount bf | ||
122 | modifyTVar' (currentProgress clientSession) (downloadedProgress bytes) | ||
123 | |||
124 | broadcast = mapM_ (writeTChan broadcastMessages . Have) (BF.toList bf) | ||
125 | |||
126 | -- TODO compute size of messages: if it's faster to send Bitfield | ||
127 | -- instead many Have do that | ||
128 | |||
129 | -- Also if there is single Have message in queue then the | ||
130 | -- corresponding piece is likely still in memory or disc cache, | ||
131 | -- when we can send SuggestPiece. | ||
132 | |||
133 | readAvail :: TChan a -> STM [a] | ||
134 | readAvail chan = do | ||
135 | m <- tryReadTChan chan | ||
136 | case m of | ||
137 | Just a -> (:) <$> pure a <*> readAvail chan | ||
138 | Nothing -> return [] | ||
139 | |||
140 | -- | Get pending messages queue appeared in result of asynchronously | ||
141 | -- changed client state. Resulting queue should be sent to a peer | ||
142 | -- immediately. | ||
143 | -- | ||
144 | getPending :: PeerSession -> IO [Message] | ||
145 | getPending PeerSession {..} = {-# SCC getPending #-} do | ||
146 | atomically (readAvail pendingMessages) | ||
147 | -} \ No newline at end of file | ||