diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent.hs | 17 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 37 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.hs | 77 |
3 files changed, 96 insertions, 35 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index 85571470..b9dc39eb 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs | |||
@@ -29,8 +29,10 @@ module Network.BitTorrent | |||
29 | , awaitEvent, yieldEvent | 29 | , awaitEvent, yieldEvent |
30 | ) where | 30 | ) where |
31 | 31 | ||
32 | import Control.Concurrent | ||
32 | import Control.Exception | 33 | import Control.Exception |
33 | import Control.Monad | 34 | import Control.Monad |
35 | import Control.Monad.Reader | ||
34 | 36 | ||
35 | import Network | 37 | import Network |
36 | 38 | ||
@@ -41,9 +43,8 @@ import Network.BitTorrent.Exchange.Protocol | |||
41 | import Network.BitTorrent.Tracker | 43 | import Network.BitTorrent.Tracker |
42 | 44 | ||
43 | 45 | ||
44 | |||
45 | -- discover should hide tracker and DHT communication under the hood | 46 | -- discover should hide tracker and DHT communication under the hood |
46 | -- thus we can obtain unified interface | 47 | -- thus we can obtain an unified interface |
47 | 48 | ||
48 | discover :: SwarmSession -> P2P () -> IO () | 49 | discover :: SwarmSession -> P2P () -> IO () |
49 | discover swarm action = do | 50 | discover swarm action = do |
@@ -58,14 +59,14 @@ discover swarm action = do | |||
58 | 59 | ||
59 | putStrLn "lookup peers" | 60 | putStrLn "lookup peers" |
60 | withTracker progress conn $ \tses -> do | 61 | withTracker progress conn $ \tses -> do |
62 | putStrLn "get peer list " | ||
61 | forever $ do | 63 | forever $ do |
62 | addr <- getPeerAddr tses | 64 | addr <- getPeerAddr tses |
63 | putStrLn "connecting to peer" | 65 | putStrLn "connect to peer" |
64 | handle handler (withPeer swarm addr action) | 66 | spawnP2P swarm addr $ do |
65 | 67 | liftIO $ putStrLn "run p2p session" | |
66 | where | 68 | action |
67 | handler :: IOException -> IO () | 69 | putStrLn "connected" |
68 | handler _ = return () | ||
69 | 70 | ||
70 | listener :: SwarmSession -> P2P () -> IO PortNumber | 71 | listener :: SwarmSession -> P2P () -> IO PortNumber |
71 | listener _ _ = do | 72 | listener _ _ = do |
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 3235a626..978e86db 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -17,12 +17,14 @@ module Network.BitTorrent.Exchange | |||
17 | -- * Event | 17 | -- * Event |
18 | , Event(..) | 18 | , Event(..) |
19 | 19 | ||
20 | , P2P, withPeer | 20 | , P2P |
21 | , runP2P, spawnP2P | ||
21 | , awaitEvent, yieldEvent | 22 | , awaitEvent, yieldEvent |
22 | ) where | 23 | ) where |
23 | 24 | ||
24 | import Control.Applicative | 25 | import Control.Applicative |
25 | import Control.Exception | 26 | import Control.Exception |
27 | import Control.Concurrent | ||
26 | import Control.Lens | 28 | import Control.Lens |
27 | import Control.Monad.Reader | 29 | import Control.Monad.Reader |
28 | import Control.Monad.Trans.Resource | 30 | import Control.Monad.Trans.Resource |
@@ -54,8 +56,8 @@ data Event = Available Bitfield | |||
54 | 56 | ||
55 | type PeerWire = ConduitM Message Message IO | 57 | type PeerWire = ConduitM Message Message IO |
56 | 58 | ||
57 | runConduit :: Socket -> PeerWire () -> IO () | 59 | runPeerWire :: Socket -> PeerWire () -> IO () |
58 | runConduit sock p2p = | 60 | runPeerWire sock p2p = |
59 | sourceSocket sock $= | 61 | sourceSocket sock $= |
60 | conduitGet S.get $= | 62 | conduitGet S.get $= |
61 | forever p2p $= | 63 | forever p2p $= |
@@ -81,7 +83,6 @@ yieldMessage msg = P2P $ ReaderT $ \se -> do | |||
81 | liftIO $ print $ "sent:" <+> ppMessage msg | 83 | liftIO $ print $ "sent:" <+> ppMessage msg |
82 | liftIO $ updateOutcoming se | 84 | liftIO $ updateOutcoming se |
83 | 85 | ||
84 | |||
85 | peerWant :: P2P Bitfield | 86 | peerWant :: P2P Bitfield |
86 | peerWant = BF.difference <$> getClientBF <*> use bitfield | 87 | peerWant = BF.difference <$> getClientBF <*> use bitfield |
87 | 88 | ||
@@ -272,18 +273,32 @@ checkPiece = undefined | |||
272 | -- | | 273 | -- | |
273 | -- Exceptions: | 274 | -- Exceptions: |
274 | -- | 275 | -- |
275 | -- * SessionException: is visible with one peer session. Use this | 276 | -- * SessionException: is visible only within one peer |
276 | -- exception to terminate P2P session, but not the swarm session. | 277 | -- session. Use this exception to terminate P2P session, but not |
278 | -- the swarm session. | ||
277 | -- | 279 | -- |
278 | newtype P2P a = P2P { | 280 | newtype P2P a = P2P { |
279 | runP2P :: ReaderT PeerSession PeerWire a | 281 | unP2P :: ReaderT PeerSession PeerWire a |
280 | } deriving ( Functor, Applicative, Monad | 282 | } deriving ( Functor, Applicative, Monad |
281 | , MonadIO, MonadThrow, MonadActive | 283 | , MonadIO, MonadThrow, MonadActive |
282 | , MonadReader PeerSession | 284 | , MonadReader PeerSession |
283 | ) | 285 | ) |
284 | 286 | ||
285 | withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () | 287 | runSession :: SwarmSession -> PeerAddr -> P2P () -> IO () |
286 | withPeer se addr p2p = | 288 | runSession se addr p2p = |
287 | withPeerSession se addr $ \(sock, pses) -> do | 289 | withPeerSession se addr $ \(sock, pses) -> do |
288 | handle putSessionException $ | 290 | runPeerWire sock (runReaderT (unP2P p2p) pses) |
289 | runConduit sock (runReaderT (runP2P p2p) pses) | 291 | |
292 | -- | Run P2P session in the current thread. Normally you don't need this | ||
293 | -- function in client application. | ||
294 | runP2P :: SwarmSession -> PeerAddr -> P2P () -> IO () | ||
295 | runP2P se addr p2p = waitVacancy se $ runSession se addr p2p | ||
296 | |||
297 | -- | Run P2P session in forked thread. Might be used in listener or | ||
298 | -- some other loop. Note that this function may block while waiting | ||
299 | -- for a vacant place: use forkIO and runP2P instead. | ||
300 | spawnP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ThreadId | ||
301 | spawnP2P se addr p2p = do | ||
302 | enterSwarm se | ||
303 | forkIO $ do | ||
304 | runSession se addr p2p `finally` leaveSwarm se | ||
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs index 0355c147..5c9efd1f 100644 --- a/src/Network/BitTorrent/Internal.hs +++ b/src/Network/BitTorrent/Internal.hs | |||
@@ -31,6 +31,7 @@ module Network.BitTorrent.Internal | |||
31 | -- * Swarm | 31 | -- * Swarm |
32 | , SwarmSession(SwarmSession, torrentMeta, clientSession) | 32 | , SwarmSession(SwarmSession, torrentMeta, clientSession) |
33 | , newLeacher, newSeeder | 33 | , newLeacher, newSeeder |
34 | , enterSwarm, leaveSwarm , waitVacancy | ||
34 | 35 | ||
35 | -- * Peer | 36 | -- * Peer |
36 | , PeerSession(PeerSession, connectedPeerAddr | 37 | , PeerSession(PeerSession, connectedPeerAddr |
@@ -83,7 +84,7 @@ import Data.Torrent | |||
83 | import Network.BitTorrent.Extension | 84 | import Network.BitTorrent.Extension |
84 | import Network.BitTorrent.Peer | 85 | import Network.BitTorrent.Peer |
85 | import Network.BitTorrent.Exchange.Protocol as BT | 86 | import Network.BitTorrent.Exchange.Protocol as BT |
86 | 87 | import Network.BitTorrent.Tracker.Protocol as BT | |
87 | 88 | ||
88 | 89 | ||
89 | -- | 'Progress' contains upload/download/left stats about | 90 | -- | 'Progress' contains upload/download/left stats about |
@@ -106,9 +107,10 @@ startProgress = Progress 0 0 | |||
106 | -----------------------------------------------------------------------} | 107 | -----------------------------------------------------------------------} |
107 | 108 | ||
108 | -- | In one application we could have many clients with difference | 109 | -- | In one application we could have many clients with difference |
109 | -- ID's and enabled extensions. | 110 | -- ID's and different enabled extensions. |
110 | data ClientSession = ClientSession { | 111 | data ClientSession = ClientSession { |
111 | -- | Our peer ID used in handshaked and discovery mechanism. | 112 | -- | Our peer ID used in handshaked and discovery mechanism. The |
113 | -- clientPeerID is unique 'ClientSession' identifier. | ||
112 | clientPeerID :: PeerID | 114 | clientPeerID :: PeerID |
113 | 115 | ||
114 | -- | Extensions we should try to use. Hovewer some particular peer | 116 | -- | Extensions we should try to use. Hovewer some particular peer |
@@ -116,7 +118,14 @@ data ClientSession = ClientSession { | |||
116 | -- 'PeerSession'. | 118 | -- 'PeerSession'. |
117 | , allowedExtensions :: [Extension] | 119 | , allowedExtensions :: [Extension] |
118 | 120 | ||
121 | -- | Semaphor used to bound number of active P2P sessions. | ||
122 | , activeThreads :: QSemN | ||
123 | |||
124 | -- | Max number of active connections. | ||
125 | , maxActive :: Int | ||
126 | |||
119 | , swarmSessions :: TVar (Set SwarmSession) | 127 | , swarmSessions :: TVar (Set SwarmSession) |
128 | |||
120 | , eventManager :: EventManager | 129 | , eventManager :: EventManager |
121 | , currentProgress :: TVar Progress | 130 | , currentProgress :: TVar Progress |
122 | } | 131 | } |
@@ -130,8 +139,11 @@ instance Ord ClientSession where | |||
130 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress | 139 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress |
131 | getCurrentProgress = liftIO . readTVarIO . currentProgress | 140 | getCurrentProgress = liftIO . readTVarIO . currentProgress |
132 | 141 | ||
133 | newClient :: [Extension] -> IO ClientSession | 142 | newClient :: Int -- ^ Maximum count of active P2P Sessions. |
134 | newClient exts = do | 143 | -> [Extension] -- ^ Extensions allowed to use. |
144 | -> IO ClientSession | ||
145 | |||
146 | newClient n exts = do | ||
135 | mgr <- Ev.new | 147 | mgr <- Ev.new |
136 | -- TODO kill this thread when leave client | 148 | -- TODO kill this thread when leave client |
137 | _ <- forkIO $ loop mgr | 149 | _ <- forkIO $ loop mgr |
@@ -139,6 +151,8 @@ newClient exts = do | |||
139 | ClientSession | 151 | ClientSession |
140 | <$> newPeerID | 152 | <$> newPeerID |
141 | <*> pure exts | 153 | <*> pure exts |
154 | <*> newQSemN n | ||
155 | <*> pure n | ||
142 | <*> newTVarIO S.empty | 156 | <*> newTVarIO S.empty |
143 | <*> pure mgr | 157 | <*> pure mgr |
144 | <*> newTVarIO (startProgress 0) | 158 | <*> newTVarIO (startProgress 0) |
@@ -153,6 +167,10 @@ data SwarmSession = SwarmSession { | |||
153 | torrentMeta :: Torrent | 167 | torrentMeta :: Torrent |
154 | , clientSession :: ClientSession | 168 | , clientSession :: ClientSession |
155 | 169 | ||
170 | -- | Represent count of peers we _currently_ can connect to in the | ||
171 | -- swarm. Used to bound number of concurrent threads. | ||
172 | , vacantPeers :: QSemN | ||
173 | |||
156 | -- | Modify this carefully updating global progress. | 174 | -- | Modify this carefully updating global progress. |
157 | , clientBitfield :: TVar Bitfield | 175 | , clientBitfield :: TVar Bitfield |
158 | , connectedPeers :: TVar (Set PeerSession) | 176 | , connectedPeers :: TVar (Set PeerSession) |
@@ -164,20 +182,28 @@ instance Eq SwarmSession where | |||
164 | instance Ord SwarmSession where | 182 | instance Ord SwarmSession where |
165 | compare = comparing (tInfoHash . torrentMeta) | 183 | compare = comparing (tInfoHash . torrentMeta) |
166 | 184 | ||
167 | newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession | 185 | newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent |
168 | newSwarmSession bf cs @ ClientSession {..} t @ Torrent {..} | 186 | -> IO SwarmSession |
187 | newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
169 | = SwarmSession <$> pure t | 188 | = SwarmSession <$> pure t |
170 | <*> pure cs | 189 | <*> pure cs |
190 | <*> newQSemN n | ||
171 | <*> newTVarIO bf | 191 | <*> newTVarIO bf |
172 | <*> newTVarIO S.empty | 192 | <*> newTVarIO S.empty |
173 | 193 | ||
174 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | 194 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession |
175 | newSeeder cs t @ Torrent {..} | 195 | newSeeder cs t @ Torrent {..} |
176 | = newSwarmSession (haveAll (pieceCount tInfo)) cs t | 196 | = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t |
177 | 197 | ||
178 | newLeacher :: ClientSession -> Torrent -> IO SwarmSession | 198 | newLeacher :: ClientSession -> Torrent -> IO SwarmSession |
179 | newLeacher cs t @ Torrent {..} | 199 | newLeacher cs t @ Torrent {..} |
180 | = newSwarmSession (haveNone (pieceCount tInfo)) cs t | 200 | = newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t |
201 | |||
202 | defSeederConns :: Int | ||
203 | defSeederConns = defaultUnchokeSlots | ||
204 | |||
205 | defLeacherConns :: Int | ||
206 | defLeacherConns = defaultNumWant | ||
181 | 207 | ||
182 | --isLeacher :: SwarmSession -> IO Bool | 208 | --isLeacher :: SwarmSession -> IO Bool |
183 | --isLeacher = undefined | 209 | --isLeacher = undefined |
@@ -190,6 +216,22 @@ haveDone ix = | |||
190 | writeTVar (have ix bf) | 216 | writeTVar (have ix bf) |
191 | currentProgress | 217 | currentProgress |
192 | -} | 218 | -} |
219 | |||
220 | enterSwarm :: SwarmSession -> IO () | ||
221 | enterSwarm SwarmSession {..} = do | ||
222 | waitQSemN (activeThreads clientSession) 1 | ||
223 | waitQSemN vacantPeers 1 | ||
224 | |||
225 | leaveSwarm :: SwarmSession -> IO () | ||
226 | leaveSwarm SwarmSession {..} = do | ||
227 | signalQSemN vacantPeers 1 | ||
228 | signalQSemN (activeThreads clientSession) 1 | ||
229 | |||
230 | waitVacancy :: SwarmSession -> IO () -> IO () | ||
231 | waitVacancy se = | ||
232 | bracket (enterSwarm se) (const (leaveSwarm se)) | ||
233 | . const | ||
234 | |||
193 | {----------------------------------------------------------------------- | 235 | {----------------------------------------------------------------------- |
194 | Peer session | 236 | Peer session |
195 | -----------------------------------------------------------------------} | 237 | -----------------------------------------------------------------------} |
@@ -263,16 +305,18 @@ sessionError msg | |||
263 | 305 | ||
264 | -- TODO check if it connected yet peer | 306 | -- TODO check if it connected yet peer |
265 | withPeerSession :: SwarmSession -> PeerAddr | 307 | withPeerSession :: SwarmSession -> PeerAddr |
266 | -> ((Socket, PeerSession) -> IO a) | 308 | -> ((Socket, PeerSession) -> IO ()) |
267 | -> IO a | 309 | -> IO () |
268 | 310 | ||
269 | withPeerSession ss @ SwarmSession {..} addr | 311 | withPeerSession ss @ SwarmSession {..} addr |
270 | = bracket openSession closeSession | 312 | = handle isSessionException . bracket openSession closeSession |
271 | where | 313 | where |
272 | openSession = do | 314 | openSession = do |
273 | let caps = encodeExts $ allowedExtensions $ clientSession | 315 | let caps = encodeExts $ allowedExtensions $ clientSession |
274 | let pid = clientPeerID $ clientSession | 316 | let ihash = tInfoHash torrentMeta |
275 | let chs = Handshake defaultBTProtocol caps (tInfoHash torrentMeta) pid | 317 | let pid = clientPeerID $ clientSession |
318 | let chs = Handshake defaultBTProtocol caps ihash pid | ||
319 | |||
276 | sock <- connectToPeer addr | 320 | sock <- connectToPeer addr |
277 | phs <- handshake sock chs `onException` close sock | 321 | phs <- handshake sock chs `onException` close sock |
278 | 322 | ||
@@ -292,7 +336,8 @@ withPeerSession ss @ SwarmSession {..} addr | |||
292 | } | 336 | } |
293 | return (sock, ps) | 337 | return (sock, ps) |
294 | 338 | ||
295 | closeSession = close . fst | 339 | closeSession (sock, _) = do |
340 | close sock | ||
296 | 341 | ||
297 | getPieceCount :: (MonadReader PeerSession m) => m PieceCount | 342 | getPieceCount :: (MonadReader PeerSession m) => m PieceCount |
298 | getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession) | 343 | getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession) |