diff options
Diffstat (limited to 'src/Network/BitTorrent/Internal.hs')
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 77 |
1 files changed, 61 insertions, 16 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index 0355c147..5c9efd1f 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -31,6 +31,7 @@ module Network.BitTorrent.Internal | |||
31 | -- * Swarm | 31 | -- * Swarm |
32 | , SwarmSession(SwarmSession, torrentMeta, clientSession) | 32 | , SwarmSession(SwarmSession, torrentMeta, clientSession) |
33 | , newLeacher, newSeeder | 33 | , newLeacher, newSeeder |
34 | , enterSwarm, leaveSwarm , waitVacancy | ||
34 | 35 | ||
35 | -- * Peer | 36 | -- * Peer |
36 | , PeerSession(PeerSession, connectedPeerAddr | 37 | , PeerSession(PeerSession, connectedPeerAddr |
@@ -83,7 +84,7 @@ import Data.Torrent | |||
83 | import Network.BitTorrent.Extension | 84 | import Network.BitTorrent.Extension |
84 | import Network.BitTorrent.Peer | 85 | import Network.BitTorrent.Peer |
85 | import Network.BitTorrent.Exchange.Protocol as BT | 86 | import Network.BitTorrent.Exchange.Protocol as BT |
86 | 87 | import Network.BitTorrent.Tracker.Protocol as BT | |
87 | 88 | ||
88 | 89 | ||
89 | -- | 'Progress' contains upload/download/left stats about | 90 | -- | 'Progress' contains upload/download/left stats about |
@@ -106,9 +107,10 @@ startProgress = Progress 0 0 | |||
106 | -----------------------------------------------------------------------} | 107 | -----------------------------------------------------------------------} |
107 | 108 | ||
108 | -- | In one application we could have many clients with difference | 109 | -- | In one application we could have many clients with difference |
109 | -- ID's and enabled extensions. | 110 | -- ID's and different enabled extensions. |
110 | data ClientSession = ClientSession { | 111 | data ClientSession = ClientSession { |
111 | -- | Our peer ID used in handshaked and discovery mechanism. | 112 | -- | Our peer ID used in handshaked and discovery mechanism. The |
113 | -- clientPeerID is unique 'ClientSession' identifier. | ||
112 | clientPeerID :: PeerID | 114 | clientPeerID :: PeerID |
113 | 115 | ||
114 | -- | Extensions we should try to use. Hovewer some particular peer | 116 | -- | Extensions we should try to use. Hovewer some particular peer |
@@ -116,7 +118,14 @@ data ClientSession = ClientSession { | |||
116 | -- 'PeerSession'. | 118 | -- 'PeerSession'. |
117 | , allowedExtensions :: [Extension] | 119 | , allowedExtensions :: [Extension] |
118 | 120 | ||
121 | -- | Semaphor used to bound number of active P2P sessions. | ||
122 | , activeThreads :: QSemN | ||
123 | |||
124 | -- | Max number of active connections. | ||
125 | , maxActive :: Int | ||
126 | |||
119 | , swarmSessions :: TVar (Set SwarmSession) | 127 | , swarmSessions :: TVar (Set SwarmSession) |
128 | |||
120 | , eventManager :: EventManager | 129 | , eventManager :: EventManager |
121 | , currentProgress :: TVar Progress | 130 | , currentProgress :: TVar Progress |
122 | } | 131 | } |
@@ -130,8 +139,11 @@ instance Ord ClientSession where | |||
130 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress | 139 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress |
131 | getCurrentProgress = liftIO . readTVarIO . currentProgress | 140 | getCurrentProgress = liftIO . readTVarIO . currentProgress |
132 | 141 | ||
133 | newClient :: [Extension] -> IO ClientSession | 142 | newClient :: Int -- ^ Maximum count of active P2P Sessions. |
134 | newClient exts = do | 143 | -> [Extension] -- ^ Extensions allowed to use. |
144 | -> IO ClientSession | ||
145 | |||
146 | newClient n exts = do | ||
135 | mgr <- Ev.new | 147 | mgr <- Ev.new |
136 | -- TODO kill this thread when leave client | 148 | -- TODO kill this thread when leave client |
137 | _ <- forkIO $ loop mgr | 149 | _ <- forkIO $ loop mgr |
@@ -139,6 +151,8 @@ newClient exts = do | |||
139 | ClientSession | 151 | ClientSession |
140 | <$> newPeerID | 152 | <$> newPeerID |
141 | <*> pure exts | 153 | <*> pure exts |
154 | <*> newQSemN n | ||
155 | <*> pure n | ||
142 | <*> newTVarIO S.empty | 156 | <*> newTVarIO S.empty |
143 | <*> pure mgr | 157 | <*> pure mgr |
144 | <*> newTVarIO (startProgress 0) | 158 | <*> newTVarIO (startProgress 0) |
@@ -153,6 +167,10 @@ data SwarmSession = SwarmSession { | |||
153 | torrentMeta :: Torrent | 167 | torrentMeta :: Torrent |
154 | , clientSession :: ClientSession | 168 | , clientSession :: ClientSession |
155 | 169 | ||
170 | -- | Represent count of peers we _currently_ can connect to in the | ||
171 | -- swarm. Used to bound number of concurrent threads. | ||
172 | , vacantPeers :: QSemN | ||
173 | |||
156 | -- | Modify this carefully updating global progress. | 174 | -- | Modify this carefully updating global progress. |
157 | , clientBitfield :: TVar Bitfield | 175 | , clientBitfield :: TVar Bitfield |
158 | , connectedPeers :: TVar (Set PeerSession) | 176 | , connectedPeers :: TVar (Set PeerSession) |
@@ -164,20 +182,28 @@ instance Eq SwarmSession where | |||
164 | instance Ord SwarmSession where | 182 | instance Ord SwarmSession where |
165 | compare = comparing (tInfoHash . torrentMeta) | 183 | compare = comparing (tInfoHash . torrentMeta) |
166 | 184 | ||
167 | newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession | 185 | newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent |
168 | newSwarmSession bf cs @ ClientSession {..} t @ Torrent {..} | 186 | -> IO SwarmSession |
187 | newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
169 | = SwarmSession <$> pure t | 188 | = SwarmSession <$> pure t |
170 | <*> pure cs | 189 | <*> pure cs |
190 | <*> newQSemN n | ||
171 | <*> newTVarIO bf | 191 | <*> newTVarIO bf |
172 | <*> newTVarIO S.empty | 192 | <*> newTVarIO S.empty |
173 | 193 | ||
174 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | 194 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession |
175 | newSeeder cs t @ Torrent {..} | 195 | newSeeder cs t @ Torrent {..} |
176 | = newSwarmSession (haveAll (pieceCount tInfo)) cs t | 196 | = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t |
177 | 197 | ||
178 | newLeacher :: ClientSession -> Torrent -> IO SwarmSession | 198 | newLeacher :: ClientSession -> Torrent -> IO SwarmSession |
179 | newLeacher cs t @ Torrent {..} | 199 | newLeacher cs t @ Torrent {..} |
180 | = newSwarmSession (haveNone (pieceCount tInfo)) cs t | 200 | = newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t |
201 | |||
202 | defSeederConns :: Int | ||
203 | defSeederConns = defaultUnchokeSlots | ||
204 | |||
205 | defLeacherConns :: Int | ||
206 | defLeacherConns = defaultNumWant | ||
181 | 207 | ||
182 | --isLeacher :: SwarmSession -> IO Bool | 208 | --isLeacher :: SwarmSession -> IO Bool |
183 | --isLeacher = undefined | 209 | --isLeacher = undefined |
@@ -190,6 +216,22 @@ haveDone ix = | |||
190 | writeTVar (have ix bf) | 216 | writeTVar (have ix bf) |
191 | currentProgress | 217 | currentProgress |
192 | -} | 218 | -} |
219 | |||
220 | enterSwarm :: SwarmSession -> IO () | ||
221 | enterSwarm SwarmSession {..} = do | ||
222 | waitQSemN (activeThreads clientSession) 1 | ||
223 | waitQSemN vacantPeers 1 | ||
224 | |||
225 | leaveSwarm :: SwarmSession -> IO () | ||
226 | leaveSwarm SwarmSession {..} = do | ||
227 | signalQSemN vacantPeers 1 | ||
228 | signalQSemN (activeThreads clientSession) 1 | ||
229 | |||
230 | waitVacancy :: SwarmSession -> IO () -> IO () | ||
231 | waitVacancy se = | ||
232 | bracket (enterSwarm se) (const (leaveSwarm se)) | ||
233 | . const | ||
234 | |||
193 | {----------------------------------------------------------------------- | 235 | {----------------------------------------------------------------------- |
194 | Peer session | 236 | Peer session |
195 | -----------------------------------------------------------------------} | 237 | -----------------------------------------------------------------------} |
@@ -263,16 +305,18 @@ sessionError msg | |||
263 | 305 | ||
264 | -- TODO check if it connected yet peer | 306 | -- TODO check if it connected yet peer |
265 | withPeerSession :: SwarmSession -> PeerAddr | 307 | withPeerSession :: SwarmSession -> PeerAddr |
266 | -> ((Socket, PeerSession) -> IO a) | 308 | -> ((Socket, PeerSession) -> IO ()) |
267 | -> IO a | 309 | -> IO () |
268 | 310 | ||
269 | withPeerSession ss @ SwarmSession {..} addr | 311 | withPeerSession ss @ SwarmSession {..} addr |
270 | = bracket openSession closeSession | 312 | = handle isSessionException . bracket openSession closeSession |
271 | where | 313 | where |
272 | openSession = do | 314 | openSession = do |
273 | let caps = encodeExts $ allowedExtensions $ clientSession | 315 | let caps = encodeExts $ allowedExtensions $ clientSession |
274 | let pid = clientPeerID $ clientSession | 316 | let ihash = tInfoHash torrentMeta |
275 | let chs = Handshake defaultBTProtocol caps (tInfoHash torrentMeta) pid | 317 | let pid = clientPeerID $ clientSession |
318 | let chs = Handshake defaultBTProtocol caps ihash pid | ||
319 | |||
276 | sock <- connectToPeer addr | 320 | sock <- connectToPeer addr |
277 | phs <- handshake sock chs `onException` close sock | 321 | phs <- handshake sock chs `onException` close sock |
278 | 322 | ||
@@ -292,7 +336,8 @@ withPeerSession ss @ SwarmSession {..} addr | |||
292 | } | 336 | } |
293 | return (sock, ps) | 337 | return (sock, ps) |
294 | 338 | ||
295 | closeSession = close . fst | 339 | closeSession (sock, _) = do |
340 | close sock | ||
296 | 341 | ||
297 | getPieceCount :: (MonadReader PeerSession m) => m PieceCount | 342 | getPieceCount :: (MonadReader PeerSession m) => m PieceCount |
298 | getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession) | 343 | getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession) |