1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
|
-- |
-- Copyright : (c) Sam Truzjan 2013
-- License : BSD3
-- Maintainer : pxqr.sta@gmail.com
-- Stability : experimental
-- Portability : portable
--
module Network.BitTorrent.Sessions
( -- * Progress
Progress(..), startProgress
, ClientService(..)
, startService
, withRunning
-- * Client
, ClientSession ( ClientSession
, clientPeerId, allowedExtensions
)
, withClientSession
, ThreadCount
, defaultThreadCount
, TorrentLoc(..)
, registerTorrent
, unregisterTorrent
, getRegistered
, getCurrentProgress
, getSwarmCount
, getPeerCount
, getActiveSwarms
, getSwarm
, getStorage
, getTorrentInfo
, openSwarmSession
-- * Swarm
, SwarmSession( SwarmSession, torrentMeta
, clientSession, storage
)
, SessionCount
, getSessionCount
, getClientBitfield
, getActivePeers
, discover
, PeerSession ( connectedPeerAddr, enabledExtensions )
, getSessionState
, SessionState (..)
) where
import Prelude hiding (mapM_, elem)
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.BoundedChan as BC
import Control.Concurrent.MSem as MSem
import Control.Monad (forever, (>=>))
import Control.Exception
import Control.Monad.Trans
import Data.IORef
import Data.Map as M
import Data.HashMap.Strict as HM
import Data.Foldable as F
import Data.Set as S
import Network hiding (accept)
import Network.BSD
import Network.Socket
import Data.Torrent.Bitfield as BF
import Data.Torrent
import Network.BitTorrent.Extension
import Network.BitTorrent.Peer
import Network.BitTorrent.Sessions.Types
import Network.BitTorrent.Exchange.Protocol as BT
import Network.BitTorrent.Tracker.Protocol as BT
import Network.BitTorrent.Tracker as BT
import Network.BitTorrent.Exchange as BT
import System.Torrent.Storage
{-----------------------------------------------------------------------
Client Services
-----------------------------------------------------------------------}
startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO ()
startService s port m = do
stopService s
putMVar s =<< spawn
where
spawn = ClientService port <$> forkIO (m port)
stopService :: MVar ClientService -> IO ()
stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread)
-- | Service A might depend on service B.
withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO ()
withRunning dep failure action = tryTakeMVar dep >>= maybe failure action
{-----------------------------------------------------------------------
Torrent presence
-----------------------------------------------------------------------}
data TorrentPresence = Active SwarmSession
| Registered TorrentLoc
| Unknown
torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence
torrentPresence ClientSession {..} ih = do
sws <- readTVarIO swarmSessions
case M.lookup ih sws of
Just ss -> return $ Active ss
Nothing -> do
tm <- readTVarIO torrentMap
return $ maybe Unknown Registered $ HM.lookup ih tm
{-----------------------------------------------------------------------
Client sessions
-----------------------------------------------------------------------}
startListener :: ClientSession -> PortNumber -> IO ()
startListener cs @ ClientSession {..} port =
startService peerListener port $ listener cs $ \conn @ (_, PeerSession{..}) -> do
runP2P conn p2p
-- | Create a new client session. The data passed to this function are
-- usually loaded from configuration file.
openClientSession :: SessionCount -> [Extension] -> PortNumber -> PortNumber -> IO ClientSession
openClientSession n exts listenerPort _ = do
cs <- ClientSession
<$> genPeerId
<*> pure exts
<*> newEmptyMVar
<*> MSem.new n
<*> pure n
<*> newTVarIO M.empty
<*> newTVarIO (startProgress 0)
<*> newTVarIO HM.empty
startListener cs listenerPort
return cs
closeClientSession :: ClientSession -> IO ()
closeClientSession ClientSession {..} = do
stopService peerListener
sws <- readTVarIO swarmSessions
forM_ sws closeSwarmSession
withClientSession :: SessionCount -> [Extension]
-> PortNumber -> PortNumber
-> (ClientSession -> IO ()) -> IO ()
withClientSession c es l d = bracket (openClientSession c es l d) closeClientSession
-- | Get current global progress of the client. This value is usually
-- shown to a user.
getCurrentProgress :: MonadIO m => ClientSession -> m Progress
getCurrentProgress = liftIO . readTVarIO . currentProgress
-- | Get number of swarms client aware of.
getSwarmCount :: MonadIO m => ClientSession -> m SessionCount
getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions
-- | Get number of peers the client currently connected to.
getPeerCount :: MonadIO m => ClientSession -> m ThreadCount
getPeerCount ClientSession {..} = liftIO $ do
unused <- peekAvail activeThreads
return (maxActive - unused)
getActiveSwarms :: ClientSession -> IO [SwarmSession]
getActiveSwarms ClientSession {..} = M.elems <$> readTVarIO swarmSessions
getListenerPort :: ClientSession -> IO PortNumber
getListenerPort ClientSession {..} = servPort <$> readMVar peerListener
{-----------------------------------------------------------------------
Swarm session
-----------------------------------------------------------------------}
defSeederConns :: SessionCount
defSeederConns = defaultUnchokeSlots
defLeecherConns :: SessionCount
defLeecherConns = defaultNumWant
-- discovery should hide tracker and DHT communication under the hood
-- thus we can obtain an unified interface
discover :: SwarmSession -> IO ()
discover swarm @ SwarmSession {..} = {-# SCC discover #-} do
port <- getListenerPort clientSession
let conn = TConnection {
tconnAnnounce = tAnnounce torrentMeta
, tconnInfoHash = tInfoHash torrentMeta
, tconnPeerId = clientPeerId clientSession
, tconnPort = port
}
let progress = currentProgress clientSession
ch <- newBoundedChan 100 -- TODO
tid <- forkIO $ tracker ch progress conn
forever $ do
addr <- BC.readChan ch
forkThrottle swarm $ do
initiatePeerSession swarm addr $ \pconn -> do
print addr
runP2P pconn p2p
registerSwarmSession :: SwarmSession -> STM ()
registerSwarmSession ss @ SwarmSession {..} =
modifyTVar' (swarmSessions clientSession) $
M.insert (tInfoHash torrentMeta) ss
unregisterSwarmSession :: SwarmSession -> STM ()
unregisterSwarmSession SwarmSession {..} =
modifyTVar' (swarmSessions clientSession) $
M.delete $ tInfoHash torrentMeta
openSwarmSession :: ClientSession -> TorrentLoc -> IO SwarmSession
openSwarmSession cs @ ClientSession {..} loc @ TorrentLoc {..} = do
t <- validateLocation loc
let bf = haveNone $ pieceCount $ tInfo t
ss <- SwarmSession t cs
<$> MSem.new defLeecherConns
<*> openStorage t dataDirPath bf
<*> newTVarIO S.empty
<*> newBroadcastTChanIO
atomically $ do
modifyTVar' currentProgress $ enqueuedProgress $ contentLength $ tInfo t
registerSwarmSession ss
_ <- forkIO $ discover ss
return ss
closeSwarmSession :: SwarmSession -> IO ()
closeSwarmSession se @ SwarmSession {..} = do
atomically $ unregisterSwarmSession se
-- TODO stop discovery
-- TODO killall peer sessions
-- TODO the order is important!
closeStorage storage
getSwarm :: ClientSession -> InfoHash -> IO SwarmSession
getSwarm cs @ ClientSession {..} ih = do
tstatus <- torrentPresence cs ih
case tstatus of
Unknown -> throw $ UnknownTorrent ih
Active sw -> return sw
Registered loc -> openSwarmSession cs loc
-- TODO do not spawn session!
getStorage :: ClientSession -> InfoHash -> IO Storage
getStorage cs ih = storage <$> getSwarm cs ih
-- TODO keep sorted?
getActivePeers :: SwarmSession -> IO [PeerSession]
getActivePeers SwarmSession {..} = S.toList <$> readTVarIO connectedPeers
getTorrentInfo :: ClientSession -> InfoHash -> IO (Maybe Torrent)
getTorrentInfo cs ih = do
tstatus <- torrentPresence cs ih
case tstatus of
Unknown -> return Nothing
Active (SwarmSession {..}) -> return $ Just torrentMeta
Registered (TorrentLoc {..}) -> Just <$> fromFile metafilePath
-- | Get the number of connected peers in the given swarm.
getSessionCount :: SwarmSession -> IO SessionCount
getSessionCount SwarmSession {..} = do
S.size <$> readTVarIO connectedPeers
swarmHandshake :: SwarmSession -> Handshake
swarmHandshake SwarmSession {..} = Handshake {
hsProtocol = defaultBTProtocol
, hsReserved = encodeExts $ allowedExtensions $ clientSession
, hsInfoHash = tInfoHash torrentMeta
, hsPeerId = clientPeerId $ clientSession
}
{-----------------------------------------------------------------------
Peer sessions throttling
-----------------------------------------------------------------------}
-- | The number of threads suitable for a typical BT client.
defaultThreadCount :: ThreadCount
defaultThreadCount = 1000
enterSwarm :: SwarmSession -> IO ()
enterSwarm SwarmSession {..} = do
MSem.wait (activeThreads clientSession)
MSem.wait vacantPeers `onException`
MSem.signal (activeThreads clientSession)
leaveSwarm :: SwarmSession -> IO ()
leaveSwarm SwarmSession {..} = mask_ $ do
MSem.signal vacantPeers
MSem.signal (activeThreads clientSession)
forkThrottle :: SwarmSession -> IO () -> IO ThreadId
forkThrottle se action = do
enterSwarm se
(forkIO $ do
action `finally` leaveSwarm se)
`onException` leaveSwarm se
-- TODO: check content files location;
validateLocation :: TorrentLoc -> IO Torrent
validateLocation = fromFile . metafilePath
registerTorrent :: ClientSession -> TorrentLoc -> IO ()
registerTorrent ClientSession {..} loc @ TorrentLoc {..} = do
torrent <- fromFile metafilePath
atomically $ modifyTVar' torrentMap $ HM.insert (tInfoHash torrent) loc
-- TODO kill sessions
unregisterTorrent :: ClientSession -> InfoHash -> IO ()
unregisterTorrent ClientSession {..} ih = do
atomically $ modifyTVar' torrentMap $ HM.delete ih
getRegistered :: ClientSession -> IO TorrentMap
getRegistered ClientSession {..} = readTVarIO torrentMap
{-----------------------------------------------------------------------
Peer session creation
------------------------------------------------------------------------
The peer session cycle looks like:
* acquire vacant session and vacant thread slot;
* (fork could be here, but not necessary)
* establish peer connection;
* register peer session;
* ... exchange process ...
* unregister peer session;
* close peer connection;
* release acquired session and thread slot.
TODO: explain why this order
TODO: thread throttling
TODO: check if it connected yet peer
TODO: utilize peer Id.
TODO: use STM semaphore
-----------------------------------------------------------------------}
registerPeerSession :: PeerSession -> IO ()
registerPeerSession ps @ PeerSession {..} =
atomically $ modifyTVar' (connectedPeers swarmSession) (S.insert ps)
unregisterPeerSession :: PeerSession -> IO ()
unregisterPeerSession ps @ PeerSession {..} =
atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps)
openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession
openSession ss @ SwarmSession {..} addr Handshake {..} = do
let clientCaps = encodeExts $ allowedExtensions $ clientSession
let enabled = decodeExts (enabledCaps clientCaps hsReserved)
bf <- getClientBitfield ss
ps <- PeerSession addr ss enabled
<$> atomically (dupTChan broadcastMessages)
<*> newIORef (initialSessionState (totalCount bf))
-- TODO we could implement more interesting throtling scheme
-- using connected peer information
registerPeerSession ps
return ps
-- TODO kill thread
closeSession :: PeerSession -> IO ()
closeSession = unregisterPeerSession
type PeerConn = (Socket, PeerSession)
type Exchange = PeerConn -> IO ()
-- | Exchange action depends on session and socket, whereas session depends
-- on socket:
--
-- socket------>-----exchange
-- | |
-- \-->--session-->--/
--
-- To handle exceptions properly we double bracket socket and session
-- then joining the resources and also ignoring session local exceptions.
--
runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO ()
runSession connector opener action =
handle isSessionException $
bracket connector close $ \sock ->
bracket (opener sock) closeSession $ \ses ->
action (sock, ses)
-- | Used then the client want to connect to a peer.
initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO ()
initiatePeerSession ss @ SwarmSession {..} addr
= runSession (connectToPeer addr) initiated
where
initiated sock = do
phs <- handshake sock (swarmHandshake ss)
ps <- openSession ss addr phs
return ps
-- | Used the a peer want to connect to the client.
acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO ()
acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted
where
accepted sock = do
phs <- recvHandshake sock
swarm <- getSwarm cs $ hsInfoHash phs
ps <- openSession swarm addr phs
sendHandshake sock $ Handshake {
hsProtocol = defaultBTProtocol
, hsReserved = encodeExts $ enabledExtensions ps
, hsInfoHash = hsInfoHash phs
, hsPeerId = clientPeerId
}
return ps
listener :: ClientSession -> Exchange -> PortNumber -> IO ()
listener cs action serverPort = bracket openListener close loop
where
loop sock = forever $ handle isIOError $ do
(conn, addr) <- accept sock
putStrLn "accepted"
case addr of
SockAddrInet port host -> do
_ <- forkIO $ do
acceptPeerSession cs (PeerAddr Nothing host port) conn action
return ()
_ -> return ()
isIOError :: IOError -> IO ()
isIOError _ = return ()
openListener = do
sock <- socket AF_INET Stream =<< getProtocolNumber "tcp"
bindSocket sock (SockAddrInet serverPort iNADDR_ANY)
listen sock maxListenQueue
return sock
|