summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent.hs6
-rw-r--r--src/Network/BitTorrent/Exchange.hs118
-rw-r--r--src/Network/BitTorrent/Exchange/Protocol.hs15
-rw-r--r--src/Network/BitTorrent/Internal.hs67
-rw-r--r--src/Network/BitTorrent/Tracker/Protocol.hs5
5 files changed, 113 insertions, 98 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs
index 546c7644..24d78e85 100644
--- a/src/Network/BitTorrent.hs
+++ b/src/Network/BitTorrent.hs
@@ -16,6 +16,7 @@ module Network.BitTorrent
16 16
17 , SwarmSession 17 , SwarmSession
18 , newLeacher, newSeeder 18 , newLeacher, newSeeder
19 , getSessionCount
19 20
20 -- * Discovery 21 -- * Discovery
21 , discover 22 , discover
@@ -63,16 +64,11 @@ discover swarm action = do
63 64
64 progress <- getCurrentProgress (clientSession swarm) 65 progress <- getCurrentProgress (clientSession swarm)
65 66
66 putStrLn "lookup peers"
67 withTracker progress conn $ \tses -> do 67 withTracker progress conn $ \tses -> do
68 putStrLn "get peer list "
69 forever $ do 68 forever $ do
70 addr <- getPeerAddr tses 69 addr <- getPeerAddr tses
71 putStrLn "connect to peer"
72 spawnP2P swarm addr $ do 70 spawnP2P swarm addr $ do
73 liftIO $ putStrLn "run p2p session"
74 action 71 action
75 putStrLn "connected"
76 72
77listener :: SwarmSession -> P2P () -> IO PortNumber 73listener :: SwarmSession -> P2P () -> IO PortNumber
78listener _ _ = do 74listener _ _ = do
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index be9a455b..2d0393c0 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -61,28 +61,73 @@ runPeerWire :: Socket -> PeerWire () -> IO ()
61runPeerWire sock p2p = 61runPeerWire sock p2p =
62 sourceSocket sock $= 62 sourceSocket sock $=
63 conduitGet S.get $= 63 conduitGet S.get $=
64 forever p2p $= 64 p2p $=
65 conduitPut S.put $$ 65 conduitPut S.put $$
66 sinkSocket sock 66 sinkSocket sock
67 67
68awaitMessage :: P2P Message 68awaitMessage :: P2P Message
69awaitMessage = P2P (ReaderT go) 69awaitMessage = P2P (ReaderT (const go))
70 where 70 where
71 go _ = do 71 go = await >>= maybe disconnect return
72 liftIO $ putStrLn "trying recv:"
73 mmsg <- await
74 case mmsg of
75 Nothing -> monadThrow SessionException
76 Just msg -> do
77-- liftIO $ updateIncoming se
78 liftIO $ print ("recv:" <+> ppMessage msg)
79 return msg
80 72
81yieldMessage :: Message -> P2P () 73yieldMessage :: Message -> P2P ()
82yieldMessage msg = P2P $ ReaderT $ \se -> do 74yieldMessage msg = P2P $ ReaderT $ \se -> C.yield msg
83 C.yield msg 75
84 liftIO $ print $ "sent:" <+> ppMessage msg 76{-----------------------------------------------------------------------
85 liftIO $ updateOutcoming se 77 P2P monad
78-----------------------------------------------------------------------}
79
80-- |
81-- Exceptions:
82--
83-- * SessionException: is visible only within one peer
84-- session. Use this exception to terminate P2P session, but not
85-- the swarm session.
86--
87newtype P2P a = P2P {
88 unP2P :: ReaderT PeerSession PeerWire a
89 } deriving ( Functor, Applicative, Monad
90 , MonadIO, MonadThrow, MonadActive
91 , MonadReader PeerSession
92 )
93-- TODO instance for MonadFork
94
95runSession :: SwarmSession -> PeerAddr -> P2P () -> IO ()
96runSession se addr p2p =
97 handle isIOException $
98 withPeerSession se addr $ \(sock, pses) -> do
99 runPeerWire sock (runReaderT (unP2P p2p) pses)
100 where
101 isIOException :: IOException -> IO ()
102 isIOException _ = return ()
103
104-- | Run P2P session in the current thread. Normally you don't need this
105-- function in client application.
106runP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ()
107runP2P se addr p2p = waitVacancy se $ runSession se addr p2p
108
109-- | Run P2P session in forked thread. Might be used in listener or
110-- some other loop. Note that this function may block while waiting
111-- for a vacant place: use forkIO and runP2P instead.
112spawnP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ThreadId
113spawnP2P se addr p2p = do
114 enterSwarm se
115 forkIO $ do
116 runSession se addr p2p `finally` leaveSwarm se
117
118{-----------------------------------------------------------------------
119 Exceptions
120-----------------------------------------------------------------------}
121
122disconnect :: MonadThrow m => m a
123disconnect = monadThrow PeerDisconnected
124
125protocolError :: MonadThrow m => Doc -> m a
126protocolError = monadThrow . ProtocolError
127
128{-----------------------------------------------------------------------
129 Helpers
130-----------------------------------------------------------------------}
86 131
87peerWant :: P2P Bitfield 132peerWant :: P2P Bitfield
88peerWant = BF.difference <$> getClientBF <*> use bitfield 133peerWant = BF.difference <$> getClientBF <*> use bitfield
@@ -116,7 +161,7 @@ requireExtension :: Extension -> P2P ()
116requireExtension required = do 161requireExtension required = do
117 enabled <- asks enabledExtensions 162 enabled <- asks enabledExtensions
118 unless (required `elem` enabled) $ 163 unless (required `elem` enabled) $
119 sessionError $ ppExtension required <+> "not enabled" 164 protocolError $ ppExtension required <+> "not enabled"
120 165
121-- haveMessage bf = do 166-- haveMessage bf = do
122-- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession 167-- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession
@@ -124,6 +169,9 @@ requireExtension required = do
124-- then nextEvent se 169-- then nextEvent se
125-- else undefined -- return $ Available diff 170-- else undefined -- return $ Available diff
126 171
172{-----------------------------------------------------------------------
173 Exchange
174-----------------------------------------------------------------------}
127 175
128-- | 176-- |
129-- +----------+---------+ 177-- +----------+---------+
@@ -266,41 +314,3 @@ yieldEvent (Fragment blk) = do
266 314
267checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool 315checkPiece :: PieceLIx -> {-ByteString -> -} P2P Bool
268checkPiece = undefined 316checkPiece = undefined
269
270{-----------------------------------------------------------------------
271 P2P monad
272-----------------------------------------------------------------------}
273
274-- |
275-- Exceptions:
276--
277-- * SessionException: is visible only within one peer
278-- session. Use this exception to terminate P2P session, but not
279-- the swarm session.
280--
281newtype P2P a = P2P {
282 unP2P :: ReaderT PeerSession PeerWire a
283 } deriving ( Functor, Applicative, Monad
284 , MonadIO, MonadThrow, MonadActive
285 , MonadReader PeerSession
286 )
287-- TODO instance for MonadFork
288
289runSession :: SwarmSession -> PeerAddr -> P2P () -> IO ()
290runSession se addr p2p =
291 withPeerSession se addr $ \(sock, pses) -> do
292 runPeerWire sock (runReaderT (unP2P p2p) pses)
293
294-- | Run P2P session in the current thread. Normally you don't need this
295-- function in client application.
296runP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ()
297runP2P se addr p2p = waitVacancy se $ runSession se addr p2p
298
299-- | Run P2P session in forked thread. Might be used in listener or
300-- some other loop. Note that this function may block while waiting
301-- for a vacant place: use forkIO and runP2P instead.
302spawnP2P :: SwarmSession -> PeerAddr -> P2P () -> IO ThreadId
303spawnP2P se addr p2p = do
304 enterSwarm se
305 forkIO $ do
306 runSession se addr p2p `finally` leaveSwarm se
diff --git a/src/Network/BitTorrent/Exchange/Protocol.hs b/src/Network/BitTorrent/Exchange/Protocol.hs
index 5ea104cc..8d42e3a8 100644
--- a/src/Network/BitTorrent/Exchange/Protocol.hs
+++ b/src/Network/BitTorrent/Exchange/Protocol.hs
@@ -164,10 +164,8 @@ defaultHandshake = Handshake defaultBTProtocol defaultReserved
164-- | Handshaking with a peer specified by the second argument. 164-- | Handshaking with a peer specified by the second argument.
165handshake :: Socket -> Handshake -> IO Handshake 165handshake :: Socket -> Handshake -> IO Handshake
166handshake sock hs = do 166handshake sock hs = do
167 putStrLn "send handshake"
168 sendAll sock (S.encode hs) 167 sendAll sock (S.encode hs)
169 168
170 putStrLn "recv handshake size"
171 header <- recv sock 1 169 header <- recv sock 1
172 when (B.length header == 0) $ 170 when (B.length header == 0) $
173 throw $ userError "Unable to receive handshake." 171 throw $ userError "Unable to receive handshake."
@@ -175,7 +173,6 @@ handshake sock hs = do
175 let protocolLen = B.head header 173 let protocolLen = B.head header
176 let restLen = handshakeSize protocolLen - 1 174 let restLen = handshakeSize protocolLen - 1
177 175
178 putStrLn "recv handshake body"
179 body <- recv sock restLen 176 body <- recv sock restLen
180 let resp = B.cons protocolLen body 177 let resp = B.cons protocolLen body
181 178
@@ -432,9 +429,9 @@ ppMessage msg = text (show msg)
432 429
433-- | 430-- |
434data PeerStatus = PeerStatus { 431data PeerStatus = PeerStatus {
435 _choking :: Bool 432 _choking :: !Bool
436 , _interested :: Bool 433 , _interested :: !Bool
437 } 434 } deriving (Show, Eq)
438 435
439$(makeLenses ''PeerStatus) 436$(makeLenses ''PeerStatus)
440 437
@@ -443,9 +440,9 @@ instance Default PeerStatus where
443 440
444-- | 441-- |
445data SessionStatus = SessionStatus { 442data SessionStatus = SessionStatus {
446 _clientStatus :: PeerStatus 443 _clientStatus :: !PeerStatus
447 , _peerStatus :: PeerStatus 444 , _peerStatus :: !PeerStatus
448 } 445 } deriving (Show, Eq)
449 446
450$(makeLenses ''SessionStatus) 447$(makeLenses ''SessionStatus)
451 448
diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs
index 918bfed7..db84e879 100644
--- a/src/Network/BitTorrent/Internal.hs
+++ b/src/Network/BitTorrent/Internal.hs
@@ -14,6 +14,7 @@
14-- data should be modified through standalone functions. 14-- data should be modified through standalone functions.
15-- 15--
16{-# LANGUAGE OverloadedStrings #-} 16{-# LANGUAGE OverloadedStrings #-}
17{-# LANGUAGE BangPatterns #-}
17{-# LANGUAGE RecordWildCards #-} 18{-# LANGUAGE RecordWildCards #-}
18{-# LANGUAGE TemplateHaskell #-} 19{-# LANGUAGE TemplateHaskell #-}
19{-# LANGUAGE DeriveDataTypeable #-} 20{-# LANGUAGE DeriveDataTypeable #-}
@@ -32,6 +33,7 @@ module Network.BitTorrent.Internal
32 33
33 -- * Swarm 34 -- * Swarm
34 , SwarmSession(SwarmSession, torrentMeta, clientSession) 35 , SwarmSession(SwarmSession, torrentMeta, clientSession)
36 , getSessionCount
35 , newLeacher, newSeeder 37 , newLeacher, newSeeder
36 , enterSwarm, leaveSwarm , waitVacancy 38 , enterSwarm, leaveSwarm , waitVacancy
37 39
@@ -46,7 +48,6 @@ module Network.BitTorrent.Internal
46 , SessionException(..) 48 , SessionException(..)
47 , isSessionException 49 , isSessionException
48 , putSessionException 50 , putSessionException
49 , sessionError
50 51
51 -- ** Properties 52 -- ** Properties
52 , bitfield, status 53 , bitfield, status
@@ -123,7 +124,7 @@ defaultThreadCount = 1000
123data ClientSession = ClientSession { 124data ClientSession = ClientSession {
124 -- | Our peer ID used in handshaked and discovery mechanism. The 125 -- | Our peer ID used in handshaked and discovery mechanism. The
125 -- clientPeerID is unique 'ClientSession' identifier. 126 -- clientPeerID is unique 'ClientSession' identifier.
126 clientPeerID :: PeerID 127 clientPeerID :: !PeerID
127 128
128 -- | Extensions we should try to use. Hovewer some particular peer 129 -- | Extensions we should try to use. Hovewer some particular peer
129 -- might not support some extension, so we keep enableExtension in 130 -- might not support some extension, so we keep enableExtension in
@@ -186,24 +187,31 @@ defLeacherConns = defaultNumWant
186-- | Extensions are set globally by 187-- | Extensions are set globally by
187-- Swarm session are un 188-- Swarm session are un
188data SwarmSession = SwarmSession { 189data SwarmSession = SwarmSession {
189 torrentMeta :: Torrent 190 torrentMeta :: !Torrent
190 , clientSession :: ClientSession 191 , clientSession :: !ClientSession
191 192
192 -- | Represent count of peers we _currently_ can connect to in the 193 -- | Represent count of peers we _currently_ can connect to in the
193 -- swarm. Used to bound number of concurrent threads. 194 -- swarm. Used to bound number of concurrent threads.
194 , vacantPeers :: MSem SessionCount 195 , vacantPeers :: !(MSem SessionCount)
195 196
196 -- | Modify this carefully updating global progress. 197 -- | Modify this carefully updating global progress.
197 , clientBitfield :: TVar Bitfield 198 , clientBitfield :: !(TVar Bitfield)
198 , connectedPeers :: TVar (Set PeerSession) 199 , connectedPeers :: !(TVar (Set PeerSession))
199 } 200 }
200 201
202-- INVARIANT:
203-- max_sessions_count - sizeof connectedPeers = value vacantPeers
204
201instance Eq SwarmSession where 205instance Eq SwarmSession where
202 (==) = (==) `on` (tInfoHash . torrentMeta) 206 (==) = (==) `on` (tInfoHash . torrentMeta)
203 207
204instance Ord SwarmSession where 208instance Ord SwarmSession where
205 compare = comparing (tInfoHash . torrentMeta) 209 compare = comparing (tInfoHash . torrentMeta)
206 210
211getSessionCount :: SwarmSession -> IO SessionCount
212getSessionCount SwarmSession {..} = do
213 S.size <$> readTVarIO connectedPeers
214
207newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent 215newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
208 -> IO SwarmSession 216 -> IO SwarmSession
209newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} 217newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
@@ -255,9 +263,9 @@ waitVacancy se =
255data PeerSession = PeerSession { 263data PeerSession = PeerSession {
256 -- | Used as unique 'PeerSession' identifier within one 264 -- | Used as unique 'PeerSession' identifier within one
257 -- 'SwarmSession'. 265 -- 'SwarmSession'.
258 connectedPeerAddr :: PeerAddr 266 connectedPeerAddr :: !PeerAddr
259 267
260 , swarmSession :: SwarmSession 268 , swarmSession :: !SwarmSession
261 269
262 -- | Extensions such that both peer and client support. 270 -- | Extensions such that both peer and client support.
263 , enabledExtensions :: [Extension] 271 , enabledExtensions :: [Extension]
@@ -269,7 +277,7 @@ data PeerSession = PeerSession {
269 -- 277 --
270 -- We should update timeout if we /receive/ any message within 278 -- We should update timeout if we /receive/ any message within
271 -- timeout interval to keep connection up. 279 -- timeout interval to keep connection up.
272 , incomingTimeout :: TimeoutKey 280 , incomingTimeout :: !TimeoutKey
273 281
274 -- | To send KA message appropriately we should know when was last 282 -- | To send KA message appropriately we should know when was last
275 -- time we sent a message to a peer. To do that we keep registered 283 -- time we sent a message to a peer. To do that we keep registered
@@ -279,17 +287,17 @@ data PeerSession = PeerSession {
279 -- 287 --
280 -- We should update timeout if we /send/ any message within timeout 288 -- We should update timeout if we /send/ any message within timeout
281 -- to avoid reduntant KA messages. 289 -- to avoid reduntant KA messages.
282 , outcomingTimeout :: TimeoutKey 290 , outcomingTimeout :: !TimeoutKey
283 291
284 -- TODO use dupChan for broadcasting 292 -- TODO use dupChan for broadcasting
285 , broadcastMessages :: Chan [Message] 293 , broadcastMessages :: !(Chan [Message])
286 , sessionState :: IORef SessionState 294 , sessionState :: !(IORef SessionState)
287 } 295 }
288 296
289data SessionState = SessionState { 297data SessionState = SessionState {
290 _bitfield :: Bitfield 298 _bitfield :: !Bitfield
291 , _status :: SessionStatus 299 , _status :: !SessionStatus
292 } 300 } deriving (Show, Eq)
293 301
294$(makeLenses ''SessionState) 302$(makeLenses ''SessionState)
295 303
@@ -301,10 +309,16 @@ instance Ord PeerSession where
301 309
302instance (MonadIO m, MonadReader PeerSession m) 310instance (MonadIO m, MonadReader PeerSession m)
303 => MonadState SessionState m where 311 => MonadState SessionState m where
304 get = asks sessionState >>= liftIO . readIORef 312 get = do
305 put s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s 313 ref <- asks sessionState
314 st <- liftIO (readIORef ref)
315 liftIO $ print (completeness (_bitfield st))
316 return st
317
318 put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s
306 319
307data SessionException = SessionException 320data SessionException = PeerDisconnected
321 | ProtocolError Doc
308 deriving (Show, Typeable) 322 deriving (Show, Typeable)
309 323
310instance Exception SessionException 324instance Exception SessionException
@@ -315,10 +329,6 @@ isSessionException _ = return ()
315putSessionException :: SessionException -> IO () 329putSessionException :: SessionException -> IO ()
316putSessionException = print 330putSessionException = print
317 331
318sessionError :: MonadIO m => Doc -> m ()
319sessionError msg
320 = liftIO $ throwIO $ userError $ render $ msg <+> "in session"
321
322-- TODO check if it connected yet peer 332-- TODO check if it connected yet peer
323withPeerSession :: SwarmSession -> PeerAddr 333withPeerSession :: SwarmSession -> PeerAddr
324 -> ((Socket, PeerSession) -> IO ()) 334 -> ((Socket, PeerSession) -> IO ())
@@ -342,7 +352,7 @@ withPeerSession ss @ SwarmSession {..} addr
342 let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) 352 let enabled = decodeExts (enabledCaps caps (handshakeCaps phs))
343 ps <- PeerSession addr ss enabled 353 ps <- PeerSession addr ss enabled
344 <$> registerTimeout (eventManager clientSession) 354 <$> registerTimeout (eventManager clientSession)
345 maxIncomingTime abortSession 355 maxIncomingTime (return ())
346 <*> registerTimeout (eventManager clientSession) 356 <*> registerTimeout (eventManager clientSession)
347 maxOutcomingTime (sendKA sock) 357 maxOutcomingTime (sendKA sock)
348 <*> newChan 358 <*> newChan
@@ -350,9 +360,13 @@ withPeerSession ss @ SwarmSession {..} addr
350 ; tc <- totalCount <$> readTVarIO clientBitfield 360 ; tc <- totalCount <$> readTVarIO clientBitfield
351 ; newIORef (SessionState (haveNone tc) def) 361 ; newIORef (SessionState (haveNone tc) def)
352 } 362 }
363
364 atomically $ modifyTVar' connectedPeers (S.insert ps)
365
353 return (sock, ps) 366 return (sock, ps)
354 367
355 closeSession (sock, _) = do 368 closeSession (sock, ps) = do
369 atomically $ modifyTVar' connectedPeers (S.delete ps)
356 close sock 370 close sock
357 371
358getPieceCount :: (MonadReader PeerSession m) => m PieceCount 372getPieceCount :: (MonadReader PeerSession m) => m PieceCount
@@ -411,6 +425,3 @@ sendKA sock {- SwarmSession {..} -} = do
411-- let mgr = eventManager clientSession 425-- let mgr = eventManager clientSession
412-- updateTimeout mgr 426-- updateTimeout mgr
413-- print "Done.." 427-- print "Done.."
414
415abortSession :: IO ()
416abortSession = error "abortSession: not implemented"
diff --git a/src/Network/BitTorrent/Tracker/Protocol.hs b/src/Network/BitTorrent/Tracker/Protocol.hs
index f7599ba2..d4f3da3c 100644
--- a/src/Network/BitTorrent/Tracker/Protocol.hs
+++ b/src/Network/BitTorrent/Tracker/Protocol.hs
@@ -212,10 +212,11 @@ defaultPorts = [6881..6889]
212-- in fact only actively forms new connections if it has less than 212-- in fact only actively forms new connections if it has less than
213-- 30 peers and will refuse connections if it has 55. 213-- 30 peers and will refuse connections if it has 55.
214-- 214--
215-- So the default value is set to 25. 215-- So the default value is set to 50 because usually 30-50% of peers
216-- are not responding.
216-- 217--
217defaultNumWant :: Int 218defaultNumWant :: Int
218defaultNumWant = 25 219defaultNumWant = 50
219 220
220mkHTTPRequest :: URI -> Request ByteString 221mkHTTPRequest :: URI -> Request ByteString
221mkHTTPRequest uri = Request uri GET [] "" 222mkHTTPRequest uri = Request uri GET [] ""