summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Sessions.hs
blob: d558438f8163a3701d66cdd15fc8309b3d5dfe83 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
-- |
--   Copyright   :  (c) Sam Truzjan 2013
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
module Network.BitTorrent.Sessions
       ( -- * Progress
         Progress(..), startProgress
       , ClientService(..)
       , startService
       , withRunning

         -- * Client
       , ClientSession ( ClientSession
                       , clientPeerId, allowedExtensions
                       )
       , withClientSession

       , ThreadCount
       , defaultThreadCount

       , TorrentLoc(..)
       , registerTorrent
       , unregisterTorrent
       , getRegistered

       , getCurrentProgress
       , getSwarmCount
       , getPeerCount
       , getActiveSwarms
       , getSwarm
       , getStorage
       , getTorrentInfo
       , openSwarmSession

         -- * Swarm
       , SwarmSession( SwarmSession, torrentMeta
                     , clientSession, storage
                     )

       , SessionCount
       , getSessionCount
       , getClientBitfield
       , getActivePeers

       , discover

       , PeerSession ( connectedPeerAddr, enabledExtensions )
       , getSessionState

       , SessionState (..)
       ) where

import Prelude hiding (mapM_, elem)

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.BoundedChan as BC
import Control.Concurrent.MSem as MSem
import Control.Monad (forever, (>=>))
import Control.Exception
import Control.Monad.Trans

import Data.IORef
import Data.Map as M
import Data.HashMap.Strict as HM
import Data.Foldable as F
import Data.Set as S

import Network hiding (accept)
import Network.BSD
import Network.Socket

import Data.Torrent.Bitfield as BF
import Data.Torrent
import Network.BitTorrent.Extension
import Network.BitTorrent.Peer
import Network.BitTorrent.Sessions.Types
import Network.BitTorrent.Exchange.Protocol as BT
import Network.BitTorrent.Tracker.Protocol as BT
import Network.BitTorrent.Tracker as BT
import Network.BitTorrent.Exchange as BT
import System.Torrent.Storage

{-----------------------------------------------------------------------
    Client Services
-----------------------------------------------------------------------}

-- | (Re)start a client service on the given port.
--
--   A service already stored in the 'MVar' is stopped first; then the
--   given action is forked in a new thread and the resulting
--   'ClientService' is stored in the 'MVar'.
startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
startService s port m = do
    stopService s
    tid <- forkIO (m port)
    putMVar s (ClientService port tid)

-- | Stop a service if it is running: empty the 'MVar' and kill the
--   service thread. Does nothing when the 'MVar' is already empty.
stopService :: MVar ClientService -> IO ()
stopService s = do
  running <- tryTakeMVar s
  case running of
    Nothing  -> return ()
    Just srv -> killThread (servThread srv)

-- | Service A might depend on service B: run the action with the
--   dependency if it is currently running, otherwise run the failure
--   handler.
--
--   The dependency is only inspected: it is put back into the 'MVar'
--   right away, so it stays visible to 'stopService' and to other
--   readers of the 'MVar' after the check.
withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
withRunning dep failure action = peek >>= maybe failure action
  where
    -- Take and restore under 'mask_' so that an asynchronous exception
    -- is not delivered between the two steps and the service is not
    -- lost from the 'MVar'.
    peek = mask_ $ do
      running <- tryTakeMVar dep
      maybe (return ()) (putMVar dep) running
      return running

{-----------------------------------------------------------------------
    Torrent presence
-----------------------------------------------------------------------}

-- | What the client knows about a torrent with a given info hash.
data TorrentPresence
  = Active     SwarmSession -- ^ a swarm session is open for the torrent;
  | Registered TorrentLoc   -- ^ only the torrent location is registered;
  | Unknown                 -- ^ neither a session nor a location is known.

-- | Look up a torrent by its info hash: the open swarm sessions are
--   consulted first, then the map of registered torrents.
torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence
torrentPresence ClientSession {..} ih =
    readTVarIO swarmSessions
      >>= maybe lookupRegistered (return . Active) . M.lookup ih
  where
    -- Only reached when no swarm session is open for the info hash.
    lookupRegistered =
      maybe Unknown Registered . HM.lookup ih <$> readTVarIO torrentMap

{-----------------------------------------------------------------------
    Client sessions
-----------------------------------------------------------------------}

