diff options
author | Sam T <pxqr.sta@gmail.com> | 2013-06-11 04:27:12 +0400 |
---|---|---|
committer | Sam T <pxqr.sta@gmail.com> | 2013-06-11 04:27:12 +0400 |
commit | cdb75165ee0e4f2c36f5766fba4c7bc4bd31db2b (patch) | |
tree | 890a091aceab0b62dd31e697efa2eed8793d31d0 /src/Network/BitTorrent/Internal.hs | |
parent | 7313c139eb5a75edf4fca36e5d0f401584ab7502 (diff) |
~ Add keepalive timeouts.
Diffstat (limited to 'src/Network/BitTorrent/Internal.hs')
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs new file mode 100644 index 00000000..d34c6236 --- /dev/null +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -0,0 +1,210 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam T. 2013 | ||
3 | -- License : MIT | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module implement opaque broadcast message passing. It | ||
9 | -- provides sessions needed by Network.BitTorrent and | ||
10 | -- Network.BitTorrent.Exchange and modules. To hide some internals | ||
11 | -- of this module we detach it from Exchange. | ||
12 | -- | ||
13 | {-# LANGUAGE RecordWildCards #-} | ||
14 | module Network.BitTorrent.Internal | ||
15 | ( ClientSession(..), newClient | ||
16 | , SwarmSession(..), newLeacher, newSeeder | ||
17 | , PeerSession(..), withPeerSession | ||
18 | |||
19 | -- * Timeouts | ||
20 | , updateIncoming, updateOutcoming | ||
21 | ) where | ||
22 | |||
23 | import Control.Applicative | ||
24 | import Control.Concurrent | ||
25 | import Control.Concurrent.STM | ||
26 | import Control.Exception | ||
27 | |||
28 | import Data.IORef | ||
29 | import Data.Function | ||
30 | import Data.Ord | ||
31 | import Data.Set as S | ||
32 | |||
33 | import Data.Conduit | ||
34 | import Data.Conduit.Cereal | ||
35 | import Data.Conduit.Network | ||
36 | import Data.Serialize | ||
37 | |||
38 | import Network | ||
39 | import Network.Socket | ||
40 | import Network.Socket.ByteString | ||
41 | |||
42 | import GHC.Event as Ev | ||
43 | |||
44 | import Data.Bitfield as BF | ||
45 | import Data.Torrent | ||
46 | import Network.BitTorrent.Extension | ||
47 | import Network.BitTorrent.Peer | ||
48 | import Network.BitTorrent.Exchange.Protocol as BT | ||
49 | |||
50 | |||
51 | |||
52 | {----------------------------------------------------------------------- | ||
53 | Client session | ||
54 | -----------------------------------------------------------------------} | ||
55 | |||
56 | -- | In one application you could have many clients. | ||
57 | data ClientSession = ClientSession { | ||
58 | clientPeerID :: PeerID -- ^ | ||
59 | , allowedExtensions :: [Extension] -- ^ | ||
60 | , swarmSessions :: TVar (Set SwarmSession) | ||
61 | , eventManager :: EventManager | ||
62 | } | ||
63 | |||
64 | instance Eq ClientSession where | ||
65 | (==) = (==) `on` clientPeerID | ||
66 | |||
67 | instance Ord ClientSession where | ||
68 | compare = comparing clientPeerID | ||
69 | |||
70 | newClient :: [Extension] -> IO ClientSession | ||
71 | newClient exts = ClientSession <$> newPeerID | ||
72 | <*> pure exts | ||
73 | <*> newTVarIO S.empty | ||
74 | <*> Ev.new | ||
75 | |||
76 | {----------------------------------------------------------------------- | ||
77 | Swarm session | ||
78 | -----------------------------------------------------------------------} | ||
79 | |||
80 | -- | Extensions are set globally by | ||
81 | -- Swarm session are un | ||
82 | data SwarmSession = SwarmSession { | ||
83 | torrentInfoHash :: InfoHash | ||
84 | , clientSession :: ClientSession | ||
85 | , clientBitfield :: IORef Bitfield | ||
86 | , connectedPeers :: TVar (Set PeerSession) | ||
87 | } | ||
88 | |||
89 | instance Eq SwarmSession where | ||
90 | (==) = (==) `on` torrentInfoHash | ||
91 | |||
92 | instance Ord SwarmSession where | ||
93 | compare = comparing torrentInfoHash | ||
94 | |||
95 | newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession | ||
96 | newSwarmSession bf cs @ ClientSession {..} Torrent {..} | ||
97 | = SwarmSession <$> pure tInfoHash | ||
98 | <*> pure cs | ||
99 | <*> newIORef bf | ||
100 | <*> newTVarIO S.empty | ||
101 | |||
102 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
103 | newSeeder cs t @ Torrent {..} | ||
104 | = newSwarmSession (haveAll (pieceCount tInfo)) cs t | ||
105 | |||
106 | newLeacher :: ClientSession -> Torrent -> IO SwarmSession | ||
107 | newLeacher cs t @ Torrent {..} | ||
108 | = newSwarmSession (haveNone (pieceCount tInfo)) cs t | ||
109 | |||
110 | isLeacher :: SwarmSession -> IO Bool | ||
111 | isLeacher = undefined | ||
112 | |||
113 | {----------------------------------------------------------------------- | ||
114 | Peer session | ||
115 | -----------------------------------------------------------------------} | ||
116 | |||
117 | data PeerSession = PeerSession { | ||
118 | connectedPeerAddr :: PeerAddr | ||
119 | , swarmSession :: SwarmSession | ||
120 | , enabledExtensions :: [Extension] | ||
121 | |||
122 | -- | To dissconnect from died peers appropriately we should check | ||
123 | -- if a peer do not sent the KA message within given interval. If | ||
124 | -- yes, we should throw an exception in 'TimeoutCallback' and | ||
125 | -- close session between peers. | ||
126 | -- | ||
127 | -- We should update timeout if we /receive/ any message within | ||
128 | -- timeout interval to keep connection up. | ||
129 | , incomingTimeout :: TimeoutKey | ||
130 | |||
131 | -- | To send KA message appropriately we should know when was last | ||
132 | -- time we sent a message to a peer. To do that we keep registered | ||
133 | -- timeout in event manager and if we do not sent any message to | ||
134 | -- the peer within given interval then we send KA message in | ||
135 | -- 'TimeoutCallback'. | ||
136 | -- | ||
137 | -- We should update timeout if we /send/ any message within timeout | ||
138 | -- to avoid reduntant KA messages. | ||
139 | , outcomingTimeout :: TimeoutKey | ||
140 | |||
141 | , broadcastMessages :: Chan [Message] | ||
142 | , peerBitfield :: IORef Bitfield | ||
143 | , peerSessionStatus :: IORef SessionStatus | ||
144 | } | ||
145 | |||
146 | instance Eq PeerSession where | ||
147 | (==) = (==) `on` connectedPeerAddr | ||
148 | |||
149 | instance Ord PeerSession where | ||
150 | compare = comparing connectedPeerAddr | ||
151 | |||
152 | -- TODO check if it connected yet peer | ||
153 | withPeerSession :: SwarmSession -> PeerAddr | ||
154 | -> ((Socket, PeerSession) -> IO a) | ||
155 | -> IO a | ||
156 | |||
157 | withPeerSession ss @ SwarmSession {..} addr | ||
158 | = bracket openSession closeSession | ||
159 | where | ||
160 | openSession = do | ||
161 | let caps = encodeExts $ allowedExtensions $ clientSession | ||
162 | let pid = clientPeerID $ clientSession | ||
163 | let chs = Handshake defaultBTProtocol caps torrentInfoHash pid | ||
164 | |||
165 | sock <- connectToPeer addr | ||
166 | phs <- handshake sock chs `onException` close sock | ||
167 | |||
168 | let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) | ||
169 | ps <- PeerSession addr ss enabled | ||
170 | <$> registerTimeout (eventManager clientSession) | ||
171 | maxIncomingTime abortSession | ||
172 | <*> registerTimeout (eventManager clientSession) | ||
173 | maxOutcomingTime (sendKA sock) | ||
174 | <*> newChan | ||
175 | <*> pure clientBitfield | ||
176 | <*> newIORef initSessionStatus | ||
177 | return (sock, ps) | ||
178 | |||
179 | closeSession = close . fst | ||
180 | |||
181 | {----------------------------------------------------------------------- | ||
182 | Timeouts | ||
183 | -----------------------------------------------------------------------} | ||
184 | |||
185 | sec :: Int | ||
186 | sec = 1000 * 1000 | ||
187 | |||
188 | maxIncomingTime :: Int | ||
189 | maxIncomingTime = 120 * sec | ||
190 | |||
191 | maxOutcomingTime :: Int | ||
192 | maxOutcomingTime = 60 * sec | ||
193 | |||
194 | -- | Should be called after we have received any message from a peer. | ||
195 | updateIncoming :: PeerSession -> IO () | ||
196 | updateIncoming PeerSession {..} = do | ||
197 | updateTimeout (eventManager (clientSession swarmSession)) | ||
198 | incomingTimeout maxIncomingTime | ||
199 | |||
200 | -- | Should be called before we have send any message to a peer. | ||
201 | updateOutcoming :: PeerSession -> IO () | ||
202 | updateOutcoming PeerSession {..} = | ||
203 | updateTimeout (eventManager (clientSession swarmSession)) | ||
204 | outcomingTimeout maxOutcomingTime | ||
205 | |||
206 | sendKA :: Socket -> IO () | ||
207 | sendKA sock = sendAll sock (encode BT.KeepAlive) | ||
208 | |||
209 | abortSession :: IO () | ||
210 | abortSession = error "abortSession: not implemented" | ||