summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bittorrent.cabal3
-rw-r--r--examples/Main.hs9
-rw-r--r--src/Network/BitTorrent.hs18
-rw-r--r--src/Network/BitTorrent/Exchange.hs11
-rw-r--r--src/Network/BitTorrent/Peer.hs4
-rw-r--r--src/Network/BitTorrent/Sessions.hs107
-rw-r--r--src/Network/BitTorrent/Tracker.hs4
-rw-r--r--src/System/Torrent/Storage.hs3
8 files changed, 73 insertions, 86 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index e5fbc058..f97aa746 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -45,9 +45,10 @@ library
45 , Network.BitTorrent.Exchange 45 , Network.BitTorrent.Exchange
46 , Network.BitTorrent.DHT 46 , Network.BitTorrent.DHT
47 , System.Torrent.Storage 47 , System.Torrent.Storage
48 , Network.BitTorrent.Sessions
48 49
49 other-modules: Network.BitTorrent.Sessions.Types 50 other-modules: Network.BitTorrent.Sessions.Types
50 , Network.BitTorrent.Sessions 51
51 52
52 if flag(testing) 53 if flag(testing)
53 exposed-modules: Network.BitTorrent.Exchange.Protocol 54 exposed-modules: Network.BitTorrent.Exchange.Protocol
diff --git a/examples/Main.hs b/examples/Main.hs
index b8e3c11f..5128b290 100644
--- a/examples/Main.hs
+++ b/examples/Main.hs
@@ -1,6 +1,8 @@
1module Main (main) where 1module Main (main) where
2 2
3import Control.Concurrent
3import Network.BitTorrent 4import Network.BitTorrent
5import Network.BitTorrent.Sessions
4import System.Environment 6import System.Environment
5 7
6main :: IO () 8main :: IO ()
@@ -8,6 +10,9 @@ main = do
8 [path] <- getArgs 10 [path] <- getArgs
9 torrent <- fromFile path 11 torrent <- fromFile path
10 print (contentLayout "./" (tInfo torrent)) 12 print (contentLayout "./" (tInfo torrent))
13 let loc = TorrentLoc path "/tmp"
11 14
12 withDefaultClient 3000 3001 $ \ client -> 15 withDefaultClient 51413 3000 $ \ client -> do
13 addTorrent client $ TorrentLoc path "/tmp" 16 openSwarmSession client loc
17 threadDelay 1000000000000
18 return ()
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs
index c166b1b1..26824724 100644
--- a/src/Network/BitTorrent.hs
+++ b/src/Network/BitTorrent.hs
@@ -28,25 +28,19 @@ module Network.BitTorrent
28 , ppExtension 28 , ppExtension
29 ) where 29 ) where
30 30
31import Control.Concurrent
32import Control.Monad
33import Control.Monad.Trans
34import Network 31import Network
35import Data.Torrent 32import Data.Torrent
36import Network.BitTorrent.Sessions.Types 33import Network.BitTorrent.Sessions.Types
37import Network.BitTorrent.Sessions 34import Network.BitTorrent.Sessions
38import Network.BitTorrent.Extension 35import Network.BitTorrent.Extension
39import Network.BitTorrent.Exchange
40
41import System.Torrent.Storage
42 36
43-- TODO remove fork from Network.BitTorrent.Exchange 37-- TODO remove fork from Network.BitTorrent.Exchange
44-- TODO make all forks in Internal. 38-- TODO make all forks in Internal.
45 39
46-- | Client session with default parameters. Use it for testing only. 40-- | Client session with default parameters. Use it for testing only.
47withDefaultClient :: PortNumber -> PortNumber -> (ClientSession -> IO ()) -> IO () 41withDefaultClient :: PortNumber -> PortNumber -> (ClientSession -> IO ()) -> IO ()
48withDefaultClient dhtPort listPort action = do 42withDefaultClient listPort dhtPort action = do
49 withClientSession defaultThreadCount defaultExtensions listPort dhtPort action 43 withClientSession defaultThreadCount [] listPort dhtPort action
50 44
51{----------------------------------------------------------------------- 45{-----------------------------------------------------------------------
52 Torrent management 46 Torrent management
@@ -68,14 +62,6 @@ addTorrent clientSession loc @ TorrentLoc {..} = do
68 torrent <- validateLocation loc 62 torrent <- validateLocation loc
69-- registerTorrent loc tInfoHash 63-- registerTorrent loc tInfoHash
70-- when (bf is not full) 64-- when (bf is not full)
71
72 swarm <- newLeecher clientSession torrent
73 storage <- openStorage (torrentMeta swarm) dataDirPath
74 forkIO $ discover swarm $ do
75 liftIO $ putStrLn "connected to peer"
76 forever $ do
77 liftIO $ putStrLn "from mesage loop"
78 exchange storage
79 return () 65 return ()
80 66
81-- | Unregister torrent and stop all running sessions. 67-- | Unregister torrent and stop all running sessions.
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index 32ab493d..52b5f690 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -71,7 +71,6 @@ module Network.BitTorrent.Exchange
71 71
72import Control.Applicative 72import Control.Applicative
73import Control.Exception 73import Control.Exception
74import Control.Concurrent
75import Control.Lens 74import Control.Lens
76import Control.Monad.Reader 75import Control.Monad.Reader
77import Control.Monad.State 76import Control.Monad.State
@@ -89,7 +88,6 @@ import Network
89 88
90import Data.Bitfield as BF 89import Data.Bitfield as BF
91import Network.BitTorrent.Extension 90import Network.BitTorrent.Extension
92import Network.BitTorrent.Peer
93import Network.BitTorrent.Exchange.Protocol 91import Network.BitTorrent.Exchange.Protocol
94import Network.BitTorrent.Sessions.Types 92import Network.BitTorrent.Sessions.Types
95import System.Torrent.Storage 93import System.Torrent.Storage
@@ -117,7 +115,8 @@ awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go
117{-# INLINE awaitMessage #-} 115{-# INLINE awaitMessage #-}
118 116
119yieldMessage :: Message -> P2P () 117yieldMessage :: Message -> P2P ()
120yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} C.yield msg 118yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do
119 C.yield msg
121{-# INLINE yieldMessage #-} 120{-# INLINE yieldMessage #-}
122 121
123-- TODO send vectored 122-- TODO send vectored
@@ -209,7 +208,6 @@ singletonBF i = liftM (BF.singleton i) getPieceCount
209adjustBF :: Bitfield -> P2P Bitfield 208adjustBF :: Bitfield -> P2P Bitfield
210adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount 209adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount
211 210
212
213peerWant :: P2P Bitfield 211peerWant :: P2P Bitfield
214peerWant = BF.difference <$> getClientBF <*> use bitfield 212peerWant = BF.difference <$> getClientBF <*> use bitfield
215 213
@@ -306,7 +304,8 @@ data Event
306awaitEvent :: P2P Event 304awaitEvent :: P2P Event
307awaitEvent = {-# SCC awaitEvent #-} do 305awaitEvent = {-# SCC awaitEvent #-} do
308 flushPending 306 flushPending
309 awaitMessage >>= go 307 msg <- awaitMessage
308 go msg
310 where 309 where
311 go KeepAlive = awaitEvent 310 go KeepAlive = awaitEvent
312 go Choke = do 311 go Choke = do
@@ -464,7 +463,7 @@ exchange :: Storage -> P2P ()
464exchange storage = {-# SCC exchange #-} awaitEvent >>= handler 463exchange storage = {-# SCC exchange #-} awaitEvent >>= handler
465 where 464 where
466 handler (Available bf) = do 465 handler (Available bf) = do
467 liftIO (print (completeness bf)) 466 liftIO $ print (completeness bf)
468 ixs <- selBlk (findMin bf) storage 467 ixs <- selBlk (findMin bf) storage
469 mapM_ (yieldEvent . Want) ixs -- TODO yield vectored 468 mapM_ (yieldEvent . Want) ixs -- TODO yield vectored
470 469
diff --git a/src/Network/BitTorrent/Peer.hs b/src/Network/BitTorrent/Peer.hs
index cdcd65ea..7bac336b 100644
--- a/src/Network/BitTorrent/Peer.hs
+++ b/src/Network/BitTorrent/Peer.hs
@@ -196,6 +196,8 @@ timestamp = (BC.pack . format) <$> getCurrentTime
196entropy :: IO ByteString 196entropy :: IO ByteString
197entropy = getEntropy 15 197entropy = getEntropy 15
198 198
199-- NOTE: entropy generates incorrrect peer id
200
199-- | Here we use Azureus-style encoding with the following args: 201-- | Here we use Azureus-style encoding with the following args:
200-- 202--
201-- * 'HS' for the client id. 203-- * 'HS' for the client id.
@@ -205,7 +207,7 @@ entropy = getEntropy 15
205-- * UTC time day ++ day time for the random number. 207-- * UTC time day ++ day time for the random number.
206-- 208--
207genPeerId :: IO PeerId 209genPeerId :: IO PeerId
208genPeerId = azureusStyle defaultClientId defaultVersionNumber <$> entropy 210genPeerId = azureusStyle defaultClientId defaultVersionNumber <$> timestamp
209 211
210-- | Pad bytestring so it's becomes exactly request length. Conversion 212-- | Pad bytestring so it's becomes exactly request length. Conversion
211-- is done like so: 213-- is done like so:
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs
index 1f34bd4c..4d4cf629 100644
--- a/src/Network/BitTorrent/Sessions.hs
+++ b/src/Network/BitTorrent/Sessions.hs
@@ -30,15 +30,14 @@ module Network.BitTorrent.Sessions
30 , getCurrentProgress 30 , getCurrentProgress
31 , getSwarmCount 31 , getSwarmCount
32 , getPeerCount 32 , getPeerCount
33 , getSwarm
34 , openSwarmSession
33 35
34 -- * Swarm 36 -- * Swarm
35 , SwarmSession( SwarmSession, torrentMeta, clientSession ) 37 , SwarmSession( SwarmSession, torrentMeta, clientSession )
36 38
37 , SessionCount 39 , SessionCount
38 , getSessionCount 40 , getSessionCount
39
40 , newLeecher
41 , newSeeder
42 , getClientBitfield 41 , getClientBitfield
43 42
44 , discover 43 , discover
@@ -63,6 +62,7 @@ import Data.Set as S
63import Data.Serialize hiding (get) 62import Data.Serialize hiding (get)
64 63
65import Network hiding (accept) 64import Network hiding (accept)
65import Network.BSD
66import Network.Socket 66import Network.Socket
67import Network.Socket.ByteString 67import Network.Socket.ByteString
68 68
@@ -177,14 +177,14 @@ getListenerPort ClientSession {..} = servPort <$> readMVar peerListener
177defSeederConns :: SessionCount 177defSeederConns :: SessionCount
178defSeederConns = defaultUnchokeSlots 178defSeederConns = defaultUnchokeSlots
179 179
180defLeacherConns :: SessionCount 180defLeecherConns :: SessionCount
181defLeacherConns = defaultNumWant 181defLeecherConns = defaultNumWant
182 182
183-- discovery should hide tracker and DHT communication under the hood 183-- discovery should hide tracker and DHT communication under the hood
184-- thus we can obtain an unified interface 184-- thus we can obtain an unified interface
185 185
186discover :: SwarmSession -> P2P () -> IO () 186discover :: SwarmSession -> IO ()
187discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do 187discover swarm @ SwarmSession {..} = {-# SCC discover #-} do
188 port <- getListenerPort clientSession 188 port <- getListenerPort clientSession
189 189
190 let conn = TConnection { 190 let conn = TConnection {
@@ -199,37 +199,46 @@ discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do
199 withTracker progress conn $ \tses -> do 199 withTracker progress conn $ \tses -> do
200 forever $ do 200 forever $ do
201 addr <- getPeerAddr tses 201 addr <- getPeerAddr tses
202 print addr
202 forkThrottle swarm $ do 203 forkThrottle swarm $ do
203 initiatePeerSession swarm addr $ \conn -> 204 print addr
204 runP2P conn action 205 initiatePeerSession swarm addr $ \conn -> do
206 print addr
207 runP2P conn (exchange storage)
205 208
206registerSwarmSession :: SwarmSession -> IO () 209registerSwarmSession :: SwarmSession -> STM ()
207registerSwarmSession = undefined 210registerSwarmSession ss @ SwarmSession {..} =
211 modifyTVar' (swarmSessions clientSession) $
212 M.insert (tInfoHash torrentMeta) ss
208 213
209unregisterSwarmSession :: SwarmSession -> IO () 214unregisterSwarmSession :: SwarmSession -> STM ()
210unregisterSwarmSession SwarmSession {..} = 215unregisterSwarmSession SwarmSession {..} =
211 atomically $ modifyTVar (swarmSessions clientSession) $ 216 modifyTVar' (swarmSessions clientSession) $
212 M.delete $ tInfoHash torrentMeta 217 M.delete $ tInfoHash torrentMeta
213 218
214newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent 219openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession
215 -> IO SwarmSession 220openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
216newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} 221 t <- validateLocation loc
217 = SwarmSession t cs 222 let bf = haveNone $ pieceCount $ tInfo t
218 <$> MSem.new n 223
224 ss <- SwarmSession t cs
225 <$> MSem.new defLeecherConns
219 <*> newTVarIO bf 226 <*> newTVarIO bf
220 <*> undefined 227 <*> openStorage t dataDirPath
221 <*> newTVarIO S.empty 228 <*> newTVarIO S.empty
222 <*> newBroadcastTChanIO 229 <*> newBroadcastTChanIO
223 230
224-- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession 231 atomically $ do
225-- > openSwarmSession ClientSession {..} ih = do 232 modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t
226-- > loc <- HM.lookup <$> readTVarIO torrentMap 233 registerSwarmSession ss
227-- > torrent <- validateLocation loc 234
228-- > return undefined 235 forkIO $ discover ss
236
237 return ss
229 238
230closeSwarmSession :: SwarmSession -> IO () 239closeSwarmSession :: SwarmSession -> IO ()
231closeSwarmSession se @ SwarmSession {..} = do 240closeSwarmSession se @ SwarmSession {..} = do
232 unregisterSwarmSession se 241 atomically $ unregisterSwarmSession se
233 -- TODO stop discovery 242 -- TODO stop discovery
234 -- TODO killall peer sessions 243 -- TODO killall peer sessions
235 -- TODO the order is important! 244 -- TODO the order is important!
@@ -237,21 +246,11 @@ closeSwarmSession se @ SwarmSession {..} = do
237 246
238getSwarm :: ClientSession -> InfoHash -> IO SwarmSession 247getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
239getSwarm cs @ ClientSession {..} ih = do 248getSwarm cs @ ClientSession {..} ih = do
240 ss <- readTVarIO swarmSessions 249 status <- torrentPresence cs ih
241 case M.lookup ih ss of 250 case status of
242 Just sw -> return sw 251 Unknown -> throw $ UnknownTorrent ih
243 Nothing -> undefined -- openSwarmSession cs 252 Active sw -> return sw
244 253 Registered loc -> openSwarmSession cs loc
245newSeeder :: ClientSession -> Torrent -> IO SwarmSession
246newSeeder cs t @ Torrent {..}
247 = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
248
249-- | New swarm in which the client allowed both download and upload.
250newLeecher :: ClientSession -> Torrent -> IO SwarmSession
251newLeecher cs t @ Torrent {..} = do
252 se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
253 atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
254 return se
255 254
256-- | Get the number of connected peers in the given swarm. 255-- | Get the number of connected peers in the given swarm.
257getSessionCount :: SwarmSession -> IO SessionCount 256getSessionCount :: SwarmSession -> IO SessionCount
@@ -284,9 +283,6 @@ leaveSwarm SwarmSession {..} = do
284 MSem.signal vacantPeers 283 MSem.signal vacantPeers
285 MSem.signal (activeThreads clientSession) 284 MSem.signal (activeThreads clientSession)
286 285
287waitVacancy :: SwarmSession -> IO () -> IO ()
288waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const
289
290forkThrottle :: SwarmSession -> IO () -> IO ThreadId 286forkThrottle :: SwarmSession -> IO () -> IO ThreadId
291forkThrottle se action = do 287forkThrottle se action = do
292 enterSwarm se 288 enterSwarm se
@@ -304,14 +300,6 @@ registerTorrent = error "registerTorrent"
304unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () 300unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
305unregisterTorrent = error "unregisterTorrent" 301unregisterTorrent = error "unregisterTorrent"
306 302
307torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
308torrentSwarm _ _ (Active sws) = return sws
309torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc
310torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih
311
312lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession
313lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih
314
315{----------------------------------------------------------------------- 303{-----------------------------------------------------------------------
316 Peer session creation 304 Peer session creation
317------------------------------------------------------------------------ 305------------------------------------------------------------------------
@@ -353,6 +341,7 @@ openSession ss @ SwarmSession {..} addr Handshake {..} = do
353 registerPeerSession ps 341 registerPeerSession ps
354 return ps 342 return ps
355 343
344-- TODO kill thread
356closeSession :: PeerSession -> IO () 345closeSession :: PeerSession -> IO ()
357closeSession = unregisterPeerSession 346closeSession = unregisterPeerSession
358 347
@@ -384,10 +373,11 @@ runSession connector opener action =
384-- | Used then the client want to connect to a peer. 373-- | Used then the client want to connect to a peer.
385initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () 374initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
386initiatePeerSession ss @ SwarmSession {..} addr 375initiatePeerSession ss @ SwarmSession {..} addr
387 = runSession (connectToPeer addr) initiated 376 = runSession (putStrLn ("trying to connect" ++ show addr) *> connectToPeer addr <* putStrLn "connected") initiated
388 where 377 where
389 initiated sock = do 378 initiated sock = do
390 phs <- handshake sock (swarmHandshake ss) 379 phs <- handshake sock (swarmHandshake ss)
380 putStrLn "handshaked"
391 ps <- openSession ss addr phs 381 ps <- openSession ss addr phs
392 sendClientStatus (sock, ps) 382 sendClientStatus (sock, ps)
393 return ps 383 return ps
@@ -398,7 +388,7 @@ acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
398 where 388 where
399 accepted sock = do 389 accepted sock = do
400 phs <- recvHandshake sock 390 phs <- recvHandshake sock
401 swarm <- lookupSwarm cs $ hsInfoHash phs 391 swarm <- getSwarm cs $ hsInfoHash phs
402 ps <- openSession swarm addr phs 392 ps <- openSession swarm addr phs
403 sendHandshake sock $ Handshake { 393 sendHandshake sock $ Handshake {
404 hsProtocol = defaultBTProtocol 394 hsProtocol = defaultBTProtocol
@@ -413,17 +403,22 @@ listener :: ClientSession -> Exchange -> PortNumber -> IO ()
413listener cs action serverPort = bracket openListener close loop 403listener cs action serverPort = bracket openListener close loop
414 where 404 where
415 loop sock = forever $ handle isIOError $ do 405 loop sock = forever $ handle isIOError $ do
406 putStrLn "listen"
407 print =<< getSocketName sock
416 (conn, addr) <- accept sock 408 (conn, addr) <- accept sock
409 putStrLn "accepted"
417 case addr of 410 case addr of
418 SockAddrInet port host -> do 411 SockAddrInet port host -> do
419 acceptPeerSession cs (PeerAddr Nothing host port) conn action 412 forkIO $ do
413 acceptPeerSession cs (PeerAddr Nothing host port) conn action
414 return ()
420 _ -> return () 415 _ -> return ()
421 416
422 isIOError :: IOError -> IO () 417 isIOError :: IOError -> IO ()
423 isIOError _ = return () 418 isIOError _ = return ()
424 419
425 openListener = do 420 openListener = do
426 sock <- socket AF_INET Stream defaultProtocol 421 sock <- socket AF_INET Stream =<< getProtocolNumber "tcp"
427 bindSocket sock (SockAddrInet serverPort 0) 422 bindSocket sock (SockAddrInet serverPort iNADDR_ANY)
428 listen sock 1 423 listen sock 1
429 return sock 424 return sock
diff --git a/src/Network/BitTorrent/Tracker.hs b/src/Network/BitTorrent/Tracker.hs
index b320f0f9..b737a3df 100644
--- a/src/Network/BitTorrent/Tracker.hs
+++ b/src/Network/BitTorrent/Tracker.hs
@@ -241,9 +241,7 @@ withTracker initProgress conn action = bracket start end (action . fst)
241 -- commutative: this implements the heuristic "old peers 241 -- commutative: this implements the heuristic "old peers
242 -- in head" 242 -- in head"
243 old <- BC.getChanContents sePeers 243 old <- BC.getChanContents sePeers
244 let new = respPeers 244 let combined = L.union old respPeers
245 let combined = L.union old new
246
247 BC.writeList2Chan sePeers combined 245 BC.writeList2Chan sePeers combined
248 246
249 _ -> return () 247 _ -> return ()
diff --git a/src/System/Torrent/Storage.hs b/src/System/Torrent/Storage.hs
index 6a748fe3..cd1a8364 100644
--- a/src/System/Torrent/Storage.hs
+++ b/src/System/Torrent/Storage.hs
@@ -92,8 +92,9 @@ openStorage t @ Torrent {..} contentPath = do
92 unless exist $ do 92 unless exist $ do
93 createDirectoryIfMissing True dirPath 93 createDirectoryIfMissing True dirPath
94 94
95-- TODO
95closeStorage :: Storage -> IO () 96closeStorage :: Storage -> IO ()
96closeStorage st = error "closeStorage" 97closeStorage st = return ()
97 98
98 99
99withStorage :: Torrent -> FilePath -> (Storage -> IO a) -> IO a 100withStorage :: Torrent -> FilePath -> (Storage -> IO a) -> IO a