-- | Start (or restart) the peer listener service of the client on the
--   given port. The 'p2p' exchange is run over every connection handed
--   over by the listener.
startListener :: ClientSession -> PortNumber -> IO ()
startListener cs@ClientSession {..} port =
    startService peerListener port (listener cs exchange)
  where
    exchange conn@(_, PeerSession {..}) = runP2P conn p2p

-- | Create a new client session and start its peer listener. The data
-- passed to this function are usually loaded from configuration file.
--
-- The session count is used both as the capacity of the thread
-- semaphore and as the recorded maximum; the first port is the one
-- the peer listener is started on, while the second port is currently
-- ignored.
openClientSession :: SessionCount -> [Extension] -> PortNumber -> PortNumber -> IO ClientSession
openClientSession n exts listenerPort _ = do
  pid          <- genPeerId
  listenerSlot <- newEmptyMVar
  threads      <- MSem.new n
  swarms       <- newTVarIO M.empty
  progress     <- newTVarIO (startProgress 0)
  torrents     <- newTVarIO HM.empty

  let cs = ClientSession pid exts listenerSlot threads n swarms progress torrents
  startListener cs listenerPort
  return cs

-- | Stop the peer listener, then close every swarm session that is
--   registered in the client at that moment.
closeClientSession :: ClientSession -> IO ()
closeClientSession ClientSession {..} = do
  stopService peerListener
  readTVarIO swarmSessions >>= F.mapM_ closeSwarmSession

-- | Run an action with a freshly opened client session. The session is
--   closed when the action returns or throws. The first four arguments
--   are passed to 'openClientSession' unchanged.
withClientSession :: SessionCount -> [Extension]
                  -> PortNumber -> PortNumber
                  -> (ClientSession -> IO ()) -> IO ()
withClientSession c es l d action = bracket open closeClientSession action
  where
    open = openClientSession c es l d

-- | Read the current global progress of the client. This value is
-- usually shown to a user.
getCurrentProgress :: MonadIO m => ClientSession -> m Progress
getCurrentProgress cs = liftIO (readTVarIO (currentProgress cs))

-- | Get the number of swarms the client is aware of, i.e. the number
-- of currently registered swarm sessions.
getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
getSwarmCount ClientSession {..} = liftIO $ do
  sessions <- readTVarIO swarmSessions
  return (M.size sessions)

-- | Get the number of peers the client is currently connected to,
-- computed as the maximum number of active threads minus the units
-- still available in the thread semaphore.
getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
getPeerCount ClientSession {..} =
  liftIO $ (maxActive -) <$> peekAvail activeThreads

-- | List all swarm sessions currently registered in the client.
getActiveSwarms :: ClientSession -> IO [SwarmSession]
getActiveSwarms ClientSession {..} = do
  sessions <- readTVarIO swarmSessions
  return (M.elems sessions)

-- | Get the port of the peer listener service. Blocks for as long as
--   the listener 'MVar' is empty.
getListenerPort :: ClientSession -> IO PortNumber
getListenerPort ClientSession {..} = do
  service <- readMVar peerListener
  return (servPort service)

{-----------------------------------------------------------------------
    Swarm session
-----------------------------------------------------------------------}

-- | Connection limit taken from 'defaultUnchokeSlots'; presumably meant
--   for swarms the client is seeding.
--   NOTE(review): not referenced in the visible part of this module.
defSeederConns :: SessionCount
defSeederConns = defaultUnchokeSlots

-- | Connection limit taken from 'defaultNumWant'. It is the initial
--   capacity of the semaphore that 'openSwarmSession' creates for every
--   new swarm session.
defLeecherConns :: SessionCount
defLeecherConns = defaultNumWant

