diff options
Diffstat (limited to 'src/Network/BitTorrent/Sessions.hs')
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 107 |
1 files changed, 51 insertions, 56 deletions
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs index 1f34bd4c..4d4cf629 100644 --- a/src/Network/BitTorrent/Sessions.hs +++ b/src/Network/BitTorrent/Sessions.hs | |||
@@ -30,15 +30,14 @@ module Network.BitTorrent.Sessions | |||
30 | , getCurrentProgress | 30 | , getCurrentProgress |
31 | , getSwarmCount | 31 | , getSwarmCount |
32 | , getPeerCount | 32 | , getPeerCount |
33 | , getSwarm | ||
34 | , openSwarmSession | ||
33 | 35 | ||
34 | -- * Swarm | 36 | -- * Swarm |
35 | , SwarmSession( SwarmSession, torrentMeta, clientSession ) | 37 | , SwarmSession( SwarmSession, torrentMeta, clientSession ) |
36 | 38 | ||
37 | , SessionCount | 39 | , SessionCount |
38 | , getSessionCount | 40 | , getSessionCount |
39 | |||
40 | , newLeecher | ||
41 | , newSeeder | ||
42 | , getClientBitfield | 41 | , getClientBitfield |
43 | 42 | ||
44 | , discover | 43 | , discover |
@@ -63,6 +62,7 @@ import Data.Set as S | |||
63 | import Data.Serialize hiding (get) | 62 | import Data.Serialize hiding (get) |
64 | 63 | ||
65 | import Network hiding (accept) | 64 | import Network hiding (accept) |
65 | import Network.BSD | ||
66 | import Network.Socket | 66 | import Network.Socket |
67 | import Network.Socket.ByteString | 67 | import Network.Socket.ByteString |
68 | 68 | ||
@@ -177,14 +177,14 @@ getListenerPort ClientSession {..} = servPort <$> readMVar peerListener | |||
177 | defSeederConns :: SessionCount | 177 | defSeederConns :: SessionCount |
178 | defSeederConns = defaultUnchokeSlots | 178 | defSeederConns = defaultUnchokeSlots |
179 | 179 | ||
180 | defLeacherConns :: SessionCount | 180 | defLeecherConns :: SessionCount |
181 | defLeacherConns = defaultNumWant | 181 | defLeecherConns = defaultNumWant |
182 | 182 | ||
183 | -- discovery should hide tracker and DHT communication under the hood | 183 | -- discovery should hide tracker and DHT communication under the hood |
184 | -- thus we can obtain an unified interface | 184 | -- thus we can obtain an unified interface |
185 | 185 | ||
186 | discover :: SwarmSession -> P2P () -> IO () | 186 | discover :: SwarmSession -> IO () |
187 | discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do | 187 | discover swarm @ SwarmSession {..} = {-# SCC discover #-} do |
188 | port <- getListenerPort clientSession | 188 | port <- getListenerPort clientSession |
189 | 189 | ||
190 | let conn = TConnection { | 190 | let conn = TConnection { |
@@ -199,37 +199,46 @@ discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do | |||
199 | withTracker progress conn $ \tses -> do | 199 | withTracker progress conn $ \tses -> do |
200 | forever $ do | 200 | forever $ do |
201 | addr <- getPeerAddr tses | 201 | addr <- getPeerAddr tses |
202 | print addr | ||
202 | forkThrottle swarm $ do | 203 | forkThrottle swarm $ do |
203 | initiatePeerSession swarm addr $ \conn -> | 204 | print addr |
204 | runP2P conn action | 205 | initiatePeerSession swarm addr $ \conn -> do |
206 | print addr | ||
207 | runP2P conn (exchange storage) | ||
205 | 208 | ||
206 | registerSwarmSession :: SwarmSession -> IO () | 209 | registerSwarmSession :: SwarmSession -> STM () |
207 | registerSwarmSession = undefined | 210 | registerSwarmSession ss @ SwarmSession {..} = |
211 | modifyTVar' (swarmSessions clientSession) $ | ||
212 | M.insert (tInfoHash torrentMeta) ss | ||
208 | 213 | ||
209 | unregisterSwarmSession :: SwarmSession -> IO () | 214 | unregisterSwarmSession :: SwarmSession -> STM () |
210 | unregisterSwarmSession SwarmSession {..} = | 215 | unregisterSwarmSession SwarmSession {..} = |
211 | atomically $ modifyTVar (swarmSessions clientSession) $ | 216 | modifyTVar' (swarmSessions clientSession) $ |
212 | M.delete $ tInfoHash torrentMeta | 217 | M.delete $ tInfoHash torrentMeta |
213 | 218 | ||
214 | newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | 219 | openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession |
215 | -> IO SwarmSession | 220 | openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do |
216 | newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | 221 | t <- validateLocation loc |
217 | = SwarmSession t cs | 222 | let bf = haveNone $ pieceCount $ tInfo t |
218 | <$> MSem.new n | 223 | |
224 | ss <- SwarmSession t cs | ||
225 | <$> MSem.new defLeecherConns | ||
219 | <*> newTVarIO bf | 226 | <*> newTVarIO bf |
220 | <*> undefined | 227 | <*> openStorage t dataDirPath |
221 | <*> newTVarIO S.empty | 228 | <*> newTVarIO S.empty |
222 | <*> newBroadcastTChanIO | 229 | <*> newBroadcastTChanIO |
223 | 230 | ||
224 | -- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession | 231 | atomically $ do |
225 | -- > openSwarmSession ClientSession {..} ih = do | 232 | modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t |
226 | -- > loc <- HM.lookup <$> readTVarIO torrentMap | 233 | registerSwarmSession ss |
227 | -- > torrent <- validateLocation loc | 234 | |
228 | -- > return undefined | 235 | forkIO $ discover ss |
236 | |||
237 | return ss | ||
229 | 238 | ||
230 | closeSwarmSession :: SwarmSession -> IO () | 239 | closeSwarmSession :: SwarmSession -> IO () |
231 | closeSwarmSession se @ SwarmSession {..} = do | 240 | closeSwarmSession se @ SwarmSession {..} = do |
232 | unregisterSwarmSession se | 241 | atomically $ unregisterSwarmSession se |
233 | -- TODO stop discovery | 242 | -- TODO stop discovery |
234 | -- TODO killall peer sessions | 243 | -- TODO killall peer sessions |
235 | -- TODO the order is important! | 244 | -- TODO the order is important! |
@@ -237,21 +246,11 @@ closeSwarmSession se @ SwarmSession {..} = do | |||
237 | 246 | ||
238 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | 247 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession |
239 | getSwarm cs @ ClientSession {..} ih = do | 248 | getSwarm cs @ ClientSession {..} ih = do |
240 | ss <- readTVarIO swarmSessions | 249 | status <- torrentPresence cs ih |
241 | case M.lookup ih ss of | 250 | case status of |
242 | Just sw -> return sw | 251 | Unknown -> throw $ UnknownTorrent ih |
243 | Nothing -> undefined -- openSwarmSession cs | 252 | Active sw -> return sw |
244 | 253 | Registered loc -> openSwarmSession cs loc | |
245 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
246 | newSeeder cs t @ Torrent {..} | ||
247 | = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
248 | |||
249 | -- | New swarm in which the client allowed both download and upload. | ||
250 | newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
251 | newLeecher cs t @ Torrent {..} = do | ||
252 | se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
253 | atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
254 | return se | ||
255 | 254 | ||
256 | -- | Get the number of connected peers in the given swarm. | 255 | -- | Get the number of connected peers in the given swarm. |
257 | getSessionCount :: SwarmSession -> IO SessionCount | 256 | getSessionCount :: SwarmSession -> IO SessionCount |
@@ -284,9 +283,6 @@ leaveSwarm SwarmSession {..} = do | |||
284 | MSem.signal vacantPeers | 283 | MSem.signal vacantPeers |
285 | MSem.signal (activeThreads clientSession) | 284 | MSem.signal (activeThreads clientSession) |
286 | 285 | ||
287 | waitVacancy :: SwarmSession -> IO () -> IO () | ||
288 | waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const | ||
289 | |||
290 | forkThrottle :: SwarmSession -> IO () -> IO ThreadId | 286 | forkThrottle :: SwarmSession -> IO () -> IO ThreadId |
291 | forkThrottle se action = do | 287 | forkThrottle se action = do |
292 | enterSwarm se | 288 | enterSwarm se |
@@ -304,14 +300,6 @@ registerTorrent = error "registerTorrent" | |||
304 | unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () | 300 | unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () |
305 | unregisterTorrent = error "unregisterTorrent" | 301 | unregisterTorrent = error "unregisterTorrent" |
306 | 302 | ||
307 | torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession | ||
308 | torrentSwarm _ _ (Active sws) = return sws | ||
309 | torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc | ||
310 | torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih | ||
311 | |||
312 | lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
313 | lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih | ||
314 | |||
315 | {----------------------------------------------------------------------- | 303 | {----------------------------------------------------------------------- |
316 | Peer session creation | 304 | Peer session creation |
317 | ------------------------------------------------------------------------ | 305 | ------------------------------------------------------------------------ |
@@ -353,6 +341,7 @@ openSession ss @ SwarmSession {..} addr Handshake {..} = do | |||
353 | registerPeerSession ps | 341 | registerPeerSession ps |
354 | return ps | 342 | return ps |
355 | 343 | ||
344 | -- TODO kill thread | ||
356 | closeSession :: PeerSession -> IO () | 345 | closeSession :: PeerSession -> IO () |
357 | closeSession = unregisterPeerSession | 346 | closeSession = unregisterPeerSession |
358 | 347 | ||
@@ -384,10 +373,11 @@ runSession connector opener action = | |||
384 | -- | Used then the client want to connect to a peer. | 373 | -- | Used then the client want to connect to a peer. |
385 | initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | 374 | initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () |
386 | initiatePeerSession ss @ SwarmSession {..} addr | 375 | initiatePeerSession ss @ SwarmSession {..} addr |
387 | = runSession (connectToPeer addr) initiated | 376 | = runSession (putStrLn ("trying to connect" ++ show addr) *> connectToPeer addr <* putStrLn "connected") initiated |
388 | where | 377 | where |
389 | initiated sock = do | 378 | initiated sock = do |
390 | phs <- handshake sock (swarmHandshake ss) | 379 | phs <- handshake sock (swarmHandshake ss) |
380 | putStrLn "handshaked" | ||
391 | ps <- openSession ss addr phs | 381 | ps <- openSession ss addr phs |
392 | sendClientStatus (sock, ps) | 382 | sendClientStatus (sock, ps) |
393 | return ps | 383 | return ps |
@@ -398,7 +388,7 @@ acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted | |||
398 | where | 388 | where |
399 | accepted sock = do | 389 | accepted sock = do |
400 | phs <- recvHandshake sock | 390 | phs <- recvHandshake sock |
401 | swarm <- lookupSwarm cs $ hsInfoHash phs | 391 | swarm <- getSwarm cs $ hsInfoHash phs |
402 | ps <- openSession swarm addr phs | 392 | ps <- openSession swarm addr phs |
403 | sendHandshake sock $ Handshake { | 393 | sendHandshake sock $ Handshake { |
404 | hsProtocol = defaultBTProtocol | 394 | hsProtocol = defaultBTProtocol |
@@ -413,17 +403,22 @@ listener :: ClientSession -> Exchange -> PortNumber -> IO () | |||
413 | listener cs action serverPort = bracket openListener close loop | 403 | listener cs action serverPort = bracket openListener close loop |
414 | where | 404 | where |
415 | loop sock = forever $ handle isIOError $ do | 405 | loop sock = forever $ handle isIOError $ do |
406 | putStrLn "listen" | ||
407 | print =<< getSocketName sock | ||
416 | (conn, addr) <- accept sock | 408 | (conn, addr) <- accept sock |
409 | putStrLn "accepted" | ||
417 | case addr of | 410 | case addr of |
418 | SockAddrInet port host -> do | 411 | SockAddrInet port host -> do |
419 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | 412 | forkIO $ do |
413 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
414 | return () | ||
420 | _ -> return () | 415 | _ -> return () |
421 | 416 | ||
422 | isIOError :: IOError -> IO () | 417 | isIOError :: IOError -> IO () |
423 | isIOError _ = return () | 418 | isIOError _ = return () |
424 | 419 | ||
425 | openListener = do | 420 | openListener = do |
426 | sock <- socket AF_INET Stream defaultProtocol | 421 | sock <- socket AF_INET Stream =<< getProtocolNumber "tcp" |
427 | bindSocket sock (SockAddrInet serverPort 0) | 422 | bindSocket sock (SockAddrInet serverPort iNADDR_ANY) |
428 | listen sock 1 | 423 | listen sock 1 |
429 | return sock | 424 | return sock |