diff options
author | Sam T <pxqr.sta@gmail.com> | 2013-08-28 07:29:31 +0400 |
---|---|---|
committer | Sam T <pxqr.sta@gmail.com> | 2013-08-28 07:29:31 +0400 |
commit | 9737a06bff6c6539a6afd67f7970a6923b401d86 (patch) | |
tree | e53a54098480ab33c9d0c75c2cda50f7bc331a05 /src/Network | |
parent | 8661b97e62e785b8c95479ea0bb8855632f55dec (diff) |
~ Refactor tracker.
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 13 | ||||
-rw-r--r-- | src/Network/BitTorrent/Tracker.hs | 132 |
2 files changed, 64 insertions, 81 deletions
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs index 9713f438..0f0d7ecd 100644 --- a/src/Network/BitTorrent/Sessions.hs +++ b/src/Network/BitTorrent/Sessions.hs | |||
@@ -58,6 +58,7 @@ import Prelude hiding (mapM_, elem) | |||
58 | import Control.Applicative | 58 | import Control.Applicative |
59 | import Control.Concurrent | 59 | import Control.Concurrent |
60 | import Control.Concurrent.STM | 60 | import Control.Concurrent.STM |
61 | import Control.Concurrent.BoundedChan as BC | ||
61 | import Control.Concurrent.MSem as MSem | 62 | import Control.Concurrent.MSem as MSem |
62 | import Control.Monad (forever, (>=>)) | 63 | import Control.Monad (forever, (>=>)) |
63 | import Control.Exception | 64 | import Control.Exception |
@@ -202,12 +203,12 @@ discover swarm @ SwarmSession {..} = {-# SCC discover #-} do | |||
202 | , tconnPort = port | 203 | , tconnPort = port |
203 | } | 204 | } |
204 | 205 | ||
205 | progress <- getCurrentProgress clientSession | 206 | let progress = currentProgress clientSession |
206 | 207 | ch <- newBoundedChan 100 -- TODO | |
207 | withTracker progress conn $ \tses -> do | 208 | tid <- forkIO $ tracker ch progress conn |
208 | forever $ do | 209 | forever $ do |
209 | addr <- getPeerAddr tses | 210 | addr <- BC.readChan ch |
210 | forkThrottle swarm $ do | 211 | forkThrottle swarm $ do |
211 | initiatePeerSession swarm addr $ \pconn -> do | 212 | initiatePeerSession swarm addr $ \pconn -> do |
212 | print addr | 213 | print addr |
213 | runP2P pconn p2p | 214 | runP2P pconn p2p |
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index 74b0b593..147d1ea5 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs | |||
@@ -11,15 +11,13 @@ | |||
11 | -- | 11 | -- |
12 | {-# LANGUAGE TemplateHaskell #-} | 12 | {-# LANGUAGE TemplateHaskell #-} |
13 | module Network.BitTorrent.Tracker | 13 | module Network.BitTorrent.Tracker |
14 | ( withTracker, completedReq | 14 | ( -- * Connection |
15 | 15 | TConnection(..) | |
16 | -- * Connection | 16 | , tconnection |
17 | , TConnection(..), tconnection | ||
18 | 17 | ||
19 | -- * Session | 18 | -- * Session |
20 | , TSession | 19 | , TSession |
21 | , getPeerAddr | 20 | , tracker |
22 | , getProgress, waitInterval | ||
23 | 21 | ||
24 | -- * Re-export | 22 | -- * Re-export |
25 | , defaultPorts | 23 | , defaultPorts |
@@ -32,10 +30,9 @@ import Control.Concurrent.BoundedChan as BC | |||
32 | import Control.Concurrent.STM | 30 | import Control.Concurrent.STM |
33 | import Control.Exception | 31 | import Control.Exception |
34 | import Control.Monad | 32 | import Control.Monad |
35 | 33 | import Data.List as L | |
36 | import Data.List as L | 34 | import Data.IORef |
37 | import Data.IORef | 35 | import Data.Text as T |
38 | |||
39 | import Network | 36 | import Network |
40 | import Network.URI | 37 | import Network.URI |
41 | 38 | ||
@@ -77,10 +74,14 @@ instance Tracker BitTracker where | |||
77 | -- This data is considered as static within one session. | 74 | -- This data is considered as static within one session. |
78 | -- | 75 | -- |
79 | data TConnection = TConnection { | 76 | data TConnection = TConnection { |
80 | tconnAnnounce :: URI -- ^ Announce URL. | 77 | tconnAnnounce :: URI |
81 | , tconnInfoHash :: InfoHash -- ^ Hash of info part of current .torrent file. | 78 | -- ^ Announce URL. |
82 | , tconnPeerId :: PeerId -- ^ Client peer ID. | 79 | , tconnInfoHash :: InfoHash |
83 | , tconnPort :: PortNumber -- ^ The port number the client is listenning on. | 80 | -- ^ Hash of info part of current .torrent file. |
81 | , tconnPeerId :: PeerId | ||
82 | -- ^ Client peer ID. | ||
83 | , tconnPort :: PortNumber | ||
84 | -- ^ The port number the client is listenning on. | ||
84 | } deriving Show | 85 | } deriving Show |
85 | 86 | ||
86 | -- TODO tconnection :: SwarmSession -> TConnection | 87 | -- TODO tconnection :: SwarmSession -> TConnection |
@@ -166,45 +167,6 @@ network a long time than a new. | |||
166 | 167 | ||
167 | type TimeInterval = Int | 168 | type TimeInterval = Int |
168 | 169 | ||
169 | data TSession = TSession { | ||
170 | -- TODO synchonize progress with client session | ||
171 | seProgress :: TVar Progress | ||
172 | , seInterval :: IORef TimeInterval | ||
173 | , sePeers :: BoundedChan PeerAddr | ||
174 | , seTracker :: BitTracker | ||
175 | } | ||
176 | |||
177 | type PeerCount = Int | ||
178 | |||
179 | defaultChanSize :: PeerCount | ||
180 | defaultChanSize = defaultNumWant * 2 | ||
181 | |||
182 | getPeerAddr :: TSession -> IO PeerAddr | ||
183 | getPeerAddr = BC.readChan . sePeers | ||
184 | |||
185 | getProgress :: TSession -> IO Progress | ||
186 | getProgress = readTVarIO . seProgress | ||
187 | |||
188 | newSession :: PeerCount -> Progress -> TimeInterval -> [PeerAddr] | ||
189 | -> BitTracker | ||
190 | -> IO TSession | ||
191 | newSession chanSize pr i ps tr | ||
192 | | chanSize < 1 | ||
193 | = throwIO $ userError "size of chan should be more that 1" | ||
194 | |||
195 | | otherwise = do | ||
196 | chan <- newBoundedChan chanSize | ||
197 | |||
198 | -- if length of the "ps" is more than the "chanSize" we will block | ||
199 | -- forever; to avoid this we remove excessive peers | ||
200 | let ps' = take chanSize ps | ||
201 | BC.writeList2Chan chan ps' | ||
202 | |||
203 | TSession <$> newTVarIO pr | ||
204 | <*> newIORef i | ||
205 | <*> pure chan | ||
206 | <*> pure tr | ||
207 | |||
208 | waitInterval :: TSession -> IO () | 170 | waitInterval :: TSession -> IO () |
209 | waitInterval TSession {..} = do | 171 | waitInterval TSession {..} = do |
210 | delay <- readIORef seInterval | 172 | delay <- readIORef seInterval |
@@ -212,30 +174,53 @@ waitInterval TSession {..} = do | |||
212 | where | 174 | where |
213 | sec = 1000 * 1000 :: Int | 175 | sec = 1000 * 1000 :: Int |
214 | 176 | ||
215 | announceLoop :: IO (BoundedChan PeerAddr) | 177 | data TSession = TSession |
216 | announceLoop = undefined | 178 | { seConnection :: !TConnection |
217 | 179 | , seTracker :: !BitTracker | |
218 | openSession :: Progress -> TConnection -> IO TSession | 180 | , seProgress :: !(TVar Progress) |
219 | openSession initProgress conn = do | 181 | , sePeers :: !(BoundedChan PeerAddr) |
220 | t <- Tracker.connect (tconnAnnounce conn) | 182 | , seInterval :: {-# UNPACK #-} !(IORef TimeInterval) |
221 | resp <- Tracker.announce t (startedReq conn initProgress) | 183 | } |
222 | newSession defaultChanSize initProgress | ||
223 | (respInterval resp) (respPeers resp) t | ||
224 | 184 | ||
225 | closeSession :: TConnection -> TSession -> IO () | 185 | openSession :: BoundedChan PeerAddr |
226 | closeSession conn se @ TSession {..} = do | 186 | -> TVar Progress |
227 | pr <- getProgress se | 187 | -> TConnection -> IO TSession |
228 | Tracker.announce seTracker (stoppedReq conn pr) | 188 | openSession chan progress conn @ TConnection {..} = do |
189 | trac <- Tracker.connect tconnAnnounce | ||
190 | pr <- readTVarIO progress | ||
191 | resp <- Tracker.announce trac $ startedReq conn pr | ||
192 | print resp | ||
193 | case resp of | ||
194 | Failure e -> throwIO $ userError $ T.unpack e | ||
195 | AnnounceInfo {..} -> do | ||
196 | -- TODO make use of rest AnnounceInfo fields | ||
197 | BC.writeList2Chan chan respPeers | ||
198 | TSession conn trac progress chan | ||
199 | <$> newIORef respInterval | ||
200 | |||
201 | closeSession :: TSession -> IO () | ||
202 | closeSession TSession {..} = do | ||
203 | pr <- readTVarIO seProgress | ||
204 | _ <- Tracker.announce seTracker (stoppedReq seConnection pr) | ||
229 | return () | 205 | return () |
230 | 206 | ||
231 | syncSession :: TConnection -> TSession -> IO () | 207 | withSession :: BoundedChan PeerAddr |
232 | syncSession conn se @ TSession {..} = forever $ do | 208 | -> TVar Progress |
209 | -> TConnection -> (TSession -> IO a) -> IO a | ||
210 | withSession chan prog conn | ||
211 | = bracket (openSession chan prog conn) closeSession | ||
212 | |||
213 | askPeers :: TSession -> IO () | ||
214 | askPeers se @ TSession {..} = forever $ do | ||
233 | waitInterval se | 215 | waitInterval se |
234 | pr <- getProgress se | 216 | pr <- readTVarIO seProgress |
235 | resp <- tryJust isIOException $ do | 217 | resp <- tryJust isIOException $ do |
236 | Tracker.announce seTracker (regularReq defaultNumWant conn pr) | 218 | let req = regularReq defaultNumWant seConnection pr |
219 | Tracker.announce seTracker req | ||
220 | print resp | ||
237 | case resp of | 221 | case resp of |
238 | Left _ -> return () | 222 | Left _ -> return () |
223 | Right (Failure e) -> throwIO $ userError $ T.unpack e | ||
239 | Right (AnnounceInfo {..}) -> do | 224 | Right (AnnounceInfo {..}) -> do |
240 | writeIORef seInterval respInterval | 225 | writeIORef seInterval respInterval |
241 | 226 | ||
@@ -249,8 +234,5 @@ syncSession conn se @ TSession {..} = forever $ do | |||
249 | isIOException :: IOException -> Maybe IOException | 234 | isIOException :: IOException -> Maybe IOException |
250 | isIOException = return | 235 | isIOException = return |
251 | 236 | ||
252 | withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a | 237 | tracker :: BoundedChan PeerAddr -> TVar Progress -> TConnection -> IO () |
253 | withTracker initProgress conn | 238 | tracker chan prog conn = withSession chan prog conn askPeers \ No newline at end of file |
254 | = bracket | ||
255 | (openSession initProgress conn) | ||
256 | (closeSession conn) | ||