-- | Discover peers for the swarm and connect to each of them.
--
--   Discovery should hide tracker and DHT communication under the hood,
--   so that we obtain a unified interface. At the moment only the
--   tracker is used: a tracker thread is handed a bounded channel, and
--   for every address read from that channel a throttled thread is
--   forked which initiates a peer session and runs the 'p2p' exchange.
--
--   This action loops forever; it can only terminate with an exception,
--   in which case the tracker thread is killed as well.
discover :: SwarmSession -> IO ()
discover swarm@SwarmSession {..} = {-# SCC discover #-} do
  port <- getListenerPort clientSession

  let conn = TConnection {
        tconnAnnounce = tAnnounce torrentMeta
      , tconnInfoHash = tInfoHash torrentMeta
      , tconnPeerId   = clientPeerId clientSession
      , tconnPort     = port
      }

  let progress = currentProgress clientSession
  ch  <- newBoundedChan 100 -- TODO
  tid <- forkIO $ tracker ch progress conn

  let connectLoop = forever $ do
        addr <- BC.readChan ch
        forkThrottle swarm $ do
            initiatePeerSession swarm addr $ \pconn -> do
              print addr
              runP2P pconn p2p

  -- Do not leave the tracker thread running (and filling the channel)
  -- once nobody reads the discovered addresses anymore.
  connectLoop `finally` killThread tid

-- | Add the swarm session to the session map of its client, keyed by
--   the info hash of the torrent.
registerSwarmSession :: SwarmSession -> STM ()
registerSwarmSession ss@SwarmSession {..} =
    modifyTVar' sessions (M.insert key ss)
  where
    sessions = swarmSessions clientSession
    key      = tInfoHash torrentMeta

-- | Remove the entry for the torrent's info hash from the session map
--   of the client.
unregisterSwarmSession :: SwarmSession -> STM ()
unregisterSwarmSession SwarmSession {..} =
    modifyTVar' sessions (M.delete key)
  where
    sessions = swarmSessions clientSession
    key      = tInfoHash torrentMeta

-- | Open a swarm session for the torrent at the given location.
--
--   The metainfo is loaded via 'validateLocation', the storage is
--   opened in the data directory of the location with an empty
--   bitfield, the content length is enqueued into the client progress
--   and the session is registered in the client. Finally peer
--   discovery is started in a separate thread.
openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession
openSwarmSession cs@ClientSession {..} loc@TorrentLoc {..} = do
  t <- validateLocation loc
  let info = tInfo t

  vacant <- MSem.new defLeecherConns
  store  <- openStorage t dataDirPath (haveNone (pieceCount info))
  peers  <- newTVarIO S.empty
  bcast  <- newBroadcastTChanIO
  let ss = SwarmSession t cs vacant store peers bcast

  -- Progress update and registration happen in a single transaction.
  atomically $ do
    modifyTVar' currentProgress (enqueuedProgress (contentLength info))
    registerSwarmSession ss

  _ <- forkIO (discover ss)
  return ss

-- | Unregister the swarm session from its client and close its storage.
--
--   TODO stop discovery
--   TODO killall peer sessions
--   TODO the order is important!
closeSwarmSession :: SwarmSession -> IO ()
closeSwarmSession se@SwarmSession {..} =
  atomically (unregisterSwarmSession se) >> closeStorage storage

-- | Get the swarm session for the given info hash.
--
--   An already open session is returned as is; for a torrent that is
--   only registered a new session is opened with 'openSwarmSession'.
--   Throws 'UnknownTorrent' if the info hash is neither open nor
--   registered.
getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
getSwarm cs ih = do
  tstatus <- torrentPresence cs ih
  case tstatus of
    -- 'throwIO' rather than 'throw': the exception is raised exactly
    -- when this action is executed.
    Unknown        -> throwIO $ UnknownTorrent ih
    Active sw      -> return sw
    Registered loc -> openSwarmSession cs loc

-- | Get the storage of the swarm with the given info hash. The swarm
--   is obtained with 'getSwarm', so a session may be opened by this
--   call.
--
-- TODO do not spawn session!
getStorage :: ClientSession -> InfoHash -> IO Storage
getStorage cs ih = do
  swarm <- getSwarm cs ih
  return (storage swarm)

-- | List the peer sessions currently registered in the swarm.
--
-- TODO keep sorted?
getActivePeers :: SwarmSession -> IO [PeerSession]
getActivePeers SwarmSession {..} = do
  peers <- readTVarIO connectedPeers
  return (S.toList peers)

-- | Get the metainfo of a torrent known to the client, if any.
--
--   For an open swarm the metainfo kept in the session is returned;
--   for a torrent that is only registered it is read from the metafile.
getTorrentInfo :: ClientSession -> InfoHash -> IO (Maybe Torrent)
getTorrentInfo cs ih = torrentPresence cs ih >>= metaOf
  where
    metaOf Unknown                      = return Nothing
    metaOf (Active     SwarmSession {..}) = return (Just torrentMeta)
    metaOf (Registered TorrentLoc   {..}) = Just <$> fromFile metafilePath

-- | Get the number of peer sessions currently registered in the given
-- swarm.
getSessionCount :: SwarmSession -> IO SessionCount
getSessionCount SwarmSession {..} = do
  peers <- readTVarIO connectedPeers
  return (S.size peers)

-- | Build the handshake the client sends for the given swarm: default
--   protocol string, the extensions allowed by the client, the info
--   hash of the torrent and the peer id of the client.
swarmHandshake :: SwarmSession -> Handshake
swarmHandshake SwarmSession {..} = Handshake
    { hsProtocol = defaultBTProtocol
    , hsReserved = encodeExts (allowedExtensions clientSession)
    , hsInfoHash = tInfoHash torrentMeta
    , hsPeerId   = clientPeerId clientSession
    }

{-----------------------------------------------------------------------
    Peer sessions throttling
-----------------------------------------------------------------------}

-- | The number of threads suitable for a typical BT client.
--   NOTE(review): presumably intended as the session count argument of
--   'withClientSession' — confirm against callers.
defaultThreadCount :: ThreadCount
defaultThreadCount = 1000

-- | Acquire a client-wide thread slot and then a vacant peer slot of
--   the swarm. If waiting for the peer slot fails with an exception the
--   thread slot is given back.
enterSwarm :: SwarmSession -> IO ()
enterSwarm SwarmSession {..} = do
    MSem.wait threadSlots
    MSem.wait vacantPeers `onException` MSem.signal threadSlots
  where
    threadSlots = activeThreads clientSession

-- | Release the peer slot of the swarm and the client-wide thread
--   slot. Both signals are performed with asynchronous exceptions
--   masked.
leaveSwarm :: SwarmSession -> IO ()
leaveSwarm SwarmSession {..} =
  mask_ (MSem.signal vacantPeers >> MSem.signal (activeThreads clientSession))

-- | Fork a thread for the action once both a thread slot and a peer
--   slot are available (see 'enterSwarm'). The slots are released when
--   the action finishes or throws, and also when forking itself fails.
forkThrottle :: SwarmSession -> IO () -> IO ThreadId
forkThrottle se action = do
    enterSwarm se
    forkIO (action `finally` release) `onException` release
  where
    release = leaveSwarm se

-- | Load the torrent metainfo from the metafile of the given location.
--
-- TODO: check content files location;
validateLocation :: TorrentLoc -> IO Torrent
validateLocation loc = fromFile (metafilePath loc)

-- | Register a torrent location in the client. The metafile is read to
--   obtain the info hash under which the location is stored.
registerTorrent :: ClientSession -> TorrentLoc -> IO ()
registerTorrent ClientSession {..} loc@TorrentLoc {..} = do
  ih <- tInfoHash <$> fromFile metafilePath
  atomically $ modifyTVar' torrentMap (HM.insert ih loc)

-- | Remove the torrent with the given info hash from the map of
--   registered torrents.
--
-- TODO kill sessions
unregisterTorrent :: ClientSession -> InfoHash -> IO ()
unregisterTorrent ClientSession {..} ih =
  atomically (modifyTVar' torrentMap (HM.delete ih))

-- | Get a snapshot of the map of registered torrents.
getRegistered :: ClientSession -> IO TorrentMap
getRegistered cs = readTVarIO (torrentMap cs)

{-----------------------------------------------------------------------
  Peer session creation
------------------------------------------------------------------------
The peer session cycle looks like:

  * acquire vacant session and vacant thread slot;
  * (fork could be here, but not necessary)
  *   establish peer connection;
  *     register peer session;
  *       ... exchange process ...
  *     unregister peer session;
  *   close peer connection;
  * release acquired session and thread slot.

TODO: explain why this order
TODO: thread throttling
TODO: check whether the peer is already connected
TODO: utilize peer Id.
TODO: use STM semaphore
-----------------------------------------------------------------------}

-- | Add the peer session to the set of connected peers of its swarm.
registerPeerSession :: PeerSession -> IO ()
registerPeerSession ps@PeerSession {..} =
    atomically (modifyTVar' peers (S.insert ps))
  where
    peers = connectedPeers swarmSession

-- | Remove the peer session from the set of connected peers of its
--   swarm.
unregisterPeerSession :: PeerSession -> IO ()
unregisterPeerSession ps@PeerSession {..} =
    atomically (modifyTVar' peers (S.delete ps))
  where
    peers = connectedPeers swarmSession

-- | Create a peer session in the swarm for the peer at the given
--   address and register it.
--
--   The enabled extensions are computed from the extensions allowed by
--   the client and the reserved field of the peer handshake. The
--   session gets its own duplicate of the swarm broadcast channel and
--   an initial state sized by the total count of the client bitfield.
openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
openSession ss@SwarmSession {..} addr Handshake {..} = do
    bf       <- getClientBitfield ss
    messages <- atomically (dupTChan broadcastMessages)
    state    <- newIORef (initialSessionState (totalCount bf))
    -- TODO we could implement more interesting throtling scheme
    -- using connected peer information
    let ps = PeerSession addr ss enabled messages state
    registerPeerSession ps
    return ps
  where
    clientCaps = encodeExts (allowedExtensions clientSession)
    enabled    = decodeExts (enabledCaps clientCaps hsReserved)

-- | Close a peer session; for now this only unregisters it from its
--   swarm.
--
-- TODO kill thread
closeSession :: PeerSession -> IO ()
closeSession ps = unregisterPeerSession ps

-- | A peer connection: the socket together with the peer session
--   opened over it.
type PeerConn = (Socket, PeerSession)
-- | An action run over an established peer connection.
type Exchange = PeerConn -> IO ()

-- | Run an exchange over a peer connection.
--
--   The exchange action depends on both the session and the socket,
--   whereas the session depends on the socket:
--
--   socket------>-----exchange
--     |                 |
--      \-->--session-->--/
--
--   To handle exceptions properly the socket and the session are
--   acquired by two nested brackets and then passed to the exchange
--   together. Session local exceptions are dealt with by
--   'isSessionException'.
--
runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
runSession connector opener action = handle isSessionException withSocket
  where
    -- Outer bracket: the socket outlives the session.
    withSocket = bracket connector close withSession

    -- Inner bracket: the session is closed before the socket.
    withSession sock =
      bracket (opener sock) closeSession $ \ses -> action (sock, ses)

-- | Used when the client wants to connect to a peer: connect to the
--   address, exchange handshakes using the swarm handshake and open a
--   peer session from the handshake received.
initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
initiatePeerSession ss@SwarmSession {..} addr = runSession dial greet
  where
    dial       = connectToPeer addr
    greet sock = handshake sock (swarmHandshake ss) >>= openSession ss addr

-- | Used when a peer wants to connect to the client.
--
--   The peer handshake is received first; the swarm is looked up by its
--   info hash with 'getSwarm', a peer session is opened in that swarm
--   and a handshake advertising the enabled extensions is sent back.
--   The socket is closed by 'runSession' when the exchange is over.
acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
  where
    accepted sock = do
      phs   <- recvHandshake sock
      swarm <- getSwarm cs $ hsInfoHash phs
      ps    <- openSession swarm addr phs
      -- 'openSession' has already registered the session, but the
      -- bracket in 'runSession' only releases it once this opener has
      -- returned: undo the registration if the reply cannot be sent.
      sendHandshake sock (reply phs ps) `onException` closeSession ps
      return ps

    reply phs ps = Handshake {
        hsProtocol = defaultBTProtocol
      , hsReserved = encodeExts $ enabledExtensions ps
      , hsInfoHash = hsInfoHash phs
      , hsPeerId   = clientPeerId
      }

-- | Accept incoming peer connections on the given port, forever.
--
--   Every accepted IPv4 connection is served in its own thread by
--   'acceptPeerSession' with the supplied exchange action. 'IOError's
--   raised in the accept loop are ignored so that the loop keeps
--   running. The listening socket is closed when the loop is terminated
--   by an exception.
listener :: ClientSession -> Exchange -> PortNumber -> IO ()
listener cs action serverPort = bracket openListener close loop
  where
    loop sock = forever $ handle isIOError $ do
      (conn, addr) <- accept sock
      putStrLn "accepted"
      case addr of
        SockAddrInet port host -> do
          _ <- forkIO $ do
            acceptPeerSession cs (PeerAddr Nothing host port) conn action
          return ()
        -- Only IPv4 peers are served: release the connection instead of
        -- leaking its descriptor.
        _                      -> close conn

    isIOError :: IOError -> IO ()
    isIOError _ = return ()

    -- Close the half-initialised socket if bind or listen fails: at
    -- that point the outer bracket has not acquired it yet.
    openListener =
      bracketOnError newSocket close $ \sock -> do
        bindSocket sock (SockAddrInet serverPort iNADDR_ANY)
        listen sock maxListenQueue
        return sock

    newSocket = socket AF_INET Stream =<< getProtocolNumber "tcp"