summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exsamples/Main.hs7
-rw-r--r--src/Network/BitTorrent.hs17
-rw-r--r--src/Network/BitTorrent/Exchange.hs37
-rw-r--r--src/Network/BitTorrent/Internal.hs77
4 files changed, 99 insertions, 39 deletions
diff --git a/exsamples/Main.hs b/exsamples/Main.hs
index a094b87a..2c879378 100644
--- a/exsamples/Main.hs
+++ b/exsamples/Main.hs
@@ -15,12 +15,12 @@ main = do
15 [path] <- getArgs 15 [path] <- getArgs
16 torrent <- fromFile path 16 torrent <- fromFile path
17 17
18 client <- newClient [] 18 client <- newClient 1 []
19 swarm <- newLeacher client torrent 19 swarm <- newLeacher client torrent
20 20
21 ref <- newIORef 0
22
23 discover swarm $ do 21 discover swarm $ do
22 ref <- liftIO $ newIORef 0
23
24 addr <- asks connectedPeerAddr 24 addr <- asks connectedPeerAddr
25 liftIO $ print $ "connected to" ++ show addr 25 liftIO $ print $ "connected to" ++ show addr
26 26
@@ -36,7 +36,6 @@ main = do
36 liftIO $ do 36 liftIO $ do
37 readIORef ref >>= print 37 readIORef ref >>= print
38 modifyIORef ref succ 38 modifyIORef ref succ
39 print (ppBlock blk)
40 39
41 yieldEvent (Want (BlockIx 0 0 (16 * 1024))) 40 yieldEvent (Want (BlockIx 0 0 (16 * 1024)))
42 41
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs
index 85571470..b9dc39eb 100644
--- a/src/Network/BitTorrent.hs
+++ b/src/Network/BitTorrent.hs
@@ -29,8 +29,10 @@ module Network.BitTorrent
29 , awaitEvent, yieldEvent 29 , awaitEvent, yieldEvent
30 ) where 30 ) where
31 31
32import Control.Concurrent
32import Control.Exception 33import Control.Exception
33import Control.Monad 34import Control.Monad
35import Control.Monad.Reader
34 36
35import Network 37import Network
36 38
@@ -41,9 +43,8 @@ import Network.BitTorrent.Exchange.Protocol
41import Network.BitTorrent.Tracker 43import Network.BitTorrent.Tracker
42 44
43 45
44
45-- discover should hide tracker and DHT communication under the hood 46-- discover should hide tracker and DHT communication under the hood
46-- thus we can obtain unified interface 47-- thus we can obtain an unified interface
47 48
48discover :: SwarmSession -> P2P () -> IO () 49discover :: SwarmSession -> P2P () -> IO ()
49discover swarm action = do 50discover swarm action = do
@@ -58,14 +59,14 @@ discover swarm action = do
58 59
59 putStrLn "lookup peers" 60 putStrLn "lookup peers"
60 withTracker progress conn $ \tses -> do 61 withTracker progress conn $ \tses -> do
62 putStrLn "get peer list "
61 forever $ do 63 forever $ do
62 addr <- getPeerAddr tses 64 addr <- getPeerAddr tses
63 putStrLn "connecting to peer" 65 putStrLn "connect to peer"
64 handle handler (withPeer swarm addr action) 66 spawnP2P swarm addr $ do
65 67 liftIO $ putStrLn "run p2p session"
66 where 68 action
67 handler :: IOException -> IO () 69 putStrLn "connected"
68 handler _ = return ()
69 70
70listener :: SwarmSession -> P2P () -> IO PortNumber 71listener :: SwarmSession -> P2P () -> IO PortNumber
71listener _ _ = do 72listener _ _ = do
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 3235a626..978e86db 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -17,12 +17,14 @@ module Network.BitTorrent.Exchange
17 -- * Event 17 -- * Event
18 , Event(..) 18 , Event(..)
19 19
20 , P2P, withPeer 20 , P2P
21 , runP2P, spawnP2P
21 , awaitEvent, yieldEvent 22 , awaitEvent, yieldEvent
22 ) where 23 ) where
23 24
24import Control.Applicative 25import Control.Applicative
25import Control.Exception 26import Control.Exception
27import Control.Concurrent
26import Control.Lens 28import Control.Lens
27import Control.Monad.Reader 29import Control.Monad.Reader
28import Control.Monad.Trans.Resource 30import Control.Monad.Trans.Resource
@@ -54,8 +56,8 @@ data Event = Available Bitfield
54 56
55type PeerWire = ConduitM Message Message IO 57type PeerWire = ConduitM Message Message IO
56 58
57runConduit :: Socket -> PeerWire () -> IO () 59runPeerWire :: Socket -> PeerWire () -> IO ()
58runConduit sock p2p = 60runPeerWire sock p2p =
59 sourceSocket sock $= 61 sourceSocket sock $=
60 conduitGet S.get $= 62 conduitGet S.get $=
61 forever p2p $= 63 forever p2p $=
@@ -81,7 +83,6 @@ yieldMessage msg = P2P $ ReaderT $ \se -> do
81 liftIO $ print $ "sent:" <+> ppMessage msg 83 liftIO $ print $ "sent:" <+> ppMessage msg
82 liftIO $ updateOutcoming se 84 liftIO $ updateOutcoming se
83 85
84
85peerWant :: P2P Bitfield 86peerWant :: P2P Bitfield
86peerWant = BF.difference <$> getClientBF <*> use bitfield 87peerWant = BF.difference <$> getClientBF <*> use bitfield
87 88
@@ -272,18 +273,32 @@ checkPiece = undefined
272-- | 273-- |
273-- Exceptions: 274-- Exceptions:
274-- 275--
275-- * SessionException: is visible with one peer session. Use this 276-- * SessionException: is visible only within one peer
276-- exception to terminate P2P session, but not the swarm session. 277-- session. Use this exception to terminate P2P session, but not
278-- the swarm session.
277-- 279--
278newtype P2P a = P2P { 280newtype P2P a = P2P {
279 runP2P :: ReaderT PeerSession PeerWire a 281 unP2P :: ReaderT PeerSession PeerWire a
280 } deriving ( Functor, Applicative, Monad 282 } deriving ( Functor, Applicative, Monad
281 , MonadIO, MonadThrow, MonadActive 283 , MonadIO, MonadThrow, MonadActive
282 , MonadReader PeerSession 284 , MonadReader PeerSession
283 ) 285 )
284 286
285withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () 287runSession :: SwarmSession -> PeerAddr -> P2P () -> IO ()
286withPeer se addr p2p = 288runSession se addr p2p =
287 withPeerSession se addr $ \(sock, pses) -> do 289 withPeerSession se addr $ \(sock, pses) -> do
288 handle putSessionException $ 290 runPeerWire sock (runReaderT (unP2P p2p) pses)
289 runConduit sock (runReaderT (runP2P p2p) pses) 291
292-- | Run P2P session in the current thread. Normally you don't need this
293-- function in client application.
294runP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ()
295runP2P se addr p2p = waitVacancy se $ runSession se addr p2p
296
297-- | Run P2P session in forked thread. Might be used in listener or
298-- some other loop. Note that this function may block while waiting
299-- for a vacant place: use forkIO and runP2P instead.
300spawnP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ThreadId
301spawnP2P se addr p2p = do
302 enterSwarm se
303 forkIO $ do
304 runSession se addr p2p `finally` leaveSwarm se
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
index 0355c147..5c9efd1f 100644
--- a/src/Network/BitTorrent/Internal.hs
+++ b/src/Network/BitTorrent/Internal.hs
@@ -31,6 +31,7 @@ module Network.BitTorrent.Internal
31 -- * Swarm 31 -- * Swarm
32 , SwarmSession(SwarmSession, torrentMeta, clientSession) 32 , SwarmSession(SwarmSession, torrentMeta, clientSession)
33 , newLeacher, newSeeder 33 , newLeacher, newSeeder
34 , enterSwarm, leaveSwarm , waitVacancy
34 35
35 -- * Peer 36 -- * Peer
36 , PeerSession(PeerSession, connectedPeerAddr 37 , PeerSession(PeerSession, connectedPeerAddr
@@ -83,7 +84,7 @@ import Data.Torrent
83import Network.BitTorrent.Extension 84import Network.BitTorrent.Extension
84import Network.BitTorrent.Peer 85import Network.BitTorrent.Peer
85import Network.BitTorrent.Exchange.Protocol as BT 86import Network.BitTorrent.Exchange.Protocol as BT
86 87import Network.BitTorrent.Tracker.Protocol as BT
87 88
88 89
89-- | 'Progress' contains upload/download/left stats about 90-- | 'Progress' contains upload/download/left stats about
@@ -106,9 +107,10 @@ startProgress = Progress 0 0
106-----------------------------------------------------------------------} 107-----------------------------------------------------------------------}
107 108
108-- | In one application we could have many clients with difference 109-- | In one application we could have many clients with difference
109-- ID's and enabled extensions. 110-- ID's and different enabled extensions.
110data ClientSession = ClientSession { 111data ClientSession = ClientSession {
111 -- | Our peer ID used in handshaked and discovery mechanism. 112 -- | Our peer ID used in handshaked and discovery mechanism. The
113 -- clientPeerID is unique 'ClientSession' identifier.
112 clientPeerID :: PeerID 114 clientPeerID :: PeerID
113 115
114 -- | Extensions we should try to use. Hovewer some particular peer 116 -- | Extensions we should try to use. Hovewer some particular peer
@@ -116,7 +118,14 @@ data ClientSession = ClientSession {
116 -- 'PeerSession'. 118 -- 'PeerSession'.
117 , allowedExtensions :: [Extension] 119 , allowedExtensions :: [Extension]
118 120
121 -- | Semaphor used to bound number of active P2P sessions.
122 , activeThreads :: QSemN
123
124 -- | Max number of active connections.
125 , maxActive :: Int
126
119 , swarmSessions :: TVar (Set SwarmSession) 127 , swarmSessions :: TVar (Set SwarmSession)
128
120 , eventManager :: EventManager 129 , eventManager :: EventManager
121 , currentProgress :: TVar Progress 130 , currentProgress :: TVar Progress
122 } 131 }
@@ -130,8 +139,11 @@ instance Ord ClientSession where
130getCurrentProgress :: MonadIO m => ClientSession -> m Progress 139getCurrentProgress :: MonadIO m => ClientSession -> m Progress
131getCurrentProgress = liftIO . readTVarIO . currentProgress 140getCurrentProgress = liftIO . readTVarIO . currentProgress
132 141
133newClient :: [Extension] -> IO ClientSession 142newClient :: Int -- ^ Maximum count of active P2P Sessions.
134newClient exts = do 143 -> [Extension] -- ^ Extensions allowed to use.
144 -> IO ClientSession
145
146newClient n exts = do
135 mgr <- Ev.new 147 mgr <- Ev.new
136 -- TODO kill this thread when leave client 148 -- TODO kill this thread when leave client
137 _ <- forkIO $ loop mgr 149 _ <- forkIO $ loop mgr
@@ -139,6 +151,8 @@ newClient exts = do
139 ClientSession 151 ClientSession
140 <$> newPeerID 152 <$> newPeerID
141 <*> pure exts 153 <*> pure exts
154 <*> newQSemN n
155 <*> pure n
142 <*> newTVarIO S.empty 156 <*> newTVarIO S.empty
143 <*> pure mgr 157 <*> pure mgr
144 <*> newTVarIO (startProgress 0) 158 <*> newTVarIO (startProgress 0)
@@ -153,6 +167,10 @@ data SwarmSession = SwarmSession {
153 torrentMeta :: Torrent 167 torrentMeta :: Torrent
154 , clientSession :: ClientSession 168 , clientSession :: ClientSession
155 169
170 -- | Represent count of peers we _currently_ can connect to in the
171 -- swarm. Used to bound number of concurrent threads.
172 , vacantPeers :: QSemN
173
156 -- | Modify this carefully updating global progress. 174 -- | Modify this carefully updating global progress.
157 , clientBitfield :: TVar Bitfield 175 , clientBitfield :: TVar Bitfield
158 , connectedPeers :: TVar (Set PeerSession) 176 , connectedPeers :: TVar (Set PeerSession)
@@ -164,20 +182,28 @@ instance Eq SwarmSession where
164instance Ord SwarmSession where 182instance Ord SwarmSession where
165 compare = comparing (tInfoHash . torrentMeta) 183 compare = comparing (tInfoHash . torrentMeta)
166 184
167newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession 185newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
168newSwarmSession bf cs @ ClientSession {..} t @ Torrent {..} 186 -> IO SwarmSession
187newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
169 = SwarmSession <$> pure t 188 = SwarmSession <$> pure t
170 <*> pure cs 189 <*> pure cs
190 <*> newQSemN n
171 <*> newTVarIO bf 191 <*> newTVarIO bf
172 <*> newTVarIO S.empty 192 <*> newTVarIO S.empty
173 193
174newSeeder :: ClientSession -> Torrent -> IO SwarmSession 194newSeeder :: ClientSession -> Torrent -> IO SwarmSession
175newSeeder cs t @ Torrent {..} 195newSeeder cs t @ Torrent {..}
176 = newSwarmSession (haveAll (pieceCount tInfo)) cs t 196 = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
177 197
178newLeacher :: ClientSession -> Torrent -> IO SwarmSession 198newLeacher :: ClientSession -> Torrent -> IO SwarmSession
179newLeacher cs t @ Torrent {..} 199newLeacher cs t @ Torrent {..}
180 = newSwarmSession (haveNone (pieceCount tInfo)) cs t 200 = newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
201
202defSeederConns :: Int
203defSeederConns = defaultUnchokeSlots
204
205defLeacherConns :: Int
206defLeacherConns = defaultNumWant
181 207
182--isLeacher :: SwarmSession -> IO Bool 208--isLeacher :: SwarmSession -> IO Bool
183--isLeacher = undefined 209--isLeacher = undefined
@@ -190,6 +216,22 @@ haveDone ix =
190 writeTVar (have ix bf) 216 writeTVar (have ix bf)
191 currentProgress 217 currentProgress
192-} 218-}
219
220enterSwarm :: SwarmSession -> IO ()
221enterSwarm SwarmSession {..} = do
222 waitQSemN (activeThreads clientSession) 1
223 waitQSemN vacantPeers 1
224
225leaveSwarm :: SwarmSession -> IO ()
226leaveSwarm SwarmSession {..} = do
227 signalQSemN vacantPeers 1
228 signalQSemN (activeThreads clientSession) 1
229
230waitVacancy :: SwarmSession -> IO () -> IO ()
231waitVacancy se =
232 bracket (enterSwarm se) (const (leaveSwarm se))
233 . const
234
193{----------------------------------------------------------------------- 235{-----------------------------------------------------------------------
194 Peer session 236 Peer session
195-----------------------------------------------------------------------} 237-----------------------------------------------------------------------}
@@ -263,16 +305,18 @@ sessionError msg
263 305
264-- TODO check if it connected yet peer 306-- TODO check if it connected yet peer
265withPeerSession :: SwarmSession -> PeerAddr 307withPeerSession :: SwarmSession -> PeerAddr
266 -> ((Socket, PeerSession) -> IO a) 308 -> ((Socket, PeerSession) -> IO ())
267 -> IO a 309 -> IO ()
268 310
269withPeerSession ss @ SwarmSession {..} addr 311withPeerSession ss @ SwarmSession {..} addr
270 = bracket openSession closeSession 312 = handle isSessionException . bracket openSession closeSession
271 where 313 where
272 openSession = do 314 openSession = do
273 let caps = encodeExts $ allowedExtensions $ clientSession 315 let caps = encodeExts $ allowedExtensions $ clientSession
274 let pid = clientPeerID $ clientSession 316 let ihash = tInfoHash torrentMeta
275 let chs = Handshake defaultBTProtocol caps (tInfoHash torrentMeta) pid 317 let pid = clientPeerID $ clientSession
318 let chs = Handshake defaultBTProtocol caps ihash pid
319
276 sock <- connectToPeer addr 320 sock <- connectToPeer addr
277 phs <- handshake sock chs `onException` close sock 321 phs <- handshake sock chs `onException` close sock
278 322
@@ -292,7 +336,8 @@ withPeerSession ss @ SwarmSession {..} addr
292 } 336 }
293 return (sock, ps) 337 return (sock, ps)
294 338
295 closeSession = close . fst 339 closeSession (sock, _) = do
340 close sock
296 341
297getPieceCount :: (MonadReader PeerSession m) => m PieceCount 342getPieceCount :: (MonadReader PeerSession m) => m PieceCount
298getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession) 343getPieceCount = asks (pieceCount . tInfo . torrentMeta . swarmSession)