summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Sessions.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Sessions.hs')
-rw-r--r--src/Network/BitTorrent/Sessions.hs107
1 files changed, 51 insertions, 56 deletions
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs
index 1f34bd4c..4d4cf629 100644
--- a/src/Network/BitTorrent/Sessions.hs
+++ b/src/Network/BitTorrent/Sessions.hs
@@ -30,15 +30,14 @@ module Network.BitTorrent.Sessions
30 , getCurrentProgress 30 , getCurrentProgress
31 , getSwarmCount 31 , getSwarmCount
32 , getPeerCount 32 , getPeerCount
33 , getSwarm
34 , openSwarmSession
33 35
34 -- * Swarm 36 -- * Swarm
35 , SwarmSession( SwarmSession, torrentMeta, clientSession ) 37 , SwarmSession( SwarmSession, torrentMeta, clientSession )
36 38
37 , SessionCount 39 , SessionCount
38 , getSessionCount 40 , getSessionCount
39
40 , newLeecher
41 , newSeeder
42 , getClientBitfield 41 , getClientBitfield
43 42
44 , discover 43 , discover
@@ -63,6 +62,7 @@ import Data.Set as S
63import Data.Serialize hiding (get) 62import Data.Serialize hiding (get)
64 63
65import Network hiding (accept) 64import Network hiding (accept)
65import Network.BSD
66import Network.Socket 66import Network.Socket
67import Network.Socket.ByteString 67import Network.Socket.ByteString
68 68
@@ -177,14 +177,14 @@ getListenerPort ClientSession {..} = servPort <$> readMVar peerListener
177defSeederConns :: SessionCount 177defSeederConns :: SessionCount
178defSeederConns = defaultUnchokeSlots 178defSeederConns = defaultUnchokeSlots
179 179
180defLeacherConns :: SessionCount 180defLeecherConns :: SessionCount
181defLeacherConns = defaultNumWant 181defLeecherConns = defaultNumWant
182 182
183-- discovery should hide tracker and DHT communication under the hood 183-- discovery should hide tracker and DHT communication under the hood
184-- thus we can obtain an unified interface 184-- thus we can obtain an unified interface
185 185
186discover :: SwarmSession -> P2P () -> IO () 186discover :: SwarmSession -> IO ()
187discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do 187discover swarm @ SwarmSession {..} = {-# SCC discover #-} do
188 port <- getListenerPort clientSession 188 port <- getListenerPort clientSession
189 189
190 let conn = TConnection { 190 let conn = TConnection {
@@ -199,37 +199,46 @@ discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do
199 withTracker progress conn $ \tses -> do 199 withTracker progress conn $ \tses -> do
200 forever $ do 200 forever $ do
201 addr <- getPeerAddr tses 201 addr <- getPeerAddr tses
202 print addr
202 forkThrottle swarm $ do 203 forkThrottle swarm $ do
203 initiatePeerSession swarm addr $ \conn -> 204 print addr
204 runP2P conn action 205 initiatePeerSession swarm addr $ \conn -> do
206 print addr
207 runP2P conn (exchange storage)
205 208
206registerSwarmSession :: SwarmSession -> IO () 209registerSwarmSession :: SwarmSession -> STM ()
207registerSwarmSession = undefined 210registerSwarmSession ss @ SwarmSession {..} =
211 modifyTVar' (swarmSessions clientSession) $
212 M.insert (tInfoHash torrentMeta) ss
208 213
209unregisterSwarmSession :: SwarmSession -> IO () 214unregisterSwarmSession :: SwarmSession -> STM ()
210unregisterSwarmSession SwarmSession {..} = 215unregisterSwarmSession SwarmSession {..} =
211 atomically $ modifyTVar (swarmSessions clientSession) $ 216 modifyTVar' (swarmSessions clientSession) $
212 M.delete $ tInfoHash torrentMeta 217 M.delete $ tInfoHash torrentMeta
213 218
214newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent 219openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession
215 -> IO SwarmSession 220openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
216newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} 221 t <- validateLocation loc
217 = SwarmSession t cs 222 let bf = haveNone $ pieceCount $ tInfo t
218 <$> MSem.new n 223
224 ss <- SwarmSession t cs
225 <$> MSem.new defLeecherConns
219 <*> newTVarIO bf 226 <*> newTVarIO bf
220 <*> undefined 227 <*> openStorage t dataDirPath
221 <*> newTVarIO S.empty 228 <*> newTVarIO S.empty
222 <*> newBroadcastTChanIO 229 <*> newBroadcastTChanIO
223 230
224-- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession 231 atomically $ do
225-- > openSwarmSession ClientSession {..} ih = do 232 modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t
226-- > loc <- HM.lookup <$> readTVarIO torrentMap 233 registerSwarmSession ss
227-- > torrent <- validateLocation loc 234
228-- > return undefined 235 forkIO $ discover ss
236
237 return ss
229 238
230closeSwarmSession :: SwarmSession -> IO () 239closeSwarmSession :: SwarmSession -> IO ()
231closeSwarmSession se @ SwarmSession {..} = do 240closeSwarmSession se @ SwarmSession {..} = do
232 unregisterSwarmSession se 241 atomically $ unregisterSwarmSession se
233 -- TODO stop discovery 242 -- TODO stop discovery
234 -- TODO killall peer sessions 243 -- TODO killall peer sessions
235 -- TODO the order is important! 244 -- TODO the order is important!
@@ -237,21 +246,11 @@ closeSwarmSession se @ SwarmSession {..} = do
237 246
238getSwarm :: ClientSession -> InfoHash -> IO SwarmSession 247getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
239getSwarm cs @ ClientSession {..} ih = do 248getSwarm cs @ ClientSession {..} ih = do
240 ss <- readTVarIO swarmSessions 249 status <- torrentPresence cs ih
241 case M.lookup ih ss of 250 case status of
242 Just sw -> return sw 251 Unknown -> throw $ UnknownTorrent ih
243 Nothing -> undefined -- openSwarmSession cs 252 Active sw -> return sw
244 253 Registered loc -> openSwarmSession cs loc
245newSeeder :: ClientSession -> Torrent -> IO SwarmSession
246newSeeder cs t @ Torrent {..}
247 = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
248
249-- | New swarm in which the client allowed both download and upload.
250newLeecher :: ClientSession -> Torrent -> IO SwarmSession
251newLeecher cs t @ Torrent {..} = do
252 se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
253 atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
254 return se
255 254
256-- | Get the number of connected peers in the given swarm. 255-- | Get the number of connected peers in the given swarm.
257getSessionCount :: SwarmSession -> IO SessionCount 256getSessionCount :: SwarmSession -> IO SessionCount
@@ -284,9 +283,6 @@ leaveSwarm SwarmSession {..} = do
284 MSem.signal vacantPeers 283 MSem.signal vacantPeers
285 MSem.signal (activeThreads clientSession) 284 MSem.signal (activeThreads clientSession)
286 285
287waitVacancy :: SwarmSession -> IO () -> IO ()
288waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const
289
290forkThrottle :: SwarmSession -> IO () -> IO ThreadId 286forkThrottle :: SwarmSession -> IO () -> IO ThreadId
291forkThrottle se action = do 287forkThrottle se action = do
292 enterSwarm se 288 enterSwarm se
@@ -304,14 +300,6 @@ registerTorrent = error "registerTorrent"
304unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () 300unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
305unregisterTorrent = error "unregisterTorrent" 301unregisterTorrent = error "unregisterTorrent"
306 302
307torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
308torrentSwarm _ _ (Active sws) = return sws
309torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc
310torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih
311
312lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession
313lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih
314
315{----------------------------------------------------------------------- 303{-----------------------------------------------------------------------
316 Peer session creation 304 Peer session creation
317------------------------------------------------------------------------ 305------------------------------------------------------------------------
@@ -353,6 +341,7 @@ openSession ss @ SwarmSession {..} addr Handshake {..} = do
353 registerPeerSession ps 341 registerPeerSession ps
354 return ps 342 return ps
355 343
344-- TODO kill thread
356closeSession :: PeerSession -> IO () 345closeSession :: PeerSession -> IO ()
357closeSession = unregisterPeerSession 346closeSession = unregisterPeerSession
358 347
@@ -384,10 +373,11 @@ runSession connector opener action =
384-- | Used then the client want to connect to a peer. 373-- | Used then the client want to connect to a peer.
385initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () 374initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
386initiatePeerSession ss @ SwarmSession {..} addr 375initiatePeerSession ss @ SwarmSession {..} addr
387 = runSession (connectToPeer addr) initiated 376 = runSession (putStrLn ("trying to connect" ++ show addr) *> connectToPeer addr <* putStrLn "connected") initiated
388 where 377 where
389 initiated sock = do 378 initiated sock = do
390 phs <- handshake sock (swarmHandshake ss) 379 phs <- handshake sock (swarmHandshake ss)
380 putStrLn "handshaked"
391 ps <- openSession ss addr phs 381 ps <- openSession ss addr phs
392 sendClientStatus (sock, ps) 382 sendClientStatus (sock, ps)
393 return ps 383 return ps
@@ -398,7 +388,7 @@ acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
398 where 388 where
399 accepted sock = do 389 accepted sock = do
400 phs <- recvHandshake sock 390 phs <- recvHandshake sock
401 swarm <- lookupSwarm cs $ hsInfoHash phs 391 swarm <- getSwarm cs $ hsInfoHash phs
402 ps <- openSession swarm addr phs 392 ps <- openSession swarm addr phs
403 sendHandshake sock $ Handshake { 393 sendHandshake sock $ Handshake {
404 hsProtocol = defaultBTProtocol 394 hsProtocol = defaultBTProtocol
@@ -413,17 +403,22 @@ listener :: ClientSession -> Exchange -> PortNumber -> IO ()
413listener cs action serverPort = bracket openListener close loop 403listener cs action serverPort = bracket openListener close loop
414 where 404 where
415 loop sock = forever $ handle isIOError $ do 405 loop sock = forever $ handle isIOError $ do
406 putStrLn "listen"
407 print =<< getSocketName sock
416 (conn, addr) <- accept sock 408 (conn, addr) <- accept sock
409 putStrLn "accepted"
417 case addr of 410 case addr of
418 SockAddrInet port host -> do 411 SockAddrInet port host -> do
419 acceptPeerSession cs (PeerAddr Nothing host port) conn action 412 forkIO $ do
413 acceptPeerSession cs (PeerAddr Nothing host port) conn action
414 return ()
420 _ -> return () 415 _ -> return ()
421 416
422 isIOError :: IOError -> IO () 417 isIOError :: IOError -> IO ()
423 isIOError _ = return () 418 isIOError _ = return ()
424 419
425 openListener = do 420 openListener = do
426 sock <- socket AF_INET Stream defaultProtocol 421 sock <- socket AF_INET Stream =<< getProtocolNumber "tcp"
427 bindSocket sock (SockAddrInet serverPort 0) 422 bindSocket sock (SockAddrInet serverPort iNADDR_ANY)
428 listen sock 1 423 listen sock 1
429 return sock 424 return sock