diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 5 | ||||
-rw-r--r-- | src/Network/BitTorrent/Tracker.hs | 103 |
3 files changed, 85 insertions, 25 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 978e86db..be9a455b 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -26,6 +26,7 @@ import Control.Applicative | |||
26 | import Control.Exception | 26 | import Control.Exception |
27 | import Control.Concurrent | 27 | import Control.Concurrent |
28 | import Control.Lens | 28 | import Control.Lens |
29 | import Control.Monad.Fork.Class | ||
29 | import Control.Monad.Reader | 30 | import Control.Monad.Reader |
30 | import Control.Monad.Trans.Resource | 31 | import Control.Monad.Trans.Resource |
31 | 32 | ||
@@ -283,6 +284,7 @@ newtype P2P a = P2P { | |||
283 | , MonadIO, MonadThrow, MonadActive | 284 | , MonadIO, MonadThrow, MonadActive |
284 | , MonadReader PeerSession | 285 | , MonadReader PeerSession |
285 | ) | 286 | ) |
287 | -- TODO instance for MonadFork | ||
286 | 288 | ||
287 | runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () | 289 | runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () |
288 | runSession se addr p2p = | 290 | runSession se addr p2p = |
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index afe1fff1..918bfed7 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -89,6 +89,9 @@ import Network.BitTorrent.Peer | |||
89 | import Network.BitTorrent.Exchange.Protocol as BT | 89 | import Network.BitTorrent.Exchange.Protocol as BT |
90 | import Network.BitTorrent.Tracker.Protocol as BT | 90 | import Network.BitTorrent.Tracker.Protocol as BT |
91 | 91 | ||
92 | {----------------------------------------------------------------------- | ||
93 | Progress | ||
94 | -----------------------------------------------------------------------} | ||
92 | 95 | ||
93 | -- | 'Progress' contains upload/download/left stats about | 96 | -- | 'Progress' contains upload/download/left stats about |
94 | -- current client state. | 97 | -- current client state. |
@@ -109,6 +112,7 @@ startProgress = Progress 0 0 | |||
109 | Client session | 112 | Client session |
110 | -----------------------------------------------------------------------} | 113 | -----------------------------------------------------------------------} |
111 | 114 | ||
115 | -- TODO comment thread count bounding | ||
112 | type ThreadCount = Int | 116 | type ThreadCount = Int |
113 | 117 | ||
114 | defaultThreadCount :: ThreadCount | 118 | defaultThreadCount :: ThreadCount |
@@ -169,6 +173,7 @@ newClient n exts = do | |||
169 | Swarm session | 173 | Swarm session |
170 | -----------------------------------------------------------------------} | 174 | -----------------------------------------------------------------------} |
171 | 175 | ||
176 | -- TODO document P2P sessions bounding | ||
172 | type SessionCount = Int | 177 | type SessionCount = Int |
173 | 178 | ||
174 | defSeederConns :: SessionCount | 179 | defSeederConns :: SessionCount |
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index c3bce63a..ea45b75d 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs | |||
@@ -24,7 +24,7 @@ module Network.BitTorrent.Tracker | |||
24 | 24 | ||
25 | -- * Session | 25 | -- * Session |
26 | , TSession | 26 | , TSession |
27 | , getPeerAddr, getPeerList | 27 | , getPeerAddr |
28 | , getProgress, waitInterval | 28 | , getProgress, waitInterval |
29 | 29 | ||
30 | -- * Re-export | 30 | -- * Re-export |
@@ -38,6 +38,7 @@ module Network.BitTorrent.Tracker | |||
38 | 38 | ||
39 | import Control.Applicative | 39 | import Control.Applicative |
40 | import Control.Concurrent | 40 | import Control.Concurrent |
41 | import Control.Concurrent.BoundedChan as BC | ||
41 | import Control.Concurrent.STM | 42 | import Control.Concurrent.STM |
42 | import Control.Exception | 43 | import Control.Exception |
43 | import Control.Monad | 44 | import Control.Monad |
@@ -45,6 +46,7 @@ import Data.BEncode | |||
45 | import Data.ByteString (ByteString) | 46 | import Data.ByteString (ByteString) |
46 | import qualified Data.ByteString as B | 47 | import qualified Data.ByteString as B |
47 | import qualified Data.ByteString.Char8 as BC | 48 | import qualified Data.ByteString.Char8 as BC |
49 | import Data.List as L | ||
48 | import Data.Map (Map) | 50 | import Data.Map (Map) |
49 | import qualified Data.Map as M | 51 | import qualified Data.Map as M |
50 | import Data.Monoid | 52 | import Data.Monoid |
@@ -59,9 +61,13 @@ import Network.BitTorrent.Internal | |||
59 | import Network.BitTorrent.Peer | 61 | import Network.BitTorrent.Peer |
60 | import Network.BitTorrent.Tracker.Protocol | 62 | import Network.BitTorrent.Tracker.Protocol |
61 | 63 | ||
64 | {----------------------------------------------------------------------- | ||
65 | Tracker connection | ||
66 | -----------------------------------------------------------------------} | ||
62 | 67 | ||
63 | -- | 'TConnection' (shorthand for Tracker session) combines tracker request | 68 | -- | 'TConnection' (shorthand for Tracker session) combines tracker |
64 | -- fields neccessary for tracker, torrent and client identification. | 69 | -- request fields neccessary for tracker, torrent and client |
70 | -- identification. | ||
65 | -- | 71 | -- |
66 | -- This data is considered as static within one session. | 72 | -- This data is considered as static within one session. |
67 | -- | 73 | -- |
@@ -136,49 +142,84 @@ completedReq ses pr = (genericReq ses pr) { | |||
136 | , reqEvent = Just Completed | 142 | , reqEvent = Just Completed |
137 | } | 143 | } |
138 | 144 | ||
145 | {----------------------------------------------------------------------- | ||
146 | Tracker session | ||
147 | -----------------------------------------------------------------------} | ||
139 | 148 | ||
149 | {- Why use BoundedChan? | ||
140 | 150 | ||
151 | Because most times we need just a list of peer at the start and all | ||
152 | the rest time we will take little by little. On the other hand tracker | ||
153 | will give us some constant count of peers and channel will grow with | ||
154 | time. To avoid space leaks and long lists of peers (which we don't | ||
155 | need) we use bounded chaan. | ||
156 | |||
157 | Chan size. | ||
158 | |||
159 | Should be at least (count_of_workers * 2) to accumulate long enough | ||
160 | peer list. | ||
161 | |||
162 | Order of peers in chan. | ||
163 | |||
164 | Old peers in head, new ones in tail. Old peers should be used in the | ||
165 | first place because by statistics they are most likely will present in | ||
166 | network a long time than a new. | ||
167 | |||
168 | -} | ||
169 | |||
170 | type TimeInterval = Int | ||
141 | 171 | ||
142 | data TSession = TSession { | 172 | data TSession = TSession { |
173 | -- TODO synchonize progress with client session | ||
143 | seProgress :: TVar Progress | 174 | seProgress :: TVar Progress |
144 | , seInterval :: IORef Int | 175 | , seInterval :: IORef TimeInterval |
145 | , sePeers :: Chan PeerAddr | 176 | , sePeers :: BoundedChan PeerAddr |
146 | -- TODO use something like 'TVar (Set PeerAddr)' | ||
147 | -- otherwise we might get space leak | ||
148 | -- TODO or maybe BoundedChan? | ||
149 | } | 177 | } |
150 | 178 | ||
151 | newSession :: Progress -> Int -> [PeerAddr] -> IO TSession | 179 | type PeerCount = Int |
152 | newSession pr i ps = do | ||
153 | chan <- newChan | ||
154 | writeList2Chan chan ps | ||
155 | TSession <$> newTVarIO pr | ||
156 | <*> newIORef i | ||
157 | <*> pure chan | ||
158 | 180 | ||
159 | getPeerAddr :: TSession -> IO PeerAddr | 181 | defaultChanSize :: PeerCount |
160 | getPeerAddr = readChan . sePeers | 182 | defaultChanSize = defaultNumWant * 2 |
161 | 183 | ||
162 | getPeerList :: TSession -> IO [PeerAddr] | 184 | getPeerAddr :: TSession -> IO PeerAddr |
163 | getPeerList = getChanContents . sePeers | 185 | getPeerAddr = BC.readChan . sePeers |
164 | 186 | ||
165 | getProgress :: TSession -> IO Progress | 187 | getProgress :: TSession -> IO Progress |
166 | getProgress = readTVarIO . seProgress | 188 | getProgress = readTVarIO . seProgress |
167 | 189 | ||
168 | sec :: Int | 190 | newSession :: PeerCount -> Progress -> TimeInterval -> [PeerAddr] |
169 | sec = 1000 * 1000 | 191 | -> IO TSession |
192 | newSession chanSize pr i ps | ||
193 | | chanSize < 1 | ||
194 | = throwIO $ userError "size of chan should be more that 1" | ||
195 | |||
196 | | otherwise = do | ||
197 | chan <- newBoundedChan chanSize | ||
198 | |||
199 | -- if length of the "ps" is more than the "chanSize" we will block | ||
200 | -- forever; to avoid this we remove excessive peers | ||
201 | let ps' = take chanSize ps | ||
202 | BC.writeList2Chan chan ps' | ||
203 | |||
204 | TSession <$> newTVarIO pr | ||
205 | <*> newIORef i | ||
206 | <*> pure chan | ||
170 | 207 | ||
171 | waitInterval :: TSession -> IO () | 208 | waitInterval :: TSession -> IO () |
172 | waitInterval se @ TSession {..} = do | 209 | waitInterval se @ TSession {..} = do |
173 | delay <- readIORef seInterval | 210 | delay <- readIORef seInterval |
174 | threadDelay (delay * sec) | 211 | threadDelay (delay * sec) |
212 | where | ||
213 | sec = 1000 * 1000 :: Int | ||
175 | 214 | ||
176 | withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a | 215 | withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a |
177 | withTracker initProgress conn action = bracket start end (action . fst) | 216 | withTracker initProgress conn action = bracket start end (action . fst) |
178 | where | 217 | where |
179 | start = do | 218 | start = do |
180 | resp <- askTracker (startedReq conn initProgress) | 219 | resp <- askTracker (startedReq conn initProgress) |
181 | se <- newSession initProgress (respInterval resp) (respPeers resp) | 220 | se <- newSession defaultChanSize initProgress |
221 | (respInterval resp) (respPeers resp) | ||
222 | |||
182 | tid <- forkIO (syncSession se) | 223 | tid <- forkIO (syncSession se) |
183 | return (se, tid) | 224 | return (se, tid) |
184 | 225 | ||
@@ -190,7 +231,16 @@ withTracker initProgress conn action = bracket start end (action . fst) | |||
190 | case resp of | 231 | case resp of |
191 | Right (OK {..}) -> do | 232 | Right (OK {..}) -> do |
192 | writeIORef seInterval respInterval | 233 | writeIORef seInterval respInterval |
193 | writeList2Chan sePeers respPeers | 234 | |
235 | -- we rely on the fact that union on lists is not | ||
236 | -- commutative: this implements the heuristic "old peers | ||
237 | -- in head" | ||
238 | old <- BC.getChanContents sePeers | ||
239 | let new = respPeers | ||
240 | let combined = L.union old new | ||
241 | |||
242 | BC.writeList2Chan sePeers combined | ||
243 | |||
194 | _ -> return () | 244 | _ -> return () |
195 | where | 245 | where |
196 | isIOException :: IOException -> Maybe IOException | 246 | isIOException :: IOException -> Maybe IOException |
@@ -201,6 +251,9 @@ withTracker initProgress conn action = bracket start end (action . fst) | |||
201 | pr <- getProgress se | 251 | pr <- getProgress se |
202 | leaveTracker $ stoppedReq conn pr | 252 | leaveTracker $ stoppedReq conn pr |
203 | 253 | ||
254 | {----------------------------------------------------------------------- | ||
255 | Scrape | ||
256 | -----------------------------------------------------------------------} | ||
204 | 257 | ||
205 | 258 | ||
206 | -- | Information about particular torrent. | 259 | -- | Information about particular torrent. |