diff options
author | Sam T <pxqr.sta@gmail.com> | 2013-06-13 09:52:40 +0400 |
---|---|---|
committer | Sam T <pxqr.sta@gmail.com> | 2013-06-13 09:52:40 +0400 |
commit | b1413145f58be6c3d7e536574a19e1b6c333cd54 (patch) | |
tree | 2d2464533a4ec45d572d191c6439b84c73567a9c /src/Network/BitTorrent/Tracker.hs | |
parent | 38d8eb046cefce18a6689488994c05abf1223ffe (diff) |
~ Use bounded chan to avoid space leaks.
Diffstat (limited to 'src/Network/BitTorrent/Tracker.hs')
-rw-r--r-- | src/Network/BitTorrent/Tracker.hs | 103 |
1 files changed, 78 insertions, 25 deletions
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index c3bce63a..ea45b75d 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs | |||
@@ -24,7 +24,7 @@ module Network.BitTorrent.Tracker | |||
24 | 24 | ||
25 | -- * Session | 25 | -- * Session |
26 | , TSession | 26 | , TSession |
27 | , getPeerAddr, getPeerList | 27 | , getPeerAddr |
28 | , getProgress, waitInterval | 28 | , getProgress, waitInterval |
29 | 29 | ||
30 | -- * Re-export | 30 | -- * Re-export |
@@ -38,6 +38,7 @@ module Network.BitTorrent.Tracker | |||
38 | 38 | ||
39 | import Control.Applicative | 39 | import Control.Applicative |
40 | import Control.Concurrent | 40 | import Control.Concurrent |
41 | import Control.Concurrent.BoundedChan as BC | ||
41 | import Control.Concurrent.STM | 42 | import Control.Concurrent.STM |
42 | import Control.Exception | 43 | import Control.Exception |
43 | import Control.Monad | 44 | import Control.Monad |
@@ -45,6 +46,7 @@ import Data.BEncode | |||
45 | import Data.ByteString (ByteString) | 46 | import Data.ByteString (ByteString) |
46 | import qualified Data.ByteString as B | 47 | import qualified Data.ByteString as B |
47 | import qualified Data.ByteString.Char8 as BC | 48 | import qualified Data.ByteString.Char8 as BC |
49 | import Data.List as L | ||
48 | import Data.Map (Map) | 50 | import Data.Map (Map) |
49 | import qualified Data.Map as M | 51 | import qualified Data.Map as M |
50 | import Data.Monoid | 52 | import Data.Monoid |
@@ -59,9 +61,13 @@ import Network.BitTorrent.Internal | |||
59 | import Network.BitTorrent.Peer | 61 | import Network.BitTorrent.Peer |
60 | import Network.BitTorrent.Tracker.Protocol | 62 | import Network.BitTorrent.Tracker.Protocol |
61 | 63 | ||
64 | {----------------------------------------------------------------------- | ||
65 | Tracker connection | ||
66 | -----------------------------------------------------------------------} | ||
62 | 67 | ||
63 | -- | 'TConnection' (shorthand for Tracker session) combines tracker request | 68 | -- | 'TConnection' (shorthand for Tracker session) combines tracker |
64 | -- fields neccessary for tracker, torrent and client identification. | 69 | -- request fields neccessary for tracker, torrent and client |
70 | -- identification. | ||
65 | -- | 71 | -- |
66 | -- This data is considered as static within one session. | 72 | -- This data is considered as static within one session. |
67 | -- | 73 | -- |
@@ -136,49 +142,84 @@ completedReq ses pr = (genericReq ses pr) { | |||
136 | , reqEvent = Just Completed | 142 | , reqEvent = Just Completed |
137 | } | 143 | } |
138 | 144 | ||
145 | {----------------------------------------------------------------------- | ||
146 | Tracker session | ||
147 | -----------------------------------------------------------------------} | ||
139 | 148 | ||
149 | {- Why use BoundedChan? | ||
140 | 150 | ||
151 | Because most times we need just a list of peer at the start and all | ||
152 | the rest time we will take little by little. On the other hand tracker | ||
153 | will give us some constant count of peers and channel will grow with | ||
154 | time. To avoid space leaks and long lists of peers (which we don't | ||
155 | need) we use bounded chaan. | ||
156 | |||
157 | Chan size. | ||
158 | |||
159 | Should be at least (count_of_workers * 2) to accumulate long enough | ||
160 | peer list. | ||
161 | |||
162 | Order of peers in chan. | ||
163 | |||
164 | Old peers in head, new ones in tail. Old peers should be used in the | ||
165 | first place because by statistics they are most likely will present in | ||
166 | network a long time than a new. | ||
167 | |||
168 | -} | ||
169 | |||
170 | type TimeInterval = Int | ||
141 | 171 | ||
142 | data TSession = TSession { | 172 | data TSession = TSession { |
173 | -- TODO synchonize progress with client session | ||
143 | seProgress :: TVar Progress | 174 | seProgress :: TVar Progress |
144 | , seInterval :: IORef Int | 175 | , seInterval :: IORef TimeInterval |
145 | , sePeers :: Chan PeerAddr | 176 | , sePeers :: BoundedChan PeerAddr |
146 | -- TODO use something like 'TVar (Set PeerAddr)' | ||
147 | -- otherwise we might get space leak | ||
148 | -- TODO or maybe BoundedChan? | ||
149 | } | 177 | } |
150 | 178 | ||
151 | newSession :: Progress -> Int -> [PeerAddr] -> IO TSession | 179 | type PeerCount = Int |
152 | newSession pr i ps = do | ||
153 | chan <- newChan | ||
154 | writeList2Chan chan ps | ||
155 | TSession <$> newTVarIO pr | ||
156 | <*> newIORef i | ||
157 | <*> pure chan | ||
158 | 180 | ||
159 | getPeerAddr :: TSession -> IO PeerAddr | 181 | defaultChanSize :: PeerCount |
160 | getPeerAddr = readChan . sePeers | 182 | defaultChanSize = defaultNumWant * 2 |
161 | 183 | ||
162 | getPeerList :: TSession -> IO [PeerAddr] | 184 | getPeerAddr :: TSession -> IO PeerAddr |
163 | getPeerList = getChanContents . sePeers | 185 | getPeerAddr = BC.readChan . sePeers |
164 | 186 | ||
165 | getProgress :: TSession -> IO Progress | 187 | getProgress :: TSession -> IO Progress |
166 | getProgress = readTVarIO . seProgress | 188 | getProgress = readTVarIO . seProgress |
167 | 189 | ||
168 | sec :: Int | 190 | newSession :: PeerCount -> Progress -> TimeInterval -> [PeerAddr] |
169 | sec = 1000 * 1000 | 191 | -> IO TSession |
192 | newSession chanSize pr i ps | ||
193 | | chanSize < 1 | ||
194 | = throwIO $ userError "size of chan should be more that 1" | ||
195 | |||
196 | | otherwise = do | ||
197 | chan <- newBoundedChan chanSize | ||
198 | |||
199 | -- if length of the "ps" is more than the "chanSize" we will block | ||
200 | -- forever; to avoid this we remove excessive peers | ||
201 | let ps' = take chanSize ps | ||
202 | BC.writeList2Chan chan ps' | ||
203 | |||
204 | TSession <$> newTVarIO pr | ||
205 | <*> newIORef i | ||
206 | <*> pure chan | ||
170 | 207 | ||
171 | waitInterval :: TSession -> IO () | 208 | waitInterval :: TSession -> IO () |
172 | waitInterval se @ TSession {..} = do | 209 | waitInterval se @ TSession {..} = do |
173 | delay <- readIORef seInterval | 210 | delay <- readIORef seInterval |
174 | threadDelay (delay * sec) | 211 | threadDelay (delay * sec) |
212 | where | ||
213 | sec = 1000 * 1000 :: Int | ||
175 | 214 | ||
176 | withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a | 215 | withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a |
177 | withTracker initProgress conn action = bracket start end (action . fst) | 216 | withTracker initProgress conn action = bracket start end (action . fst) |
178 | where | 217 | where |
179 | start = do | 218 | start = do |
180 | resp <- askTracker (startedReq conn initProgress) | 219 | resp <- askTracker (startedReq conn initProgress) |
181 | se <- newSession initProgress (respInterval resp) (respPeers resp) | 220 | se <- newSession defaultChanSize initProgress |
221 | (respInterval resp) (respPeers resp) | ||
222 | |||
182 | tid <- forkIO (syncSession se) | 223 | tid <- forkIO (syncSession se) |
183 | return (se, tid) | 224 | return (se, tid) |
184 | 225 | ||
@@ -190,7 +231,16 @@ withTracker initProgress conn action = bracket start end (action . fst) | |||
190 | case resp of | 231 | case resp of |
191 | Right (OK {..}) -> do | 232 | Right (OK {..}) -> do |
192 | writeIORef seInterval respInterval | 233 | writeIORef seInterval respInterval |
193 | writeList2Chan sePeers respPeers | 234 | |
235 | -- we rely on the fact that union on lists is not | ||
236 | -- commutative: this implements the heuristic "old peers | ||
237 | -- in head" | ||
238 | old <- BC.getChanContents sePeers | ||
239 | let new = respPeers | ||
240 | let combined = L.union old new | ||
241 | |||
242 | BC.writeList2Chan sePeers combined | ||
243 | |||
194 | _ -> return () | 244 | _ -> return () |
195 | where | 245 | where |
196 | isIOException :: IOException -> Maybe IOException | 246 | isIOException :: IOException -> Maybe IOException |
@@ -201,6 +251,9 @@ withTracker initProgress conn action = bracket start end (action . fst) | |||
201 | pr <- getProgress se | 251 | pr <- getProgress se |
202 | leaveTracker $ stoppedReq conn pr | 252 | leaveTracker $ stoppedReq conn pr |
203 | 253 | ||
254 | {----------------------------------------------------------------------- | ||
255 | Scrape | ||
256 | -----------------------------------------------------------------------} | ||
204 | 257 | ||
205 | 258 | ||
206 | -- | Information about particular torrent. | 259 | -- | Information about particular torrent. |