summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-01-22 00:30:01 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-01-22 00:30:01 +0400
commite642358ffe4673ab0a03c5aafa628ffc86b17abd (patch)
treea4b60645255041838d56c358ffd97cec175ec3c4 /src
parent7e6d3f9edb55d686391b10386d917ef474f36c84 (diff)
Cleanup old sessions
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent.hs123
-rw-r--r--src/Network/BitTorrent/Exchange.hs428
-rw-r--r--src/Network/BitTorrent/Sessions.hs446
-rw-r--r--src/Network/BitTorrent/Sessions/Types.lhs316
4 files changed, 2 insertions, 1311 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs
index d8888416..21528efd 100644
--- a/src/Network/BitTorrent.hs
+++ b/src/Network/BitTorrent.hs
@@ -7,126 +7,5 @@
7-- 7--
8{-# LANGUAGE RecordWildCards #-} 8{-# LANGUAGE RecordWildCards #-}
9module Network.BitTorrent 9module Network.BitTorrent
10 ( module Data.Torrent 10 (
11
12 , TorrentLoc(..), TorrentMap, Progress(..)
13 , ThreadCount, SessionCount
14
15 , ClientSession( clientPeerId, allowedExtensions )
16 , withDefaultClient, defaultThreadCount, defaultPorts
17 , addTorrent
18 , removeTorrent
19
20 , getCurrentProgress
21 , getPeerCount
22 , getSwarmCount
23 , getSessionCount
24 , getSwarm
25 , getStorage
26 , getTorrentInfo
27 , getTorrentInfoStr
28
29 -- * Torrent Groups
30 , ClientLoc (..), ppClientLoc
31 , concreteLoc, concretePath
32 , addTorrents
33 , removeTorrents
34
35 -- * Extensions
36 , Extension
37 , defaultExtensions
38 , ppExtension
39 ) where 11 ) where
40
41import Control.Applicative
42import Control.Exception
43import Control.Monad
44import Data.List as L
45import Data.HashMap.Strict as HM
46import Network
47import Text.Read
48import Text.PrettyPrint
49import System.Directory
50import System.FilePath
51
52import Data.Torrent
53import Network.BitTorrent.Sessions.Types
54import Network.BitTorrent.Sessions
55import Network.BitTorrent.Extension
56import Network.BitTorrent.Tracker
57
58
59-- TODO remove fork from Network.BitTorrent.Exchange
60-- TODO make all forks in Internal.
61
62-- | Client session with default parameters. Use it for testing only.
63withDefaultClient :: PortNumber -> PortNumber -> (ClientSession -> IO ()) -> IO ()
64withDefaultClient listPort dhtPort action = do
65 withClientSession defaultThreadCount [] listPort dhtPort action
66
67getTorrentInfoStr :: ClientSession -> String -> IO (Maybe Torrent)
68getTorrentInfoStr cs str
69 | Just infohash <- readMaybe str = getTorrentInfo cs infohash
70 | otherwise = return Nothing
71
72{-----------------------------------------------------------------------
73 Torrent management
74-----------------------------------------------------------------------}
75
76-- | Register torrent and start downloading.
77addTorrent :: ClientSession -> TorrentLoc -> IO ()
78addTorrent cs loc @ TorrentLoc {..} = do
79 registerTorrent cs loc
80 openSwarmSession cs loc
81 return ()
82
83-- | Unregister torrent and stop all running sessions.
84removeTorrent :: ClientSession -> InfoHash -> IO ()
85removeTorrent = unregisterTorrent
86
87{-
88-- | The same as 'removeTorrrent' torrent, but delete all torrent
89-- content files.
90deleteTorrent :: ClientSession -> TorrentLoc -> IO ()
91deleteTorrent ClientSession {..} TorrentLoc {..} = undefined
92-}
93
94{-----------------------------------------------------------------------
95 Torrent group management
96-----------------------------------------------------------------------}
97-- TODO better name
98
99data ClientLoc = ClientLoc
100 { tdir :: FilePath -- ^ Path to directory with .torrent files.
101 , ddir :: FilePath -- ^ Path to directory to place content.
102 } deriving (Show, Eq)
103
104ppClientLoc :: ClientLoc -> Doc
105ppClientLoc ClientLoc {..} =
106 text "torrent directory" <+> text tdir $$
107 text "data directory" <+> text ddir
108
109concretePath :: ClientLoc -> FilePath -> FilePath
110concretePath ClientLoc {..} relPath = tdir </> relPath
111
112concreteLoc :: ClientLoc -> FilePath -> TorrentLoc
113concreteLoc loc @ ClientLoc {..} relPath
114 = TorrentLoc (concretePath loc relPath) ddir
115
116addTorrents :: ClientSession -> ClientLoc -> IO ()
117addTorrents ses loc @ ClientLoc {..} = do
118 paths <- L.filter isTorrentPath <$> getDirectoryContents tdir
119 forM_ paths $ handle handler . addTorrent ses . concreteLoc loc
120 where
121 handler :: SomeException -> IO ()
122 handler = print
123
124removeTorrents :: ClientSession -> IO ()
125removeTorrents cs = do
126 tm <- getRegistered cs
127 forM_ (keys tm) (removeTorrent cs)
128
129{-
130deleteTorrents :: ClientSession -> IO ()
131deleteTorrents = undefined
132-} \ No newline at end of file
diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs
index c1377449..934c646d 100644
--- a/src/Network/BitTorrent/Exchange.hs
+++ b/src/Network/BitTorrent/Exchange.hs
@@ -1,16 +1,3 @@
1{- TODO turn awaitEvent and yieldEvent to sourcePeer and sinkPeer
2
3 sourceSocket sock $=
4 conduitGet S.get $=
5 sourcePeer $=
6 p2p $=
7 sinkPeer $=
8 conduitPut S.put $$
9 sinkSocket sock
10
11 measure performance
12 -}
13
14-- | 1-- |
15-- Copyright : (c) Sam Truzjan 2013 2-- Copyright : (c) Sam Truzjan 2013
16-- License : BSD3 3-- License : BSD3
@@ -18,419 +5,6 @@
18-- Stability : experimental 5-- Stability : experimental
19-- Portability : portable 6-- Portability : portable
20-- 7--
21-- This module provides P2P communication and aims to hide the
22-- following stuff under the hood:
23--
24-- * TODO;
25--
26-- * /keep alives/ -- ;
27--
28-- * /choking mechanism/ -- is used ;
29--
30-- * /message broadcasting/ -- ;
31--
32-- * /message filtering/ -- due to network latency and concurrency
33-- some arriving messages might not make sense in the current
34-- session context;
35--
36-- * /scatter\/gather pieces/ -- ;
37--
38-- * /various P2P protocol extensions/ -- .
39--
40-- Finally we get a simple event-based communication model.
41--
42{-# LANGUAGE GeneralizedNewtypeDeriving #-}
43{-# LANGUAGE MultiParamTypeClasses #-}
44{-# LANGUAGE BangPatterns #-}
45module Network.BitTorrent.Exchange 8module Network.BitTorrent.Exchange
46 ( P2P 9 (
47 , runP2P
48
49 -- * Query
50 , getHaveCount
51 , getWantCount
52 , getPieceCount
53 , peerOffer
54
55 -- * Events
56 , Event(..)
57 , awaitEvent
58 , yieldEvent
59 , handleEvent
60 , exchange
61 , p2p
62
63 -- * Exceptions
64 , disconnect
65 , protocolError
66
67 -- * Block
68 , Block(..), BlockIx(..)
69
70 -- * Status
71 , PeerStatus(..), SessionStatus(..)
72 , inverseStatus
73 , canDownload, canUpload
74 ) where 10 ) where
75
76import Control.Applicative
77import Control.Concurrent.STM
78import Control.Exception
79import Control.Lens
80import Control.Monad.Reader
81import Control.Monad.State
82import Control.Monad.Trans.Resource
83
84import Data.IORef
85import Data.Conduit as C
86import Data.Conduit.Cereal as S
87import Data.Conduit.Network
88import Data.Serialize as S
89import Text.PrettyPrint as PP hiding (($$))
90
91import Network
92
93import Data.Torrent.Block
94import Data.Torrent.Bitfield as BF
95import Network.BitTorrent.Extension
96import Network.BitTorrent.Exchange.Protocol
97import Network.BitTorrent.Sessions.Types
98import System.Torrent.Storage
99
100
101{-----------------------------------------------------------------------
102 Exceptions
103-----------------------------------------------------------------------}
104
105-- | Terminate the current 'P2P' session.
106disconnect :: P2P a
107disconnect = monadThrow PeerDisconnected
108
109-- TODO handle all protocol details here so we can hide this from
110-- public interface |
111protocolError :: Doc -> P2P a
112protocolError = monadThrow . ProtocolError
113
114{-----------------------------------------------------------------------
115 Helpers
116-----------------------------------------------------------------------}
117
118getClientBF :: P2P Bitfield
119getClientBF = asks swarmSession >>= liftIO . getClientBitfield
120{-# INLINE getClientBF #-}
121
122-- | Count of client /have/ pieces.
123getHaveCount :: P2P PieceCount
124getHaveCount = haveCount <$> getClientBF
125{-# INLINE getHaveCount #-}
126
127-- | Count of client do not /have/ pieces.
128getWantCount :: P2P PieceCount
129getWantCount = totalCount <$> getClientBF
130{-# INLINE getWantCount #-}
131
132-- | Count of both /have/ and /want/ pieces.
133getPieceCount :: P2P PieceCount
134getPieceCount = asks findPieceCount
135{-# INLINE getPieceCount #-}
136
137-- for internal use only
138emptyBF :: P2P Bitfield
139emptyBF = liftM haveNone getPieceCount
140
141fullBF :: P2P Bitfield
142fullBF = liftM haveAll getPieceCount
143
144singletonBF :: PieceIx -> P2P Bitfield
145singletonBF i = liftM (BF.singleton i) getPieceCount
146
147adjustBF :: Bitfield -> P2P Bitfield
148adjustBF bf = (`adjustSize` bf) `liftM` getPieceCount
149
150peerWant :: P2P Bitfield
151peerWant = BF.difference <$> getClientBF <*> use bitfield
152
153clientWant :: P2P Bitfield
154clientWant = BF.difference <$> use bitfield <*> getClientBF
155
156peerOffer :: P2P Bitfield
157peerOffer = do
158 sessionStatus <- use status
159 if canDownload sessionStatus then clientWant else emptyBF
160
161clientOffer :: P2P Bitfield
162clientOffer = do
163 sessionStatus <- use status
164 if canUpload sessionStatus then peerWant else emptyBF
165
166
167
168revise :: P2P Bitfield
169revise = do
170 want <- clientWant
171 let peerInteresting = not (BF.null want)
172 clientInterested <- use (status.clientStatus.interested)
173
174 when (clientInterested /= peerInteresting) $ do
175 yieldMessage $ if peerInteresting then Interested else NotInterested
176 status.clientStatus.interested .= peerInteresting
177
178 return want
179
180requireExtension :: Extension -> P2P ()
181requireExtension required = do
182 enabled <- asks enabledExtensions
183 unless (required `elem` enabled) $
184 protocolError $ ppExtension required <+> "not enabled"
185
186-- haveMessage bf = do
187-- cbf <- undefined -- liftIO $ readIORef $ clientBitfield swarmSession
188-- if undefined -- ix `member` bf
189-- then nextEvent se
190-- else undefined -- return $ Available diff
191
192{-----------------------------------------------------------------------
193 Exchange
194-----------------------------------------------------------------------}
195
196
197-- | The 'Event' occur when either client or a peer change their
198-- state. 'Event' are similar to 'Message' but differ in. We could
199-- both wait for an event or raise an event using the 'awaitEvent' and
200-- 'yieldEvent' functions respectively.
201--
202--
203-- 'awaitEvent'\/'yieldEvent' properties:
204--
205-- * between any await or yield state of the (another)peer could not change.
206--
207data Event
208 -- | Generalize 'Bitfield', 'Have', 'HaveAll', 'HaveNone',
209 -- 'SuggestPiece', 'AllowedFast' messages.
210 = Available Bitfield
211
212 -- | Generalize 'Request' and 'Interested' messages.
213 | Want BlockIx
214
215 -- | Generalize 'Piece' and 'Unchoke' messages.
216 | Fragment Block
217 deriving Show
218
219-- INVARIANT:
220--
221-- * Available Bitfield is never empty
222--
223
224-- | You could think of 'awaitEvent' as wait until something interesting occur.
225--
226-- The following table shows which events may occur:
227--
228-- > +----------+---------+
229-- > | Leacher | Seeder |
230-- > |----------+---------+
231-- > | Available| |
232-- > | Want | Want |
233-- > | Fragment | |
234-- > +----------+---------+
235--
236-- The reason is that seeder is not interested in any piece, and
237-- both available or fragment events doesn't make sense in this context.
238--
239-- Some properties:
240--
241-- forall (Fragment block). isPiece block == True
242--
243awaitEvent :: P2P Event
244awaitEvent = {-# SCC awaitEvent #-} do
245 flushPending
246 msg <- awaitMessage
247 go msg
248 where
249 go KeepAlive = awaitEvent
250 go Choke = do
251 status.peerStatus.choking .= True
252 awaitEvent
253
254 go Unchoke = do
255 status.peerStatus.choking .= False
256 offer <- peerOffer
257 if BF.null offer
258 then awaitEvent
259 else return (Available offer)
260
261 go Interested = do
262 status.peerStatus.interested .= True
263 awaitEvent
264
265 go NotInterested = do
266 status.peerStatus.interested .= False
267 awaitEvent
268
269 go (Have idx) = do
270 bitfield %= have idx
271 _ <- revise
272
273 offer <- peerOffer
274 if not (BF.null offer)
275 then return (Available offer)
276 else awaitEvent
277
278 go (Bitfield bf) = do
279 new <- adjustBF bf
280 bitfield .= new
281 _ <- revise
282
283 offer <- peerOffer
284 if not (BF.null offer)
285 then return (Available offer)
286 else awaitEvent
287
288 go (Request bix) = do
289 bf <- clientOffer
290 if ixPiece bix `BF.member` bf
291 then return (Want bix)
292 else do
293-- check if extension is enabled
294-- yieldMessage (RejectRequest bix)
295 awaitEvent
296
297 go (Piece blk) = do
298 -- this protect us from malicious peers and duplication
299 wanted <- clientWant
300 if blkPiece blk `BF.member` wanted
301 then return (Fragment blk)
302 else awaitEvent
303
304 go (Cancel _) = do
305 error "cancel message not implemented"
306
307 go (Port _) = do
308 requireExtension ExtDHT
309 error "port message not implemented"
310
311 go HaveAll = do
312 requireExtension ExtFast
313 bitfield <~ fullBF
314 _ <- revise
315 awaitEvent
316
317 go HaveNone = do
318 requireExtension ExtFast
319 bitfield <~ emptyBF
320 _ <- revise
321 awaitEvent
322
323 go (SuggestPiece idx) = do
324 requireExtension ExtFast
325 bf <- use bitfield
326 if idx `BF.notMember` bf
327 then Available <$> singletonBF idx
328 else awaitEvent
329
330 go (RejectRequest _) = do
331 requireExtension ExtFast
332 awaitEvent
333
334 go (AllowedFast _) = do
335 requireExtension ExtFast
336 awaitEvent
337
338-- TODO minimize number of peerOffer calls
339
340-- | Raise an events which may occur
341--
342-- This table shows when a some specific events /makes sense/ to yield:
343--
344-- @
345-- +----------+---------+
346-- | Leacher | Seeder |
347-- |----------+---------+
348-- | Available| |
349-- | Want |Fragment |
350-- | Fragment | |
351-- +----------+---------+
352-- @
353--
354-- Seeder should not yield:
355--
356-- * Available -- seeder could not store anything new.
357--
358-- * Want -- seeder alread have everything, no reason to want.
359--
360-- Hovewer, it's okay to not obey the rules -- if we are yield some
361-- event which doesn't /makes sense/ in the current context then it
362-- most likely will be ignored without any network IO.
363--
364yieldEvent :: Event -> P2P ()
365yieldEvent e = {-# SCC yieldEvent #-} do
366 go e
367 flushPending
368 where
369 go (Available ixs) = do
370 ses <- asks swarmSession
371 liftIO $ atomically $ available ixs ses
372
373 go (Want bix) = do
374 offer <- peerOffer
375 if ixPiece bix `BF.member` offer
376 then yieldMessage (Request bix)
377 else return ()
378
379 go (Fragment blk) = do
380 offer <- clientOffer
381 if blkPiece blk `BF.member` offer
382 then yieldMessage (Piece blk)
383 else return ()
384
385
386handleEvent :: (Event -> P2P Event) -> P2P ()
387handleEvent action = awaitEvent >>= action >>= yieldEvent
388
389-- Event translation table looks like:
390--
391-- Available -> Want
392-- Want -> Fragment
393-- Fragment -> Available
394--
395-- If we join the chain we get the event loop:
396--
397-- Available -> Want -> Fragment --\
398-- /|\ |
399-- \---------------------------/
400--
401
402
403-- | Default P2P action.
404exchange :: Storage -> P2P ()
405exchange storage = {-# SCC exchange #-} awaitEvent >>= handler
406 where
407 handler (Available bf) = do
408 ixs <- selBlk (findMin bf) storage
409 mapM_ (yieldEvent . Want) ixs -- TODO yield vectored
410
411 handler (Want bix) = do
412 liftIO $ print bix
413 blk <- liftIO $ getBlk bix storage
414 yieldEvent (Fragment blk)
415
416 handler (Fragment blk @ Block {..}) = do
417 done <- liftIO $ putBlk blk storage
418 when done $ do
419 yieldEvent $ Available $ singleton blkPiece (succ blkPiece)
420
421 -- WARN this is not reliable: if peer do not return all piece
422 -- block we could slow don't until some other event occured
423 offer <- peerOffer
424 if BF.null offer
425 then return ()
426 else handler (Available offer)
427
428yieldInit :: P2P ()
429yieldInit = yieldMessage . Bitfield =<< getClientBF
430
431p2p :: P2P ()
432p2p = do
433 yieldInit
434 storage <- asks (storage . swarmSession)
435 forever $ do
436 exchange storage \ No newline at end of file
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs
deleted file mode 100644
index d558438f..00000000
--- a/src/Network/BitTorrent/Sessions.hs
+++ /dev/null
@@ -1,446 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8module Network.BitTorrent.Sessions
9 ( -- * Progress
10 Progress(..), startProgress
11 , ClientService(..)
12 , startService
13 , withRunning
14
15 -- * Client
16 , ClientSession ( ClientSession
17 , clientPeerId, allowedExtensions
18 )
19 , withClientSession
20
21 , ThreadCount
22 , defaultThreadCount
23
24 , TorrentLoc(..)
25 , registerTorrent
26 , unregisterTorrent
27 , getRegistered
28
29 , getCurrentProgress
30 , getSwarmCount
31 , getPeerCount
32 , getActiveSwarms
33 , getSwarm
34 , getStorage
35 , getTorrentInfo
36 , openSwarmSession
37
38 -- * Swarm
39 , SwarmSession( SwarmSession, torrentMeta
40 , clientSession, storage
41 )
42
43 , SessionCount
44 , getSessionCount
45 , getClientBitfield
46 , getActivePeers
47
48 , discover
49
50 , PeerSession ( connectedPeerAddr, enabledExtensions )
51 , getSessionState
52
53 , SessionState (..)
54 ) where
55
56import Prelude hiding (mapM_, elem)
57
58import Control.Applicative
59import Control.Concurrent
60import Control.Concurrent.STM
61import Control.Concurrent.BoundedChan as BC
62import Control.Concurrent.MSem as MSem
63import Control.Monad (forever, (>=>))
64import Control.Exception
65import Control.Monad.Trans
66
67import Data.IORef
68import Data.Map as M
69import Data.HashMap.Strict as HM
70import Data.Foldable as F
71import Data.Set as S
72
73import Network hiding (accept)
74import Network.BSD
75import Network.Socket
76
77import Data.Torrent.Bitfield as BF
78import Data.Torrent
79import Network.BitTorrent.Extension
80import Network.BitTorrent.Peer
81import Network.BitTorrent.Sessions.Types
82import Network.BitTorrent.Exchange.Protocol as BT
83import Network.BitTorrent.Tracker.Protocol as BT
84import Network.BitTorrent.Tracker as BT
85import Network.BitTorrent.Exchange as BT
86import System.Torrent.Storage
87
88{-----------------------------------------------------------------------
89 Client Services
90-----------------------------------------------------------------------}
91
92startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
93startService s port m = do
94 stopService s
95 putMVar s =<< spawn
96 where
97 spawn = ClientService port <$> forkIO (m port)
98
99stopService :: MVar ClientService -> IO ()
100stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread)
101
102-- | Service A might depend on service B.
103withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
104withRunning dep failure action = tryTakeMVar dep >>= maybe failure action
105
106{-----------------------------------------------------------------------
107 Torrent presence
108-----------------------------------------------------------------------}
109
110data TorrentPresence = Active SwarmSession
111 | Registered TorrentLoc
112 | Unknown
113
114torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence
115torrentPresence ClientSession {..} ih = do
116 sws <- readTVarIO swarmSessions
117 case M.lookup ih sws of
118 Just ss -> return $ Active ss
119 Nothing -> do
120 tm <- readTVarIO torrentMap
121 return $ maybe Unknown Registered $ HM.lookup ih tm
122
123{-----------------------------------------------------------------------
124 Client sessions
125-----------------------------------------------------------------------}
126
127startListener :: ClientSession -> PortNumber -> IO ()
128startListener cs @ ClientSession {..} port =
129 startService peerListener port $ listener cs $ \conn @ (_, PeerSession{..}) -> do
130 runP2P conn p2p
131
132-- | Create a new client session. The data passed to this function are
133-- usually loaded from configuration file.
134openClientSession :: SessionCount -> [Extension] -> PortNumber -> PortNumber -> IO ClientSession
135openClientSession n exts listenerPort _ = do
136 cs <- ClientSession
137 <$> genPeerId
138 <*> pure exts
139 <*> newEmptyMVar
140 <*> MSem.new n
141 <*> pure n
142 <*> newTVarIO M.empty
143 <*> newTVarIO (startProgress 0)
144 <*> newTVarIO HM.empty
145
146 startListener cs listenerPort
147 return cs
148
149closeClientSession :: ClientSession -> IO ()
150closeClientSession ClientSession {..} = do
151 stopService peerListener
152
153 sws <- readTVarIO swarmSessions
154 forM_ sws closeSwarmSession
155
156withClientSession :: SessionCount -> [Extension]
157 -> PortNumber -> PortNumber
158 -> (ClientSession -> IO ()) -> IO ()
159withClientSession c es l d = bracket (openClientSession c es l d) closeClientSession
160
161-- | Get current global progress of the client. This value is usually
162-- shown to a user.
163getCurrentProgress :: MonadIO m => ClientSession -> m Progress
164getCurrentProgress = liftIO . readTVarIO . currentProgress
165
166-- | Get number of swarms client aware of.
167getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
168getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions
169
170-- | Get number of peers the client currently connected to.
171getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
172getPeerCount ClientSession {..} = liftIO $ do
173 unused <- peekAvail activeThreads
174 return (maxActive - unused)
175
176getActiveSwarms :: ClientSession -> IO [SwarmSession]
177getActiveSwarms ClientSession {..} = M.elems <$> readTVarIO swarmSessions
178
179getListenerPort :: ClientSession -> IO PortNumber
180getListenerPort ClientSession {..} = servPort <$> readMVar peerListener
181
182{-----------------------------------------------------------------------
183 Swarm session
184-----------------------------------------------------------------------}
185
186defSeederConns :: SessionCount
187defSeederConns = defaultUnchokeSlots
188
189defLeecherConns :: SessionCount
190defLeecherConns = defaultNumWant
191
192-- discovery should hide tracker and DHT communication under the hood
193-- thus we can obtain an unified interface
194
195discover :: SwarmSession -> IO ()
196discover swarm @ SwarmSession {..} = {-# SCC discover #-} do
197 port <- getListenerPort clientSession
198
199 let conn = TConnection {
200 tconnAnnounce = tAnnounce torrentMeta
201 , tconnInfoHash = tInfoHash torrentMeta
202 , tconnPeerId = clientPeerId clientSession
203 , tconnPort = port
204 }
205
206 let progress = currentProgress clientSession
207 ch <- newBoundedChan 100 -- TODO
208 tid <- forkIO $ tracker ch progress conn
209 forever $ do
210 addr <- BC.readChan ch
211 forkThrottle swarm $ do
212 initiatePeerSession swarm addr $ \pconn -> do
213 print addr
214 runP2P pconn p2p
215
216registerSwarmSession :: SwarmSession -> STM ()
217registerSwarmSession ss @ SwarmSession {..} =
218 modifyTVar' (swarmSessions clientSession) $
219 M.insert (tInfoHash torrentMeta) ss
220
221unregisterSwarmSession :: SwarmSession -> STM ()
222unregisterSwarmSession SwarmSession {..} =
223 modifyTVar' (swarmSessions clientSession) $
224 M.delete $ tInfoHash torrentMeta
225
226openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession
227openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
228 t <- validateLocation loc
229 let bf = haveNone $ pieceCount $ tInfo t
230
231 ss <- SwarmSession t cs
232 <$> MSem.new defLeecherConns
233 <*> openStorage t dataDirPath bf
234 <*> newTVarIO S.empty
235 <*> newBroadcastTChanIO
236
237 atomically $ do
238 modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t
239 registerSwarmSession ss
240
241 _ <- forkIO $ discover ss
242
243 return ss
244
245closeSwarmSession :: SwarmSession -> IO ()
246closeSwarmSession se @ SwarmSession {..} = do
247 atomically $ unregisterSwarmSession se
248 -- TODO stop discovery
249 -- TODO killall peer sessions
250 -- TODO the order is important!
251 closeStorage storage
252
253getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
254getSwarm cs @ ClientSession {..} ih = do
255 tstatus <- torrentPresence cs ih
256 case tstatus of
257 Unknown -> throw $ UnknownTorrent ih
258 Active sw -> return sw
259 Registered loc -> openSwarmSession cs loc
260
261-- TODO do not spawn session!
262getStorage :: ClientSession -> InfoHash -> IO Storage
263getStorage cs ih = storage <$> getSwarm cs ih
264
265-- TODO keep sorted?
266getActivePeers :: SwarmSession -> IO [PeerSession]
267getActivePeers SwarmSession {..} = S.toList <$> readTVarIO connectedPeers
268
269getTorrentInfo :: ClientSession -> InfoHash -> IO (Maybe Torrent)
270getTorrentInfo cs ih = do
271 tstatus <- torrentPresence cs ih
272 case tstatus of
273 Unknown -> return Nothing
274 Active (SwarmSession {..}) -> return $ Just torrentMeta
275 Registered (TorrentLoc {..}) -> Just <$> fromFile metafilePath
276
277-- | Get the number of connected peers in the given swarm.
278getSessionCount :: SwarmSession -> IO SessionCount
279getSessionCount SwarmSession {..} = do
280 S.size <$> readTVarIO connectedPeers
281
282swarmHandshake :: SwarmSession -> Handshake
283swarmHandshake SwarmSession {..} = Handshake {
284 hsProtocol = defaultBTProtocol
285 , hsReserved = encodeExts $ allowedExtensions $ clientSession
286 , hsInfoHash = tInfoHash torrentMeta
287 , hsPeerId = clientPeerId $ clientSession
288 }
289
290{-----------------------------------------------------------------------
291 Peer sessions throttling
292-----------------------------------------------------------------------}
293
294-- | The number of threads suitable for a typical BT client.
295defaultThreadCount :: ThreadCount
296defaultThreadCount = 1000
297
298enterSwarm :: SwarmSession -> IO ()
299enterSwarm SwarmSession {..} = do
300 MSem.wait (activeThreads clientSession)
301 MSem.wait vacantPeers `onException`
302 MSem.signal (activeThreads clientSession)
303
304leaveSwarm :: SwarmSession -> IO ()
305leaveSwarm SwarmSession {..} = mask_ $ do
306 MSem.signal vacantPeers
307 MSem.signal (activeThreads clientSession)
308
309forkThrottle :: SwarmSession -> IO () -> IO ThreadId
310forkThrottle se action = do
311 enterSwarm se
312 (forkIO $ do
313 action `finally` leaveSwarm se)
314 `onException` leaveSwarm se
315
316-- TODO: check content files location;
317validateLocation :: TorrentLoc -> IO Torrent
318validateLocation = fromFile . metafilePath
319
320registerTorrent :: ClientSession -> TorrentLoc -> IO ()
321registerTorrent ClientSession {..} loc @ TorrentLoc {..} = do
322 torrent <- fromFile metafilePath
323 atomically $ modifyTVar' torrentMap $ HM.insert (tInfoHash torrent) loc
324
325-- TODO kill sessions
326unregisterTorrent :: ClientSession -> InfoHash -> IO ()
327unregisterTorrent ClientSession {..} ih = do
328 atomically $ modifyTVar' torrentMap $ HM.delete ih
329
330getRegistered :: ClientSession -> IO TorrentMap
331getRegistered ClientSession {..} = readTVarIO torrentMap
332
333{-----------------------------------------------------------------------
334 Peer session creation
335------------------------------------------------------------------------
336The peer session cycle looks like:
337
338 * acquire vacant session and vacant thread slot;
339 * (fork could be here, but not necessary)
340 * establish peer connection;
341 * register peer session;
342 * ... exchange process ...
343 * unregister peer session;
344 * close peer connection;
345 * release acquired session and thread slot.
346
347TODO: explain why this order
348TODO: thread throttling
349TODO: check if it connected yet peer
350TODO: utilize peer Id.
351TODO: use STM semaphore
352-----------------------------------------------------------------------}
353
354registerPeerSession :: PeerSession -> IO ()
355registerPeerSession ps @ PeerSession {..} =
356 atomically $ modifyTVar' (connectedPeers swarmSession) (S.insert ps)
357
358unregisterPeerSession :: PeerSession -> IO ()
359unregisterPeerSession ps @ PeerSession {..} =
360 atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps)
361
362openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
363openSession ss @ SwarmSession {..} addr Handshake {..} = do
364 let clientCaps = encodeExts $ allowedExtensions $ clientSession
365 let enabled = decodeExts (enabledCaps clientCaps hsReserved)
366
367 bf <- getClientBitfield ss
368 ps <- PeerSession addr ss enabled
369 <$> atomically (dupTChan broadcastMessages)
370 <*> newIORef (initialSessionState (totalCount bf))
371 -- TODO we could implement more interesting throtling scheme
372 -- using connected peer information
373 registerPeerSession ps
374 return ps
375
376-- TODO kill thread
377closeSession :: PeerSession -> IO ()
378closeSession = unregisterPeerSession
379
380type PeerConn = (Socket, PeerSession)
381type Exchange = PeerConn -> IO ()
382
383-- | Exchange action depends on session and socket, whereas session depends
384-- on socket:
385--
386-- socket------>-----exchange
387-- | |
388-- \-->--session-->--/
389--
390-- To handle exceptions properly we double bracket socket and session
391-- then joining the resources and also ignoring session local exceptions.
392--
393runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
394runSession connector opener action =
395 handle isSessionException $
396 bracket connector close $ \sock ->
397 bracket (opener sock) closeSession $ \ses ->
398 action (sock, ses)
399
400-- | Used then the client want to connect to a peer.
401initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
402initiatePeerSession ss @ SwarmSession {..} addr
403 = runSession (connectToPeer addr) initiated
404 where
405 initiated sock = do
406 phs <- handshake sock (swarmHandshake ss)
407 ps <- openSession ss addr phs
408 return ps
409
410-- | Used the a peer want to connect to the client.
411acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
412acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
413 where
414 accepted sock = do
415 phs <- recvHandshake sock
416 swarm <- getSwarm cs $ hsInfoHash phs
417 ps <- openSession swarm addr phs
418 sendHandshake sock $ Handshake {
419 hsProtocol = defaultBTProtocol
420 , hsReserved = encodeExts $ enabledExtensions ps
421 , hsInfoHash = hsInfoHash phs
422 , hsPeerId = clientPeerId
423 }
424 return ps
425
426listener :: ClientSession -> Exchange -> PortNumber -> IO ()
427listener cs action serverPort = bracket openListener close loop
428 where
429 loop sock = forever $ handle isIOError $ do
430 (conn, addr) <- accept sock
431 putStrLn "accepted"
432 case addr of
433 SockAddrInet port host -> do
434 _ <- forkIO $ do
435 acceptPeerSession cs (PeerAddr Nothing host port) conn action
436 return ()
437 _ -> return ()
438
439 isIOError :: IOError -> IO ()
440 isIOError _ = return ()
441
442 openListener = do
443 sock <- socket AF_INET Stream =<< getProtocolNumber "tcp"
444 bindSocket sock (SockAddrInet serverPort iNADDR_ANY)
445 listen sock maxListenQueue
446 return sock
diff --git a/src/Network/BitTorrent/Sessions/Types.lhs b/src/Network/BitTorrent/Sessions/Types.lhs
deleted file mode 100644
index dea47405..00000000
--- a/src/Network/BitTorrent/Sessions/Types.lhs
+++ /dev/null
@@ -1,316 +0,0 @@
1> -- |
2> -- Copyright : (c) Sam Truzjan 2013
3> -- License : BSD3
4> -- Maintainer : pxqr.sta@gmail.com
5> -- Stability : experimental
6> -- Portability : portable
7> --
8>
9> {-# LANGUAGE RecordWildCards #-}
10> {-# LANGUAGE ViewPatterns #-}
11> {-# LANGUAGE TemplateHaskell #-}
12> {-# LANGUAGE DeriveDataTypeable #-}
13>
14> module Network.BitTorrent.Sessions.Types
15> ( ClientService(..)
16> , ThreadCount
17> , TorrentLoc (..)
18> , TorrentMap
19>
20> , ClientSession (..)
21>
22> , SwarmSession (..)
23> , getClientBitfield
24> , getPending, available
25>
26> , PeerSession (..)
27> , SessionCount
28> , findPieceCount
29>
30> , SessionState (..)
31> , status, bitfield
32> , initialSessionState
33> , getSessionState
34>
35> , SessionException (..)
36> , isSessionException, putSessionException
37> ) where
38
39> import Control.Applicative
40> import Control.Concurrent
41> import Control.Concurrent.STM
42> import Control.Concurrent.MSem as MSem
43> import Control.Lens
44> import Control.Exception
45
46> import Data.IORef
47> import Data.Default
48> import Data.Function
49> import Data.Map as M
50> import Data.HashMap.Strict as HM
51> import Data.Ord
52> import Data.Set as S
53> import Data.Typeable
54> import Text.PrettyPrint
55
56> import Network
57
58> import Data.Torrent.Bitfield as BF
59> import Network.BitTorrent.Extension
60> import Network.BitTorrent.Peer
61> import Network.BitTorrent.Exchange.Protocol as BT
62> import System.Torrent.Storage
63
64Thread layout
65------------------------------------------------------------------------
66
67When client session created 2 new threads appear:
68
69 * Peer listener - accept new P2P connection initiated by other
70peers;
71
72 * Tracker announcer - announce that the peer have this torrent.
73
74 * OPTIONAL: DHT listener - replies to DHT requests;
75
76When swarn session created 3 new threads appear:
77
78 * DHT request loop asks for new peers;
79
80 * Tracker request loop asks for new peers;
81
82 * controller which fork new avaand manage running P2P sessions.
83
84Peer session is one always forked thread.
85
86When client\/swarm\/peer session gets closed kill the corresponding
87threads, but flush data to disc. (for e.g. storage block map)
88
89So for e.g., in order to obtain our first block we need to spawn at
90least 7 threads: main thread, 2 client session threads, 3 swarm session
91threads and PeerSession thread.
92
93Thread throttling
94------------------------------------------------------------------------
95
96If we will not restrict number of threads we could end up
97with thousands of connected swarm and make no particular progress.
98
99Note also we do not bound number of swarms! This is not optimal
100strategy because each swarm might have say 1 thread and we could end
101up bounded by the meaningless limit. Bounding global number of p2p
102sessions should work better, and simpler.
103
104**TODO:** priority based throttling: leecher thread have more priority
105than seeder threads.
106
107> -- | Each client might have a limited number of threads.
108> type ThreadCount = Int
109
110Client Services
111------------------------------------------------------------------------
112
113There are two servers started as client start:
114
115 * DHT node listener - needed by other peers to discover
116 * Peer listener - need by other peers to join this client.
117
118Thus any client (assuming DHT is enabled) provides at least 2 services
119so we can abstract out into ClientService:
120
121> data ClientService = ClientService {
122> servPort :: !PortNumber
123> , servThread :: !ThreadId
124> } deriving Show
125
126Torrent Map
127------------------------------------------------------------------------
128
129TODO: keep track global peer have piece set.
130
131Keeping all seeding torrent metafiles in memory is a _bad_ idea: for
1321TB of data we need at least 100MB of metadata. (using 256KB piece
133size). This solution do not scale further.
134
135To avoid this we keep just *metainfo* about *metainfo*:
136
137> -- | Local info about torrent location.
138> data TorrentLoc = TorrentLoc {
139> -- | Full path to .torrent metafile.
140> metafilePath :: FilePath
141> -- | Full path to directory contating content files associated
142> -- with the metafile.
143> , dataDirPath :: FilePath
144> } deriving Show
145
146TorrentMap is used to keep track all known torrents for the
147client. When some peer trying to connect to us it's necessary to
148dispatch appropriate 'SwarmSession' (or start new one if there are
149none) in the listener loop: we only know 'InfoHash' from 'Handshake'
150but nothing more. So to accept new 'PeerSession' we need to lookup
151torrent metainfo and content files (if there are some) by the
152'InfoHash' and only after that enter exchange loop.
153
154Solution with TorrentLoc is much better and takes much more less
155space, moreover it depends on count of torrents but not on count of
156data itself. To scale further, in future we might add something like
157database (for e.g. sqlite) for this kind of things.
158
159> -- | Used to find torrent info and data in order to accept connection.
160> type TorrentMap = HashMap InfoHash TorrentLoc
161
162While *registering* torrent we need to check if torrent metafile is
163correct, all the files are present in the filesystem and so
164forth. However content validation using hashes will take a long time,
165so we need to do this on demand: if a peer asks for a block, we
166validate corresponding piece and only after read and send the block
167back.
168
169Client Sessions
170------------------------------------------------------------------------
171
172Basically, client session should contain options which user
173application store in configuration files and related to the
174protocol. Moreover it should contain the all client identification
175info, for e.g. DHT.
176
177Client session is the basic unit of bittorrent network, it has:
178
179 * The /peer ID/ used as unique identifier of the client in
180network. Obviously, this value is not changed during client session.
181
182 * The number of /protocol extensions/ it might use. This value is
183static as well, but if you want to dynamically reconfigure the client
184you might kill the end the current session and create a new with the
185fresh required extensions.
186
187 * The number of /swarms/ to join, each swarm described by the
188'SwarmSession'.
189
190Normally, you would have one client session, however, if we needed, in
191one application we could have many clients with different peer ID's
192and different enabled extensions at the same time.
193
194> -- |
195> data ClientSession = ClientSession {
196> -- | Used in handshakes and discovery mechanism.
197> clientPeerId :: !PeerId
198
199> -- | Extensions we should try to use. Hovewer some particular peer
200> -- might not support some extension, so we keep enabledExtension in
201> -- 'PeerSession'.
202> , allowedExtensions :: [Extension]
203
204> , peerListener :: !(MVar ClientService)
205
206> -- | Semaphor used to bound number of active P2P sessions.
207> , activeThreads :: !(MSem ThreadCount)
208
209> -- | Max number of active connections.
210> , maxActive :: !ThreadCount
211
212> -- | Used to traverse the swarm session.
213> , swarmSessions :: !(TVar (Map InfoHash SwarmSession))
214
215> -- | Used to keep track global client progress.
216> , currentProgress :: !(TVar Progress)
217
218> -- | Used to keep track available torrents.
219> , torrentMap :: !(TVar TorrentMap)
220> }
221
222NOTE: currentProgress field is reduntant: progress depends on the all swarm
223bitfields maybe we can remove the 'currentProgress' and compute it on
224demand?
225
226> instance Eq ClientSession where
227> (==) = (==) `on` clientPeerId
228
229> instance Ord ClientSession where
230> compare = comparing clientPeerId
231
232Swarm sessions
233------------------------------------------------------------------------
234
235NOTE: If client is a leecher then there is NO particular reason to
236set max sessions count more than the_number_of_unchoke_slots * k:
237
238 * thread slot(activeThread semaphore)
239 * will take but no
240
241So if client is a leecher then max sessions count depends on the
242number of unchoke slots.
243
244> -- | Used to bound the number of simultaneous connections and, which
245> -- is the same, P2P sessions within the swarm session.
246> type SessionCount = Int
247
248However if client is a seeder then the value depends on .
249
250> -- | Swarm session is
251> data SwarmSession = SwarmSession {
252> torrentMeta :: !Torrent
253
254> , clientSession :: !ClientSession
255
256TODO: lower "vacantPeers" when client becomes seeder according to
257throttling policy.
258
259Represent count of peers we _currently_ can connect to in the
260swarm. Used to bound number of concurrent threads. See also *Thread
261Throttling* section.
262
263> , vacantPeers :: !(MSem SessionCount)
264
265Client bitfield is used to keep track "the client have" piece set.
266Modify this carefully always updating global progress.
267
268> , storage :: !Storage
269
270We keep set of the all connected peers for the each particular torrent
271to prevent duplicated and therefore reduntant TCP connections. For
272example consider the following very simle and realistic scenario:
273
274 * Peer A lookup tracker for peers.
275
276 * Peer B lookup tracker for peers.
277
278 * Finally, Peer A connect to B and Peer B connect to peer A
279simultaneously.
280
281There some other situation the problem may occur: duplicates in
282successive tracker responses, tracker and DHT returns. So without any
283protection we end up with two session between the same peers. That's
284bad because this could lead:
285
286 * Reduced throughput - multiple sessions between the same peers will
287mutiply control overhead (control messages, session state).
288
289 * Thread occupation - duplicated sessions will eat thread slots and
290discourage other, possible more useful, peers to establish connection.
291
292To avoid this we could check, into the one transaction, if a peer is
293already connected and add a connection only if it is not.
294
295> , connectedPeers :: !(TVar (Set PeerSession))
296
297TODO: use bounded broadcast chan with priority queue and drop old entries.
298
299Channel used for replicate messages across all peers in swarm. For
300exsample if we get some piece we should sent to all connected (and
301interested in) peers HAVE message.
302
303> , broadcastMessages :: !(TChan Message)
304> }
305
306INVARIANT: max_sessions_count - sizeof connectedPeers = value vacantPeers
307
308> instance Eq SwarmSession where
309> (==) = (==) `on` (tInfoHash . torrentMeta)
310
311> instance Ord SwarmSession where
312> compare = comparing (tInfoHash . torrentMeta)
313
314> getClientBitfield :: SwarmSession -> IO Bitfield
315> getClientBitfield SwarmSession {..} = atomically $ getCompleteBitfield storage
316