summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Internal.hs
diff options
context:
space:
mode:
authorSam T <pxqr.sta@gmail.com>2013-06-13 06:32:45 +0400
committerSam T <pxqr.sta@gmail.com>2013-06-13 06:32:45 +0400
commit49caf224c2d51cae2ca2a345e9bcca4368e66701 (patch)
treedf778b5f24f110b38fa8b211a322f681e024cae9 /src/Network/BitTorrent/Internal.hs
parent6042f69d711cddc0bb42457e0d16d45e7b34e431 (diff)
~ Bound count of concurrent sessions.
Diffstat (limited to 'src/Network/BitTorrent/Internal.hs')
-rw-r--r--src/Network/BitTorrent/Internal.hs77
1 files changed, 61 insertions, 16 deletions
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
index 0355c147..5c9efd1f 100644
--- a/src/Network/BitTorrent/Internal.hs
+++ b/src/Network/BitTorrent/Internal.hs
@@ -31,6 +31,7 @@ module Network.BitTorrent.Internal
31 -- * Swarm 31 -- * Swarm
32 , SwarmSession(SwarmSession, torrentMeta, clientSession) 32 , SwarmSession(SwarmSession, torrentMeta, clientSession)
33 , newLeacher, newSeeder 33 , newLeacher, newSeeder
34 , enterSwarm, leaveSwarm , waitVacancy
34 35
35 -- * Peer 36 -- * Peer
36 , PeerSession(PeerSession, connectedPeerAddr 37 , PeerSession(PeerSession, connectedPeerAddr
@@ -83,7 +84,7 @@ import Data.Torrent
83import Network.BitTorrent.Extension 84import Network.BitTorrent.Extension
84import Network.BitTorrent.Peer 85import Network.BitTorrent.Peer
85import Network.BitTorrent.Exchange.Protocol as BT 86import Network.BitTorrent.Exchange.Protocol as BT
86 87import Network.BitTorrent.Tracker.Protocol as BT
87 88
88 89
89-- | 'Progress' contains upload/download/left stats about 90-- | 'Progress' contains upload/download/left stats about
@@ -106,9 +107,10 @@ startProgress = Progress 0 0
106-----------------------------------------------------------------------} 107-----------------------------------------------------------------------}
107 108
108-- | In one application we could have many clients with difference 109-- | In one application we could have many clients with difference
109-- ID's and enabled extensions. 110-- ID's and different enabled extensions.
110data ClientSession = ClientSession { 111data ClientSession = ClientSession {
111 -- | Our peer ID used in handshaked and discovery mechanism. 112 -- | Our peer ID used in handshaked and discovery mechanism. The
113 -- clientPeerID is unique 'ClientSession' identifier.
112 clientPeerID :: PeerID 114 clientPeerID :: PeerID
113 115
114 -- | Extensions we should try to use. Hovewer some particular peer 116 -- | Extensions we should try to use. Hovewer some particular peer
@@ -116,7 +118,14 @@ data ClientSession = ClientSession {
116 -- 'PeerSession'. 118 -- 'PeerSession'.
117 , allowedExtensions :: [Extension] 119 , allowedExtensions :: [Extension]
118 120
121 -- | Semaphor used to bound number of active P2P sessions.
122 , activeThreads :: QSemN
123
124 -- | Max number of active connections.
125 , maxActive :: Int
126
119 , swarmSessions :: TVar (Set SwarmSession) 127 , swarmSessions :: TVar (Set SwarmSession)
128
120 , eventManager :: EventManager 129 , eventManager :: EventManager
121 , currentProgress :: TVar Progress 130 , currentProgress :: TVar Progress
122 } 131 }
@@ -130,8 +139,11 @@ instance Ord ClientSession where
130getCurrentProgress :: MonadIO m => ClientSession -> m Progress 139getCurrentProgress :: MonadIO m => ClientSession -> m Progress
131getCurrentProgress = liftIO . readTVarIO . currentProgress 140getCurrentProgress = liftIO . readTVarIO . currentProgress
132 141
133newClient :: [Extension] -> IO ClientSession 142newClient :: Int -- ^ Maximum count of active P2P Sessions.
134newClient exts = do 143 -> [Extension] -- ^ Extensions allowed to use.
144 -> IO ClientSession
145
146newClient n exts = do
135 mgr <- Ev.new 147 mgr <- Ev.new
136 -- TODO kill this thread when leave client 148 -- TODO kill this thread when leave client
137 _ <- forkIO $ loop mgr 149 _ <- forkIO $ loop mgr
@@ -139,6 +151,8 @@ newClient exts = do
139 ClientSession 151 ClientSession
140 <$> newPeerID 152 <$> newPeerID
141 <*> pure exts 153 <*> pure exts
154 <*> newQSemN n
155 <*> pure n
142 <*> newTVarIO S.empty 156 <*> newTVarIO S.empty
143 <*> pure mgr 157 <*> pure mgr
144 <*> newTVarIO (startProgress 0) 158 <*> newTVarIO (startProgress 0)
@@ -153,6 +167,10 @@ data SwarmSession = SwarmSession {
153 torrentMeta :: Torrent 167 torrentMeta :: Torrent
154 , clientSession :: ClientSession 168 , clientSession :: ClientSession
155 169
170 -- | Represent count of peers we _currently_ can connect to in the
171 -- swarm. Used to bound number of concurrent threads.
172 , vacantPeers :: QSemN
173
156 -- | Modify this carefully updating global progress. 174 -- | Modify this carefully updating global progress.
157 , clientBitfield :: TVar Bitfield 175 , clientBitfield :: TVar Bitfield
158 , connectedPeers :: TVar (Set PeerSession) 176 , connectedPeers :: TVar (Set PeerSession)
@@ -164,20 +182,28 @@ instance Eq SwarmSession where
164instance Ord SwarmSession where 182instance Ord SwarmSession where
165 compare = comparing (tInfoHash . torrentMeta) 183 compare = comparing (tInfoHash . torrentMeta)
166 184
167newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession 185newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
168newSwarmSession bf cs @ ClientSession {..} t @ Torrent {..} 186 -> IO SwarmSession
187newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
169 = SwarmSession <$> pure t 188 = SwarmSession <$> pure t
170 <*> pure cs 189 <*> pure cs
190 <*> newQSemN n
171 <*> newTVarIO bf 191 <*> newTVarIO bf
172 <*> newTVarIO S.empty 192 <*> newTVarIO S.empty
173 193
174newSeeder :: ClientSession -> Torrent -> IO SwarmSession 194newSeeder :: ClientSession -> Torrent -> IO SwarmSession
175newSeeder cs t @ Torrent {..} 195newSeeder cs t @ Torrent {..}
176 = newSwarmSession (haveAll (pieceCount tInfo)) cs t 196 = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
177 197
178newLeacher :: ClientSession -> Torrent -> IO SwarmSession 198newLeacher :: ClientSession -> Torrent -> IO SwarmSession
179newLeacher cs t @ Torrent {..} 199newLeacher cs t @ Torrent {..}
180 = newSwarmSession (haveNone (pieceCount tInfo)) cs t 200 = newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
201
202defSeederConns :: Int
203defSeederConns = defaultUnchokeSlots
204
205defLeacherConns :: Int
206defLeacherConns = defaultNumWant
181 207
182--isLeacher :: SwarmSession -> IO Bool 208--isLeacher :: SwarmSession -> IO Bool
183--isLeacher = undefined 209--isLeacher = undefined
@@ -190,6 +216,22 @@ haveDone ix =
190 writeTVar (have ix bf) 216 writeTVar (have ix bf)
191 currentProgress 217 currentProgress
192-} 218-}
219
220enterSwarm :: SwarmSession -> IO ()
221enterSwarm SwarmSession {..} = do
222 waitQSemN (activeThreads clientSession) 1
223 waitQSemN vacantPeers 1
224
225leaveSwarm :: SwarmSession -> IO ()
226leaveSwarm SwarmSession {..} = do
227 signalQSemN vacantPeers 1
228 signalQSemN (activeThreads clientSession) 1
229
230waitVacancy :: SwarmSession -> IO () -> IO ()
231waitVacancy se =
232 bracket (enterSwarm se) (const (leaveSwarm se))
233 . const
234
193{----------------------------------------------------------------------- 235{-----------------------------------------------------------------------
194 Peer session 236 Peer session
195-----------------------------------------------------------------------} 237-----------------------------------------------------------------------}
@@ -263,16 +305,18 @@ sessionError msg
263 305
264-- TODO check if it connected yet peer 306-- TODO check if it connected yet peer
265withPeerSession :: SwarmSession -> PeerAddr 307withPeerSession :: SwarmSession -> PeerAddr
266 -> ((Socket, PeerSession) -> IO a) 308 -> ((Socket, PeerSession) -> IO ())
267 -> IO a 309 -> IO ()
268 310
269withPeerSession ss @ SwarmSession {..} addr 311withPeerSession ss @ SwarmSession {..} addr
270 = bracket openSession closeSession 312 = handle isSessionException . bracket openSession closeSession
271 where 313 where
272 openSession = do 314 openSession = do
273 let caps = encodeExts $ allowedExtensions $ clientSession 315 let caps = encodeExts $ allowedExtensions $ clientSession
274 let pid = clientPeerID $ clientSession 316 let ihash = tInfoHash torrentMeta
275 let chs = Handshake defaultBTProtocol caps (tInfoHash torrentMeta) pid 317 let pid = clientPeerID $ clientSession
318 let chs = Handshake defaultBTProtocol caps ihash pid
319
276 sock <- connectToPeer addr 320 sock <- connectToPeer addr
277 phs <- handshake sock chs `onException` close sock 321 phs <- handshake sock chs `onException` close sock
278 322
@@ -292,7 +336,8 @@ withPeerSession ss @ SwarmSession {..} addr
292 } 336 }
293 return (sock, ps) 337 return (sock, ps)
294 338
295 closeSession = close . fst 339 closeSession (sock, _) = do
340 close sock
296 341
297getPieceCount :: (MonadReader PeerSession m) => m PieceCount 342getPieceCount :: (MonadReader PeerSession m) => m PieceCount
298getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession) 343getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession)