diff options
author | Sam T <pxqr.sta@gmail.com> | 2013-07-14 19:34:36 +0400 |
---|---|---|
committer | Sam T <pxqr.sta@gmail.com> | 2013-07-14 19:34:36 +0400 |
commit | 4427cb321a6927b2dd8119e95e09f4998ff8a226 (patch) | |
tree | ed29b1a0ddcc3113cbe3d68d70413cc8239f5642 /src/Network/BitTorrent | |
parent | 5cd492bc9e7ebbaa9557f7a6ae15582febd60a7d (diff) |
~ Use timestamp peer Id generator.
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r-- | src/Network/BitTorrent/Exchange.hs | 11 | ||||
-rw-r--r-- | src/Network/BitTorrent/Peer.hs | 4 | ||||
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 107 | ||||
-rw-r--r-- | src/Network/BitTorrent/Tracker.hs | 4 |
4 files changed, 60 insertions, 66 deletions
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 32ab493d..52b5f690 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs | |||
@@ -71,7 +71,6 @@ module Network.BitTorrent.Exchange | |||
71 | 71 | ||
72 | import Control.Applicative | 72 | import Control.Applicative |
73 | import Control.Exception | 73 | import Control.Exception |
74 | import Control.Concurrent | ||
75 | import Control.Lens | 74 | import Control.Lens |
76 | import Control.Monad.Reader | 75 | import Control.Monad.Reader |
77 | import Control.Monad.State | 76 | import Control.Monad.State |
@@ -89,7 +88,6 @@ import Network | |||
89 | 88 | ||
90 | import Data.Bitfield as BF | 89 | import Data.Bitfield as BF |
91 | import Network.BitTorrent.Extension | 90 | import Network.BitTorrent.Extension |
92 | import Network.BitTorrent.Peer | ||
93 | import Network.BitTorrent.Exchange.Protocol | 91 | import Network.BitTorrent.Exchange.Protocol |
94 | import Network.BitTorrent.Sessions.Types | 92 | import Network.BitTorrent.Sessions.Types |
95 | import System.Torrent.Storage | 93 | import System.Torrent.Storage |
@@ -117,7 +115,8 @@ awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go | |||
117 | {-# INLINE awaitMessage #-} | 115 | {-# INLINE awaitMessage #-} |
118 | 116 | ||
119 | yieldMessage :: Message -> P2P () | 117 | yieldMessage :: Message -> P2P () |
120 | yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} C.yield msg | 118 | yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do |
119 | C.yield msg | ||
121 | {-# INLINE yieldMessage #-} | 120 | {-# INLINE yieldMessage #-} |
122 | 121 | ||
123 | -- TODO send vectored | 122 | -- TODO send vectored |
@@ -209,7 +208,6 @@ singletonBF i = liftM (BF.singleton i) getPieceCount | |||
209 | adjustBF :: Bitfield -> P2P Bitfield | 208 | adjustBF :: Bitfield -> P2P Bitfield |
210 | adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount | 209 | adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount |
211 | 210 | ||
212 | |||
213 | peerWant :: P2P Bitfield | 211 | peerWant :: P2P Bitfield |
214 | peerWant = BF.difference <$> getClientBF <*> use bitfield | 212 | peerWant = BF.difference <$> getClientBF <*> use bitfield |
215 | 213 | ||
@@ -306,7 +304,8 @@ data Event | |||
306 | awaitEvent :: P2P Event | 304 | awaitEvent :: P2P Event |
307 | awaitEvent = {-# SCC awaitEvent #-} do | 305 | awaitEvent = {-# SCC awaitEvent #-} do |
308 | flushPending | 306 | flushPending |
309 | awaitMessage >>= go | 307 | msg <- awaitMessage |
308 | go msg | ||
310 | where | 309 | where |
311 | go KeepAlive = awaitEvent | 310 | go KeepAlive = awaitEvent |
312 | go Choke = do | 311 | go Choke = do |
@@ -464,7 +463,7 @@ exchange :: Storage -> P2P () | |||
464 | exchange storage = {-# SCC exchange #-} awaitEvent >>= handler | 463 | exchange storage = {-# SCC exchange #-} awaitEvent >>= handler |
465 | where | 464 | where |
466 | handler (Available bf) = do | 465 | handler (Available bf) = do |
467 | liftIO (print (completeness bf)) | 466 | liftIO $ print (completeness bf) |
468 | ixs <- selBlk (findMin bf) storage | 467 | ixs <- selBlk (findMin bf) storage |
469 | mapM_ (yieldEvent . Want) ixs -- TODO yield vectored | 468 | mapM_ (yieldEvent . Want) ixs -- TODO yield vectored |
470 | 469 | ||
diff --git a/src/Network/BitTorrent/Peer.hs b/src/Network/BitTorrent/Peer.hs index cdcd65ea..7bac336b 100644 --- a/src/Network/BitTorrent/Peer.hs +++ b/src/Network/BitTorrent/Peer.hs | |||
@@ -196,6 +196,8 @@ timestamp = (BC.pack . format) <$> getCurrentTime | |||
196 | entropy :: IO ByteString | 196 | entropy :: IO ByteString |
197 | entropy = getEntropy 15 | 197 | entropy = getEntropy 15 |
198 | 198 | ||
199 | -- NOTE: entropy generates incorrrect peer id | ||
200 | |||
199 | -- | Here we use Azureus-style encoding with the following args: | 201 | -- | Here we use Azureus-style encoding with the following args: |
200 | -- | 202 | -- |
201 | -- * 'HS' for the client id. | 203 | -- * 'HS' for the client id. |
@@ -205,7 +207,7 @@ entropy = getEntropy 15 | |||
205 | -- * UTC time day ++ day time for the random number. | 207 | -- * UTC time day ++ day time for the random number. |
206 | -- | 208 | -- |
207 | genPeerId :: IO PeerId | 209 | genPeerId :: IO PeerId |
208 | genPeerId = azureusStyle defaultClientId defaultVersionNumber <$> entropy | 210 | genPeerId = azureusStyle defaultClientId defaultVersionNumber <$> timestamp |
209 | 211 | ||
210 | -- | Pad bytestring so it's becomes exactly request length. Conversion | 212 | -- | Pad bytestring so it's becomes exactly request length. Conversion |
211 | -- is done like so: | 213 | -- is done like so: |
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs index 1f34bd4c..4d4cf629 100644 --- a/src/Network/BitTorrent/Sessions.hs +++ b/src/Network/BitTorrent/Sessions.hs | |||
@@ -30,15 +30,14 @@ module Network.BitTorrent.Sessions | |||
30 | , getCurrentProgress | 30 | , getCurrentProgress |
31 | , getSwarmCount | 31 | , getSwarmCount |
32 | , getPeerCount | 32 | , getPeerCount |
33 | , getSwarm | ||
34 | , openSwarmSession | ||
33 | 35 | ||
34 | -- * Swarm | 36 | -- * Swarm |
35 | , SwarmSession( SwarmSession, torrentMeta, clientSession ) | 37 | , SwarmSession( SwarmSession, torrentMeta, clientSession ) |
36 | 38 | ||
37 | , SessionCount | 39 | , SessionCount |
38 | , getSessionCount | 40 | , getSessionCount |
39 | |||
40 | , newLeecher | ||
41 | , newSeeder | ||
42 | , getClientBitfield | 41 | , getClientBitfield |
43 | 42 | ||
44 | , discover | 43 | , discover |
@@ -63,6 +62,7 @@ import Data.Set as S | |||
63 | import Data.Serialize hiding (get) | 62 | import Data.Serialize hiding (get) |
64 | 63 | ||
65 | import Network hiding (accept) | 64 | import Network hiding (accept) |
65 | import Network.BSD | ||
66 | import Network.Socket | 66 | import Network.Socket |
67 | import Network.Socket.ByteString | 67 | import Network.Socket.ByteString |
68 | 68 | ||
@@ -177,14 +177,14 @@ getListenerPort ClientSession {..} = servPort <$> readMVar peerListener | |||
177 | defSeederConns :: SessionCount | 177 | defSeederConns :: SessionCount |
178 | defSeederConns = defaultUnchokeSlots | 178 | defSeederConns = defaultUnchokeSlots |
179 | 179 | ||
180 | defLeacherConns :: SessionCount | 180 | defLeecherConns :: SessionCount |
181 | defLeacherConns = defaultNumWant | 181 | defLeecherConns = defaultNumWant |
182 | 182 | ||
183 | -- discovery should hide tracker and DHT communication under the hood | 183 | -- discovery should hide tracker and DHT communication under the hood |
184 | -- thus we can obtain an unified interface | 184 | -- thus we can obtain an unified interface |
185 | 185 | ||
186 | discover :: SwarmSession -> P2P () -> IO () | 186 | discover :: SwarmSession -> IO () |
187 | discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do | 187 | discover swarm @ SwarmSession {..} = {-# SCC discover #-} do |
188 | port <- getListenerPort clientSession | 188 | port <- getListenerPort clientSession |
189 | 189 | ||
190 | let conn = TConnection { | 190 | let conn = TConnection { |
@@ -199,37 +199,46 @@ discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do | |||
199 | withTracker progress conn $ \tses -> do | 199 | withTracker progress conn $ \tses -> do |
200 | forever $ do | 200 | forever $ do |
201 | addr <- getPeerAddr tses | 201 | addr <- getPeerAddr tses |
202 | print addr | ||
202 | forkThrottle swarm $ do | 203 | forkThrottle swarm $ do |
203 | initiatePeerSession swarm addr $ \conn -> | 204 | print addr |
204 | runP2P conn action | 205 | initiatePeerSession swarm addr $ \conn -> do |
206 | print addr | ||
207 | runP2P conn (exchange storage) | ||
205 | 208 | ||
206 | registerSwarmSession :: SwarmSession -> IO () | 209 | registerSwarmSession :: SwarmSession -> STM () |
207 | registerSwarmSession = undefined | 210 | registerSwarmSession ss @ SwarmSession {..} = |
211 | modifyTVar' (swarmSessions clientSession) $ | ||
212 | M.insert (tInfoHash torrentMeta) ss | ||
208 | 213 | ||
209 | unregisterSwarmSession :: SwarmSession -> IO () | 214 | unregisterSwarmSession :: SwarmSession -> STM () |
210 | unregisterSwarmSession SwarmSession {..} = | 215 | unregisterSwarmSession SwarmSession {..} = |
211 | atomically $ modifyTVar (swarmSessions clientSession) $ | 216 | modifyTVar' (swarmSessions clientSession) $ |
212 | M.delete $ tInfoHash torrentMeta | 217 | M.delete $ tInfoHash torrentMeta |
213 | 218 | ||
214 | newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | 219 | openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession |
215 | -> IO SwarmSession | 220 | openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do |
216 | newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | 221 | t <- validateLocation loc |
217 | = SwarmSession t cs | 222 | let bf = haveNone $ pieceCount $ tInfo t |
218 | <$> MSem.new n | 223 | |
224 | ss <- SwarmSession t cs | ||
225 | <$> MSem.new defLeecherConns | ||
219 | <*> newTVarIO bf | 226 | <*> newTVarIO bf |
220 | <*> undefined | 227 | <*> openStorage t dataDirPath |
221 | <*> newTVarIO S.empty | 228 | <*> newTVarIO S.empty |
222 | <*> newBroadcastTChanIO | 229 | <*> newBroadcastTChanIO |
223 | 230 | ||
224 | -- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession | 231 | atomically $ do |
225 | -- > openSwarmSession ClientSession {..} ih = do | 232 | modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t |
226 | -- > loc <- HM.lookup <$> readTVarIO torrentMap | 233 | registerSwarmSession ss |
227 | -- > torrent <- validateLocation loc | 234 | |
228 | -- > return undefined | 235 | forkIO $ discover ss |
236 | |||
237 | return ss | ||
229 | 238 | ||
230 | closeSwarmSession :: SwarmSession -> IO () | 239 | closeSwarmSession :: SwarmSession -> IO () |
231 | closeSwarmSession se @ SwarmSession {..} = do | 240 | closeSwarmSession se @ SwarmSession {..} = do |
232 | unregisterSwarmSession se | 241 | atomically $ unregisterSwarmSession se |
233 | -- TODO stop discovery | 242 | -- TODO stop discovery |
234 | -- TODO killall peer sessions | 243 | -- TODO killall peer sessions |
235 | -- TODO the order is important! | 244 | -- TODO the order is important! |
@@ -237,21 +246,11 @@ closeSwarmSession se @ SwarmSession {..} = do | |||
237 | 246 | ||
238 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | 247 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession |
239 | getSwarm cs @ ClientSession {..} ih = do | 248 | getSwarm cs @ ClientSession {..} ih = do |
240 | ss <- readTVarIO swarmSessions | 249 | status <- torrentPresence cs ih |
241 | case M.lookup ih ss of | 250 | case status of |
242 | Just sw -> return sw | 251 | Unknown -> throw $ UnknownTorrent ih |
243 | Nothing -> undefined -- openSwarmSession cs | 252 | Active sw -> return sw |
244 | 253 | Registered loc -> openSwarmSession cs loc | |
245 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
246 | newSeeder cs t @ Torrent {..} | ||
247 | = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
248 | |||
249 | -- | New swarm in which the client allowed both download and upload. | ||
250 | newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
251 | newLeecher cs t @ Torrent {..} = do | ||
252 | se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
253 | atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
254 | return se | ||
255 | 254 | ||
256 | -- | Get the number of connected peers in the given swarm. | 255 | -- | Get the number of connected peers in the given swarm. |
257 | getSessionCount :: SwarmSession -> IO SessionCount | 256 | getSessionCount :: SwarmSession -> IO SessionCount |
@@ -284,9 +283,6 @@ leaveSwarm SwarmSession {..} = do | |||
284 | MSem.signal vacantPeers | 283 | MSem.signal vacantPeers |
285 | MSem.signal (activeThreads clientSession) | 284 | MSem.signal (activeThreads clientSession) |
286 | 285 | ||
287 | waitVacancy :: SwarmSession -> IO () -> IO () | ||
288 | waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const | ||
289 | |||
290 | forkThrottle :: SwarmSession -> IO () -> IO ThreadId | 286 | forkThrottle :: SwarmSession -> IO () -> IO ThreadId |
291 | forkThrottle se action = do | 287 | forkThrottle se action = do |
292 | enterSwarm se | 288 | enterSwarm se |
@@ -304,14 +300,6 @@ registerTorrent = error "registerTorrent" | |||
304 | unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () | 300 | unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () |
305 | unregisterTorrent = error "unregisterTorrent" | 301 | unregisterTorrent = error "unregisterTorrent" |
306 | 302 | ||
307 | torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession | ||
308 | torrentSwarm _ _ (Active sws) = return sws | ||
309 | torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc | ||
310 | torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih | ||
311 | |||
312 | lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
313 | lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih | ||
314 | |||
315 | {----------------------------------------------------------------------- | 303 | {----------------------------------------------------------------------- |
316 | Peer session creation | 304 | Peer session creation |
317 | ------------------------------------------------------------------------ | 305 | ------------------------------------------------------------------------ |
@@ -353,6 +341,7 @@ openSession ss @ SwarmSession {..} addr Handshake {..} = do | |||
353 | registerPeerSession ps | 341 | registerPeerSession ps |
354 | return ps | 342 | return ps |
355 | 343 | ||
344 | -- TODO kill thread | ||
356 | closeSession :: PeerSession -> IO () | 345 | closeSession :: PeerSession -> IO () |
357 | closeSession = unregisterPeerSession | 346 | closeSession = unregisterPeerSession |
358 | 347 | ||
@@ -384,10 +373,11 @@ runSession connector opener action = | |||
384 | -- | Used then the client want to connect to a peer. | 373 | -- | Used then the client want to connect to a peer. |
385 | initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | 374 | initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () |
386 | initiatePeerSession ss @ SwarmSession {..} addr | 375 | initiatePeerSession ss @ SwarmSession {..} addr |
387 | = runSession (connectToPeer addr) initiated | 376 | = runSession (putStrLn ("trying to connect" ++ show addr) *> connectToPeer addr <* putStrLn "connected") initiated |
388 | where | 377 | where |
389 | initiated sock = do | 378 | initiated sock = do |
390 | phs <- handshake sock (swarmHandshake ss) | 379 | phs <- handshake sock (swarmHandshake ss) |
380 | putStrLn "handshaked" | ||
391 | ps <- openSession ss addr phs | 381 | ps <- openSession ss addr phs |
392 | sendClientStatus (sock, ps) | 382 | sendClientStatus (sock, ps) |
393 | return ps | 383 | return ps |
@@ -398,7 +388,7 @@ acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted | |||
398 | where | 388 | where |
399 | accepted sock = do | 389 | accepted sock = do |
400 | phs <- recvHandshake sock | 390 | phs <- recvHandshake sock |
401 | swarm <- lookupSwarm cs $ hsInfoHash phs | 391 | swarm <- getSwarm cs $ hsInfoHash phs |
402 | ps <- openSession swarm addr phs | 392 | ps <- openSession swarm addr phs |
403 | sendHandshake sock $ Handshake { | 393 | sendHandshake sock $ Handshake { |
404 | hsProtocol = defaultBTProtocol | 394 | hsProtocol = defaultBTProtocol |
@@ -413,17 +403,22 @@ listener :: ClientSession -> Exchange -> PortNumber -> IO () | |||
413 | listener cs action serverPort = bracket openListener close loop | 403 | listener cs action serverPort = bracket openListener close loop |
414 | where | 404 | where |
415 | loop sock = forever $ handle isIOError $ do | 405 | loop sock = forever $ handle isIOError $ do |
406 | putStrLn "listen" | ||
407 | print =<< getSocketName sock | ||
416 | (conn, addr) <- accept sock | 408 | (conn, addr) <- accept sock |
409 | putStrLn "accepted" | ||
417 | case addr of | 410 | case addr of |
418 | SockAddrInet port host -> do | 411 | SockAddrInet port host -> do |
419 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | 412 | forkIO $ do |
413 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
414 | return () | ||
420 | _ -> return () | 415 | _ -> return () |
421 | 416 | ||
422 | isIOError :: IOError -> IO () | 417 | isIOError :: IOError -> IO () |
423 | isIOError _ = return () | 418 | isIOError _ = return () |
424 | 419 | ||
425 | openListener = do | 420 | openListener = do |
426 | sock <- socket AF_INET Stream defaultProtocol | 421 | sock <- socket AF_INET Stream =<< getProtocolNumber "tcp" |
427 | bindSocket sock (SockAddrInet serverPort 0) | 422 | bindSocket sock (SockAddrInet serverPort iNADDR_ANY) |
428 | listen sock 1 | 423 | listen sock 1 |
429 | return sock | 424 | return sock |
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs index b320f0f9..b737a3df 100644 --- a/src/Network/BitTorrent/Tracker.hs +++ b/src/Network/BitTorrent/Tracker.hs | |||
@@ -241,9 +241,7 @@ withTracker initProgress conn action = bracket start end (action . fst) | |||
241 | -- commutative: this implements the heuristic "old peers | 241 | -- commutative: this implements the heuristic "old peers |
242 | -- in head" | 242 | -- in head" |
243 | old <- BC.getChanContents sePeers | 243 | old <- BC.getChanContents sePeers |
244 | let new = respPeers | 244 | let combined = L.union old respPeers |
245 | let combined = L.union old new | ||
246 | |||
247 | BC.writeList2Chan sePeers combined | 245 | BC.writeList2Chan sePeers combined |
248 | 246 | ||
249 | _ -> return () | 247 | _ -> return () |