summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bittorrent.cabal16
-rw-r--r--exsamples/Main.hs2
-rw-r--r--src/Network/BitTorrent/Exchange.hs2
-rw-r--r--src/Network/BitTorrent/Internal.hs5
-rw-r--r--src/Network/BitTorrent/Tracker.hs103
5 files changed, 95 insertions, 33 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index 73e8bef2..c8f3d4ed 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -58,11 +58,19 @@ library
58 58
59 -- Control packages 59 -- Control packages
60 , mtl 60 , mtl
61 , resourcet
61 , lens 62 , lens
62 63
63 -- Concurrency packages 64 -- Concurrency packages
65 , monad-fork
64 , SafeSemaphore 66 , SafeSemaphore
65 , stm >= 2.4 67 , BoundedChan >= 1.0.1.0
68 , stm >= 2.4
69
70 -- Conduits
71 , conduit == 1.*
72 , network-conduit == 1.*
73 , cereal-conduit >= 0.5
66 74
67 -- Data packages 75 -- Data packages
68 , array >= 0.4 76 , array >= 0.4
@@ -87,12 +95,6 @@ library
87 , HTTP >= 4000.2 95 , HTTP >= 4000.2
88 , krpc 96 , krpc
89 97
90 -- Conduits
91 , conduit == 1.*
92 , network-conduit == 1.*
93 , cereal-conduit >= 0.5
94 , resourcet
95
96 -- Misc 98 -- Misc
97 , data-default 99 , data-default
98 , cryptohash 100 , cryptohash
diff --git a/exsamples/Main.hs b/exsamples/Main.hs
index 9ed311ae..ebc81b02 100644
--- a/exsamples/Main.hs
+++ b/exsamples/Main.hs
@@ -15,7 +15,7 @@ main = do
15 [path] <- getArgs 15 [path] <- getArgs
16 torrent <- fromFile path 16 torrent <- fromFile path
17 17
18 client <- defaultClient 18 client <- newClient 2 []
19 swarm <- newLeacher client torrent 19 swarm <- newLeacher client torrent
20 20
21 discover swarm $ do 21 discover swarm $ do
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 978e86db..be9a455b 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -26,6 +26,7 @@ import Control.Applicative
26import Control.Exception 26import Control.Exception
27import Control.Concurrent 27import Control.Concurrent
28import Control.Lens 28import Control.Lens
29import Control.Monad.Fork.Class
29import Control.Monad.Reader 30import Control.Monad.Reader
30import Control.Monad.Trans.Resource 31import Control.Monad.Trans.Resource
31 32
@@ -283,6 +284,7 @@ newtype P2P a = P2P {
283 , MonadIO, MonadThrow, MonadActive 284 , MonadIO, MonadThrow, MonadActive
284 , MonadReader PeerSession 285 , MonadReader PeerSession
285 ) 286 )
287-- TODO instance for MonadFork
286 288
287runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () 289runSession :: SwarmSession -> PeerAddr -> P2P () -> IO ()
288runSession se addr p2p = 290runSession se addr p2p =
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
index afe1fff1..918bfed7 100644
--- a/src/Network/BitTorrent/Internal.hs
+++ b/src/Network/BitTorrent/Internal.hs
@@ -89,6 +89,9 @@ import Network.BitTorrent.Peer
89import Network.BitTorrent.Exchange.Protocol as BT 89import Network.BitTorrent.Exchange.Protocol as BT
90import Network.BitTorrent.Tracker.Protocol as BT 90import Network.BitTorrent.Tracker.Protocol as BT
91 91
92{-----------------------------------------------------------------------
93 Progress
94-----------------------------------------------------------------------}
92 95
93-- | 'Progress' contains upload/download/left stats about 96-- | 'Progress' contains upload/download/left stats about
94-- current client state. 97-- current client state.
@@ -109,6 +112,7 @@ startProgress = Progress 0 0
109 Client session 112 Client session
110-----------------------------------------------------------------------} 113-----------------------------------------------------------------------}
111 114
115-- TODO comment thread count bounding
112type ThreadCount = Int 116type ThreadCount = Int
113 117
114defaultThreadCount :: ThreadCount 118defaultThreadCount :: ThreadCount
@@ -169,6 +173,7 @@ newClient n exts = do
169 Swarm session 173 Swarm session
170-----------------------------------------------------------------------} 174-----------------------------------------------------------------------}
171 175
176-- TODO document P2P sessions bounding
172type SessionCount = Int 177type SessionCount = Int
173 178
174defSeederConns :: SessionCount 179defSeederConns :: SessionCount
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs
index c3bce63a..ea45b75d 100644
--- a/src/Network/BitTorrent/Tracker.hs
+++ b/src/Network/BitTorrent/Tracker.hs
@@ -24,7 +24,7 @@ module Network.BitTorrent.Tracker
24 24
25 -- * Session 25 -- * Session
26 , TSession 26 , TSession
27 , getPeerAddr, getPeerList 27 , getPeerAddr
28 , getProgress, waitInterval 28 , getProgress, waitInterval
29 29
30 -- * Re-export 30 -- * Re-export
@@ -38,6 +38,7 @@ module Network.BitTorrent.Tracker
38 38
39import Control.Applicative 39import Control.Applicative
40import Control.Concurrent 40import Control.Concurrent
41import Control.Concurrent.BoundedChan as BC
41import Control.Concurrent.STM 42import Control.Concurrent.STM
42import Control.Exception 43import Control.Exception
43import Control.Monad 44import Control.Monad
@@ -45,6 +46,7 @@ import Data.BEncode
45import Data.ByteString (ByteString) 46import Data.ByteString (ByteString)
46import qualified Data.ByteString as B 47import qualified Data.ByteString as B
47import qualified Data.ByteString.Char8 as BC 48import qualified Data.ByteString.Char8 as BC
49import Data.List as L
48import Data.Map (Map) 50import Data.Map (Map)
49import qualified Data.Map as M 51import qualified Data.Map as M
50import Data.Monoid 52import Data.Monoid
@@ -59,9 +61,13 @@ import Network.BitTorrent.Internal
59import Network.BitTorrent.Peer 61import Network.BitTorrent.Peer
60import Network.BitTorrent.Tracker.Protocol 62import Network.BitTorrent.Tracker.Protocol
61 63
64{-----------------------------------------------------------------------
65 Tracker connection
66-----------------------------------------------------------------------}
62 67
63-- | 'TConnection' (shorthand for Tracker session) combines tracker request 68-- | 'TConnection' (shorthand for Tracker session) combines tracker
64-- fields neccessary for tracker, torrent and client identification. 69-- request fields neccessary for tracker, torrent and client
70-- identification.
65-- 71--
66-- This data is considered as static within one session. 72-- This data is considered as static within one session.
67-- 73--
@@ -136,49 +142,84 @@ completedReq ses pr = (genericReq ses pr) {
136 , reqEvent = Just Completed 142 , reqEvent = Just Completed
137 } 143 }
138 144
145{-----------------------------------------------------------------------
146 Tracker session
147-----------------------------------------------------------------------}
139 148
149{- Why use BoundedChan?
140 150
151Because most times we need just a list of peer at the start and all
152the rest time we will take little by little. On the other hand tracker
153will give us some constant count of peers and channel will grow with
154time. To avoid space leaks and long lists of peers (which we don't
155need) we use bounded chaan.
156
157 Chan size.
158
159Should be at least (count_of_workers * 2) to accumulate long enough
160peer list.
161
162 Order of peers in chan.
163
164Old peers in head, new ones in tail. Old peers should be used in the
165first place because by statistics they are most likely will present in
166network a long time than a new.
167
168-}
169
170type TimeInterval = Int
141 171
142data TSession = TSession { 172data TSession = TSession {
173 -- TODO synchonize progress with client session
143 seProgress :: TVar Progress 174 seProgress :: TVar Progress
144 , seInterval :: IORef Int 175 , seInterval :: IORef TimeInterval
145 , sePeers :: Chan PeerAddr 176 , sePeers :: BoundedChan PeerAddr
146 -- TODO use something like 'TVar (Set PeerAddr)'
147 -- otherwise we might get space leak
148 -- TODO or maybe BoundedChan?
149 } 177 }
150 178
151newSession :: Progress -> Int -> [PeerAddr] -> IO TSession 179type PeerCount = Int
152newSession pr i ps = do
153 chan <- newChan
154 writeList2Chan chan ps
155 TSession <$> newTVarIO pr
156 <*> newIORef i
157 <*> pure chan
158 180
159getPeerAddr :: TSession -> IO PeerAddr 181defaultChanSize :: PeerCount
160getPeerAddr = readChan . sePeers 182defaultChanSize = defaultNumWant * 2
161 183
162getPeerList :: TSession -> IO [PeerAddr] 184getPeerAddr :: TSession -> IO PeerAddr
163getPeerList = getChanContents . sePeers 185getPeerAddr = BC.readChan . sePeers
164 186
165getProgress :: TSession -> IO Progress 187getProgress :: TSession -> IO Progress
166getProgress = readTVarIO . seProgress 188getProgress = readTVarIO . seProgress
167 189
168sec :: Int 190newSession :: PeerCount -> Progress -> TimeInterval -> [PeerAddr]
169sec = 1000 * 1000 191 -> IO TSession
192newSession chanSize pr i ps
193 | chanSize < 1
194 = throwIO $ userError "size of chan should be more that 1"
195
196 | otherwise = do
197 chan <- newBoundedChan chanSize
198
199 -- if length of the "ps" is more than the "chanSize" we will block
200 -- forever; to avoid this we remove excessive peers
201 let ps' = take chanSize ps
202 BC.writeList2Chan chan ps'
203
204 TSession <$> newTVarIO pr
205 <*> newIORef i
206 <*> pure chan
170 207
171waitInterval :: TSession -> IO () 208waitInterval :: TSession -> IO ()
172waitInterval se @ TSession {..} = do 209waitInterval se @ TSession {..} = do
173 delay <- readIORef seInterval 210 delay <- readIORef seInterval
174 threadDelay (delay * sec) 211 threadDelay (delay * sec)
212 where
213 sec = 1000 * 1000 :: Int
175 214
176withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a 215withTracker :: Progress -> TConnection -> (TSession -> IO a) -> IO a
177withTracker initProgress conn action = bracket start end (action . fst) 216withTracker initProgress conn action = bracket start end (action . fst)
178 where 217 where
179 start = do 218 start = do
180 resp <- askTracker (startedReq conn initProgress) 219 resp <- askTracker (startedReq conn initProgress)
181 se <- newSession initProgress (respInterval resp) (respPeers resp) 220 se <- newSession defaultChanSize initProgress
221 (respInterval resp) (respPeers resp)
222
182 tid <- forkIO (syncSession se) 223 tid <- forkIO (syncSession se)
183 return (se, tid) 224 return (se, tid)
184 225
@@ -190,7 +231,16 @@ withTracker initProgress conn action = bracket start end (action . fst)
190 case resp of 231 case resp of
191 Right (OK {..}) -> do 232 Right (OK {..}) -> do
192 writeIORef seInterval respInterval 233 writeIORef seInterval respInterval
193 writeList2Chan sePeers respPeers 234
235 -- we rely on the fact that union on lists is not
236 -- commutative: this implements the heuristic "old peers
237 -- in head"
238 old <- BC.getChanContents sePeers
239 let new = respPeers
240 let combined = L.union old new
241
242 BC.writeList2Chan sePeers combined
243
194 _ -> return () 244 _ -> return ()
195 where 245 where
196 isIOException :: IOException -> Maybe IOException 246 isIOException :: IOException -> Maybe IOException
@@ -201,6 +251,9 @@ withTracker initProgress conn action = bracket start end (action . fst)
201 pr <- getProgress se 251 pr <- getProgress se
202 leaveTracker $ stoppedReq conn pr 252 leaveTracker $ stoppedReq conn pr
203 253
254{-----------------------------------------------------------------------
255 Scrape
256-----------------------------------------------------------------------}
204 257
205 258
206-- | Information about particular torrent. 259-- | Information about particular torrent.