summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Sessions.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Sessions.hs')
-rw-r--r--src/Network/BitTorrent/Sessions.hs447
1 files changed, 447 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs
new file mode 100644
index 00000000..31b30e43
--- /dev/null
+++ b/src/Network/BitTorrent/Sessions.hs
@@ -0,0 +1,447 @@
1-- |
2-- Copyright : (c) Sam T. 2013
3-- License : MIT
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8{-# LANGUAGE OverloadedStrings #-}
9{-# LANGUAGE RecordWildCards #-}
10module Network.BitTorrent.Sessions
11 ( -- * Progress
12 Progress(..), startProgress
13 , ClientService(..)
14 , startService
15 , withRunning
16
17 -- * Client
18 , ClientSession ( ClientSession
19 , clientPeerId, allowedExtensions
20 , nodeListener, peerListener
21 )
22 , withClientSession
23 , listenerPort, dhtPort
24
25 , ThreadCount
26 , defaultThreadCount
27
28 , TorrentLoc(..)
29 , registerTorrent
30 , unregisterTorrent
31
32 , getCurrentProgress
33 , getSwarmCount
34 , getPeerCount
35
36 -- * Swarm
37 , SwarmSession( SwarmSession, torrentMeta, clientSession )
38
39 , SessionCount
40 , getSessionCount
41
42 , newLeecher
43 , newSeeder
44 , getClientBitfield
45
46 -- TODO hide this
47 , waitVacancy
48 , forkThrottle
49
50 -- * Peer
51 , PeerSession( PeerSession, connectedPeerAddr
52 , swarmSession, enabledExtensions
53 , sessionState
54 )
55 , SessionState
56 , initiatePeerSession
57 , acceptPeerSession
58 , listener
59
60 -- * Timeouts
61 , updateIncoming, updateOutcoming
62 ) where
63
64import Prelude hiding (mapM_)
65
66import Control.Applicative
67import Control.Concurrent
68import Control.Concurrent.STM
69import Control.Concurrent.MSem as MSem
70import Control.Lens
71import Control.Monad (when, forever, (>=>))
72import Control.Exception
73import Control.Monad.Trans
74
75import Data.IORef
76import Data.Foldable (mapM_)
77import Data.Map as M
78import Data.HashMap.Strict as HM
79import Data.Set as S
80
81import Data.Serialize hiding (get)
82
83import Network hiding (accept)
84import Network.Socket
85import Network.Socket.ByteString
86
87import GHC.Event as Ev
88
89import Data.Bitfield as BF
90import Data.Torrent
91import Network.BitTorrent.Extension
92import Network.BitTorrent.Peer
93import Network.BitTorrent.Exchange.Protocol as BT
94import Network.BitTorrent.Tracker.Protocol as BT
95import System.Torrent.Storage
96import Network.BitTorrent.Sessions.Types
97
98{-----------------------------------------------------------------------
99 Client Services
100-----------------------------------------------------------------------}
101
102startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
103startService s port m = do
104 stopService s
105 putMVar s =<< spawn
106 where
107 spawn = ClientService port <$> forkIO (m port)
108
109stopService :: MVar ClientService -> IO ()
110stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread)
111
112-- | Service A might depend on service B.
113withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
114withRunning dep failure action = tryTakeMVar dep >>= maybe failure action
115
116{-----------------------------------------------------------------------
117 Torrent presence
118-----------------------------------------------------------------------}
119
120data TorrentPresence = Active SwarmSession
121 | Registered TorrentLoc
122 | Unknown
123
124torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence
125torrentPresence ClientSession {..} ih = do
126 sws <- readTVarIO swarmSessions
127 case M.lookup ih sws of
128 Just ss -> return $ Active ss
129 Nothing -> do
130 tm <- readTVarIO torrentMap
131 return $ maybe Unknown Registered $ HM.lookup ih tm
132
133{-----------------------------------------------------------------------
134 Client sessions
135-----------------------------------------------------------------------}
136
137-- | Create a new client session. The data passed to this function are
138-- usually loaded from configuration file.
139openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions.
140 -> [Extension] -- ^ Extensions allowed to use.
141 -> IO ClientSession -- ^ Client with unique peer ID.
142openClientSession n exts = do
143 mgr <- Ev.new
144 -- TODO kill this thread when leave client
145 _ <- forkIO $ loop mgr
146 ClientSession
147 <$> genPeerId
148 <*> pure exts
149 <*> newEmptyMVar
150 <*> newEmptyMVar
151 <*> MSem.new n
152 <*> pure n
153 <*> newTVarIO M.empty
154 <*> pure mgr
155 <*> newTVarIO (startProgress 0)
156 <*> newTVarIO HM.empty
157
158closeClientSession :: ClientSession -> IO ()
159closeClientSession ClientSession {..} =
160 stopService nodeListener `finally` stopService peerListener
161-- TODO stop all swarm sessions
162
163withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO ()
164withClientSession c es = bracket (openClientSession c es) closeClientSession
165
166-- | Get current global progress of the client. This value is usually
167-- shown to a user.
168getCurrentProgress :: MonadIO m => ClientSession -> m Progress
169getCurrentProgress = liftIO . readTVarIO . currentProgress
170
171-- | Get number of swarms client aware of.
172getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
173getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions
174
175-- | Get number of peers the client currently connected to.
176getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
177getPeerCount ClientSession {..} = liftIO $ do
178 unused <- peekAvail activeThreads
179 return (maxActive - unused)
180
181listenerPort :: ClientSession -> IO PortNumber
182listenerPort ClientSession {..} = servPort <$> readMVar peerListener
183
184dhtPort :: ClientSession -> IO PortNumber
185dhtPort ClientSession {..} = servPort <$> readMVar nodeListener
186
187{-----------------------------------------------------------------------
188 Swarm session
189-----------------------------------------------------------------------}
190
191defSeederConns :: SessionCount
192defSeederConns = defaultUnchokeSlots
193
194defLeacherConns :: SessionCount
195defLeacherConns = defaultNumWant
196
197newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent
198 -> IO SwarmSession
199newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..}
200 = SwarmSession t cs
201 <$> MSem.new n
202 <*> newTVarIO bf
203 <*> undefined
204 <*> newTVarIO S.empty
205 <*> newBroadcastTChanIO
206
207-- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession
208-- > openSwarmSession ClientSession {..} ih = do
209-- > loc <- HM.lookup <$> readTVarIO torrentMap
210-- > torrent <- validateLocation loc
211-- > return undefined
212
213closeSwarmSession :: SwarmSession -> IO ()
214closeSwarmSession se @ SwarmSession {..} = do
215 unregisterSwarmSession se
216 -- TODO stop discovery
217 -- TODO killall peer sessions
218 -- TODO the order is important!
219 closeStorage storage
220
221unregisterSwarmSession :: SwarmSession -> IO ()
222unregisterSwarmSession SwarmSession {..} =
223 atomically $ modifyTVar (swarmSessions clientSession) $
224 M.delete $ tInfoHash torrentMeta
225
226getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
227getSwarm cs @ ClientSession {..} ih = do
228 ss <- readTVarIO $ swarmSessions
229 case M.lookup ih ss of
230 Just sw -> return sw
231 Nothing -> undefined -- openSwarm cs
232
233newSeeder :: ClientSession -> Torrent -> IO SwarmSession
234newSeeder cs t @ Torrent {..}
235 = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t
236
237-- | New swarm in which the client allowed both download and upload.
238newLeecher :: ClientSession -> Torrent -> IO SwarmSession
239newLeecher cs t @ Torrent {..} = do
240 se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t
241 atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo))
242 return se
243
244-- | Get the number of connected peers in the given swarm.
245getSessionCount :: SwarmSession -> IO SessionCount
246getSessionCount SwarmSession {..} = do
247 S.size <$> readTVarIO connectedPeers
248
249swarmHandshake :: SwarmSession -> Handshake
250swarmHandshake SwarmSession {..} = Handshake {
251 hsProtocol = defaultBTProtocol
252 , hsReserved = encodeExts $ allowedExtensions $ clientSession
253 , hsInfoHash = tInfoHash torrentMeta
254 , hsPeerId = clientPeerId $ clientSession
255 }
256
257{-----------------------------------------------------------------------
258 Peer sessions throttling
259-----------------------------------------------------------------------}
260
261-- | The number of threads suitable for a typical BT client.
262defaultThreadCount :: ThreadCount
263defaultThreadCount = 1000
264
265enterSwarm :: SwarmSession -> IO ()
266enterSwarm SwarmSession {..} = do
267 MSem.wait (activeThreads clientSession)
268 MSem.wait vacantPeers
269
270leaveSwarm :: SwarmSession -> IO ()
271leaveSwarm SwarmSession {..} = do
272 MSem.signal vacantPeers
273 MSem.signal (activeThreads clientSession)
274
275waitVacancy :: SwarmSession -> IO () -> IO ()
276waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const
277
278forkThrottle :: SwarmSession -> IO () -> IO ThreadId
279forkThrottle se action = do
280 enterSwarm se
281 (forkIO $ do
282 action `finally` leaveSwarm se)
283 `onException` leaveSwarm se
284
285-- TODO: check content files location;
286validateLocation :: TorrentLoc -> IO Torrent
287validateLocation = fromFile . metafilePath
288
289registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO ()
290registerTorrent = error "registerTorrent"
291
292unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO ()
293unregisterTorrent = error "unregisterTorrent"
294
295torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession
296torrentSwarm _ _ (Active sws) = return sws
297torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc
298torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih
299
300lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession
301lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih
302
303{-----------------------------------------------------------------------
304 Peer session creation
305------------------------------------------------------------------------
306The peer session cycle looks like:
307
308 * acquire vacant session and vacant thread slot;
309 * (fork could be here, but not necessary)
310 * establish peer connection;
311 * register peer session;
312 * ... exchange process ...
313 * unregister peer session;
314 * close peer connection;
315 * release acquired session and thread slot.
316
317TODO: explain why this order
318TODO: thread throttling
319TODO: check if it connected yet peer
320TODO: utilize peer Id.
321TODO: use STM semaphore
322-----------------------------------------------------------------------}
323
324openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
325openSession ss @ SwarmSession {..} addr Handshake {..} = do
326 let clientCaps = encodeExts $ allowedExtensions $ clientSession
327 let enabled = decodeExts (enabledCaps clientCaps hsReserved)
328 ps <- PeerSession addr ss enabled
329 <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ())
330 <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ())
331 <*> atomically (dupTChan broadcastMessages)
332 <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield)
333 -- TODO we could implement more interesting throtling scheme
334 -- using connected peer information
335 atomically $ modifyTVar' connectedPeers (S.insert ps)
336 return ps
337
338closeSession :: PeerSession -> IO ()
339closeSession ps @ PeerSession {..} = do
340 atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps)
341
342type PeerConn = (Socket, PeerSession)
343type Exchange = PeerConn -> IO ()
344
345sendClientStatus :: PeerConn -> IO ()
346sendClientStatus (sock, PeerSession {..}) = do
347 cbf <- readTVarIO $ clientBitfield $ swarmSession
348 sendAll sock $ encode $ Bitfield cbf
349
350 port <- dhtPort $ clientSession swarmSession
351 when (ExtDHT `elem` enabledExtensions) $ do
352 sendAll sock $ encode $ Port port
353
354-- | Exchange action depends on session and socket, whereas session depends
355-- on socket:
356--
357-- socket------>-----exchange
358-- | |
359-- \-->--session-->--/
360--
361-- To handle exceptions properly we double bracket socket and session
362-- then joining the resources and also ignoring session local exceptions.
363--
364runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
365runSession connector opener action =
366 handle isSessionException $
367 bracket connector close $ \sock ->
368 bracket (opener sock) closeSession $ \ses ->
369 action (sock, ses)
370
371-- | Used then the client want to connect to a peer.
372initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
373initiatePeerSession ss @ SwarmSession {..} addr
374 = runSession (connectToPeer addr) initiated
375 where
376 initiated sock = do
377 phs <- handshake sock (swarmHandshake ss)
378 ps <- openSession ss addr phs
379 sendClientStatus (sock, ps)
380 return ps
381
382-- | Used the a peer want to connect to the client.
383acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
384acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
385 where
386 accepted sock = do
387 phs <- recvHandshake sock
388 swarm <- lookupSwarm cs $ hsInfoHash phs
389 ps <- openSession swarm addr phs
390 sendHandshake sock $ Handshake {
391 hsProtocol = defaultBTProtocol
392 , hsReserved = encodeExts $ enabledExtensions ps
393 , hsInfoHash = hsInfoHash phs
394 , hsPeerId = clientPeerId
395 }
396 sendClientStatus (sock, ps)
397 return ps
398
399listener :: ClientSession -> Exchange -> PortNumber -> IO ()
400listener cs action serverPort = bracket openListener close loop
401 where
402 loop sock = forever $ handle isIOError $ do
403 (conn, addr) <- accept sock
404 case addr of
405 SockAddrInet port host -> do
406 acceptPeerSession cs (PeerAddr Nothing host port) conn action
407 _ -> return ()
408
409 isIOError :: IOError -> IO ()
410 isIOError _ = return ()
411
412 openListener = do
413 sock <- socket AF_INET Stream defaultProtocol
414 bindSocket sock (SockAddrInet serverPort 0)
415 listen sock 1
416 return sock
417
418
419{-----------------------------------------------------------------------
420 Keepalives
421------------------------------------------------------------------------
422TODO move to exchange
423-----------------------------------------------------------------------}
424
425sec :: Int
426sec = 1000 * 1000
427
428maxIncomingTime :: Int
429maxIncomingTime = 120 * sec
430
431maxOutcomingTime :: Int
432maxOutcomingTime = 1 * sec
433
434-- | Should be called after we have received any message from a peer.
435updateIncoming :: PeerSession -> IO ()
436updateIncoming PeerSession {..} = do
437 updateTimeout (eventManager (clientSession swarmSession))
438 incomingTimeout maxIncomingTime
439
440-- | Should be called before we have send any message to a peer.
441updateOutcoming :: PeerSession -> IO ()
442updateOutcoming PeerSession {..} =
443 updateTimeout (eventManager (clientSession swarmSession))
444 outcomingTimeout maxOutcomingTime
445
446sendKA :: Socket -> IO ()
447sendKA sock = return ()