diff options
Diffstat (limited to 'src/Network/BitTorrent/Sessions.hs')
-rw-r--r-- | src/Network/BitTorrent/Sessions.hs | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Sessions.hs b/src/Network/BitTorrent/Sessions.hs new file mode 100644 index 00000000..31b30e43 --- /dev/null +++ b/src/Network/BitTorrent/Sessions.hs | |||
@@ -0,0 +1,447 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam T. 2013 | ||
3 | -- License : MIT | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | {-# LANGUAGE OverloadedStrings #-} | ||
9 | {-# LANGUAGE RecordWildCards #-} | ||
10 | module Network.BitTorrent.Sessions | ||
11 | ( -- * Progress | ||
12 | Progress(..), startProgress | ||
13 | , ClientService(..) | ||
14 | , startService | ||
15 | , withRunning | ||
16 | |||
17 | -- * Client | ||
18 | , ClientSession ( ClientSession | ||
19 | , clientPeerId, allowedExtensions | ||
20 | , nodeListener, peerListener | ||
21 | ) | ||
22 | , withClientSession | ||
23 | , listenerPort, dhtPort | ||
24 | |||
25 | , ThreadCount | ||
26 | , defaultThreadCount | ||
27 | |||
28 | , TorrentLoc(..) | ||
29 | , registerTorrent | ||
30 | , unregisterTorrent | ||
31 | |||
32 | , getCurrentProgress | ||
33 | , getSwarmCount | ||
34 | , getPeerCount | ||
35 | |||
36 | -- * Swarm | ||
37 | , SwarmSession( SwarmSession, torrentMeta, clientSession ) | ||
38 | |||
39 | , SessionCount | ||
40 | , getSessionCount | ||
41 | |||
42 | , newLeecher | ||
43 | , newSeeder | ||
44 | , getClientBitfield | ||
45 | |||
46 | -- TODO hide this | ||
47 | , waitVacancy | ||
48 | , forkThrottle | ||
49 | |||
50 | -- * Peer | ||
51 | , PeerSession( PeerSession, connectedPeerAddr | ||
52 | , swarmSession, enabledExtensions | ||
53 | , sessionState | ||
54 | ) | ||
55 | , SessionState | ||
56 | , initiatePeerSession | ||
57 | , acceptPeerSession | ||
58 | , listener | ||
59 | |||
60 | -- * Timeouts | ||
61 | , updateIncoming, updateOutcoming | ||
62 | ) where | ||
63 | |||
64 | import Prelude hiding (mapM_) | ||
65 | |||
66 | import Control.Applicative | ||
67 | import Control.Concurrent | ||
68 | import Control.Concurrent.STM | ||
69 | import Control.Concurrent.MSem as MSem | ||
70 | import Control.Lens | ||
71 | import Control.Monad (when, forever, (>=>)) | ||
72 | import Control.Exception | ||
73 | import Control.Monad.Trans | ||
74 | |||
75 | import Data.IORef | ||
76 | import Data.Foldable (mapM_) | ||
77 | import Data.Map as M | ||
78 | import Data.HashMap.Strict as HM | ||
79 | import Data.Set as S | ||
80 | |||
81 | import Data.Serialize hiding (get) | ||
82 | |||
83 | import Network hiding (accept) | ||
84 | import Network.Socket | ||
85 | import Network.Socket.ByteString | ||
86 | |||
87 | import GHC.Event as Ev | ||
88 | |||
89 | import Data.Bitfield as BF | ||
90 | import Data.Torrent | ||
91 | import Network.BitTorrent.Extension | ||
92 | import Network.BitTorrent.Peer | ||
93 | import Network.BitTorrent.Exchange.Protocol as BT | ||
94 | import Network.BitTorrent.Tracker.Protocol as BT | ||
95 | import System.Torrent.Storage | ||
96 | import Network.BitTorrent.Sessions.Types | ||
97 | |||
98 | {----------------------------------------------------------------------- | ||
99 | Client Services | ||
100 | -----------------------------------------------------------------------} | ||
101 | |||
102 | startService :: MVar ClientService -> PortNumber -> (PortNumber -> IO ()) -> IO () | ||
103 | startService s port m = do | ||
104 | stopService s | ||
105 | putMVar s =<< spawn | ||
106 | where | ||
107 | spawn = ClientService port <$> forkIO (m port) | ||
108 | |||
109 | stopService :: MVar ClientService -> IO () | ||
110 | stopService = tryTakeMVar >=> maybe (return ()) (killThread . servThread) | ||
111 | |||
112 | -- | Service A might depend on service B. | ||
113 | withRunning :: MVar ClientService -> IO () -> (ClientService -> IO ()) -> IO () | ||
114 | withRunning dep failure action = tryTakeMVar dep >>= maybe failure action | ||
115 | |||
116 | {----------------------------------------------------------------------- | ||
117 | Torrent presence | ||
118 | -----------------------------------------------------------------------} | ||
119 | |||
120 | data TorrentPresence = Active SwarmSession | ||
121 | | Registered TorrentLoc | ||
122 | | Unknown | ||
123 | |||
124 | torrentPresence :: ClientSession -> InfoHash -> IO TorrentPresence | ||
125 | torrentPresence ClientSession {..} ih = do | ||
126 | sws <- readTVarIO swarmSessions | ||
127 | case M.lookup ih sws of | ||
128 | Just ss -> return $ Active ss | ||
129 | Nothing -> do | ||
130 | tm <- readTVarIO torrentMap | ||
131 | return $ maybe Unknown Registered $ HM.lookup ih tm | ||
132 | |||
133 | {----------------------------------------------------------------------- | ||
134 | Client sessions | ||
135 | -----------------------------------------------------------------------} | ||
136 | |||
137 | -- | Create a new client session. The data passed to this function are | ||
138 | -- usually loaded from configuration file. | ||
139 | openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions. | ||
140 | -> [Extension] -- ^ Extensions allowed to use. | ||
141 | -> IO ClientSession -- ^ Client with unique peer ID. | ||
142 | openClientSession n exts = do | ||
143 | mgr <- Ev.new | ||
144 | -- TODO kill this thread when leave client | ||
145 | _ <- forkIO $ loop mgr | ||
146 | ClientSession | ||
147 | <$> genPeerId | ||
148 | <*> pure exts | ||
149 | <*> newEmptyMVar | ||
150 | <*> newEmptyMVar | ||
151 | <*> MSem.new n | ||
152 | <*> pure n | ||
153 | <*> newTVarIO M.empty | ||
154 | <*> pure mgr | ||
155 | <*> newTVarIO (startProgress 0) | ||
156 | <*> newTVarIO HM.empty | ||
157 | |||
158 | closeClientSession :: ClientSession -> IO () | ||
159 | closeClientSession ClientSession {..} = | ||
160 | stopService nodeListener `finally` stopService peerListener | ||
161 | -- TODO stop all swarm sessions | ||
162 | |||
163 | withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO () | ||
164 | withClientSession c es = bracket (openClientSession c es) closeClientSession | ||
165 | |||
166 | -- | Get current global progress of the client. This value is usually | ||
167 | -- shown to a user. | ||
168 | getCurrentProgress :: MonadIO m => ClientSession -> m Progress | ||
169 | getCurrentProgress = liftIO . readTVarIO . currentProgress | ||
170 | |||
171 | -- | Get number of swarms client aware of. | ||
172 | getSwarmCount :: MonadIO m => ClientSession -> m SessionCount | ||
173 | getSwarmCount ClientSession {..} = liftIO $ M.size <$> readTVarIO swarmSessions | ||
174 | |||
175 | -- | Get number of peers the client currently connected to. | ||
176 | getPeerCount :: MonadIO m => ClientSession -> m ThreadCount | ||
177 | getPeerCount ClientSession {..} = liftIO $ do | ||
178 | unused <- peekAvail activeThreads | ||
179 | return (maxActive - unused) | ||
180 | |||
181 | listenerPort :: ClientSession -> IO PortNumber | ||
182 | listenerPort ClientSession {..} = servPort <$> readMVar peerListener | ||
183 | |||
184 | dhtPort :: ClientSession -> IO PortNumber | ||
185 | dhtPort ClientSession {..} = servPort <$> readMVar nodeListener | ||
186 | |||
187 | {----------------------------------------------------------------------- | ||
188 | Swarm session | ||
189 | -----------------------------------------------------------------------} | ||
190 | |||
191 | defSeederConns :: SessionCount | ||
192 | defSeederConns = defaultUnchokeSlots | ||
193 | |||
194 | defLeacherConns :: SessionCount | ||
195 | defLeacherConns = defaultNumWant | ||
196 | |||
197 | newSwarmSession :: Int -> Bitfield -> ClientSession -> Torrent | ||
198 | -> IO SwarmSession | ||
199 | newSwarmSession n bf cs @ ClientSession {..} t @ Torrent {..} | ||
200 | = SwarmSession t cs | ||
201 | <$> MSem.new n | ||
202 | <*> newTVarIO bf | ||
203 | <*> undefined | ||
204 | <*> newTVarIO S.empty | ||
205 | <*> newBroadcastTChanIO | ||
206 | |||
207 | -- > openSwarmSession :: ClientSession -> InfoHash -> IO SwarmSession | ||
208 | -- > openSwarmSession ClientSession {..} ih = do | ||
209 | -- > loc <- HM.lookup <$> readTVarIO torrentMap | ||
210 | -- > torrent <- validateLocation loc | ||
211 | -- > return undefined | ||
212 | |||
213 | closeSwarmSession :: SwarmSession -> IO () | ||
214 | closeSwarmSession se @ SwarmSession {..} = do | ||
215 | unregisterSwarmSession se | ||
216 | -- TODO stop discovery | ||
217 | -- TODO killall peer sessions | ||
218 | -- TODO the order is important! | ||
219 | closeStorage storage | ||
220 | |||
221 | unregisterSwarmSession :: SwarmSession -> IO () | ||
222 | unregisterSwarmSession SwarmSession {..} = | ||
223 | atomically $ modifyTVar (swarmSessions clientSession) $ | ||
224 | M.delete $ tInfoHash torrentMeta | ||
225 | |||
226 | getSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
227 | getSwarm cs @ ClientSession {..} ih = do | ||
228 | ss <- readTVarIO $ swarmSessions | ||
229 | case M.lookup ih ss of | ||
230 | Just sw -> return sw | ||
231 | Nothing -> undefined -- openSwarm cs | ||
232 | |||
233 | newSeeder :: ClientSession -> Torrent -> IO SwarmSession | ||
234 | newSeeder cs t @ Torrent {..} | ||
235 | = newSwarmSession defSeederConns (haveAll (pieceCount tInfo)) cs t | ||
236 | |||
237 | -- | New swarm in which the client allowed both download and upload. | ||
238 | newLeecher :: ClientSession -> Torrent -> IO SwarmSession | ||
239 | newLeecher cs t @ Torrent {..} = do | ||
240 | se <- newSwarmSession defLeacherConns (haveNone (pieceCount tInfo)) cs t | ||
241 | atomically $ modifyTVar' (currentProgress cs) (enqueuedProgress (contentLength tInfo)) | ||
242 | return se | ||
243 | |||
244 | -- | Get the number of connected peers in the given swarm. | ||
245 | getSessionCount :: SwarmSession -> IO SessionCount | ||
246 | getSessionCount SwarmSession {..} = do | ||
247 | S.size <$> readTVarIO connectedPeers | ||
248 | |||
249 | swarmHandshake :: SwarmSession -> Handshake | ||
250 | swarmHandshake SwarmSession {..} = Handshake { | ||
251 | hsProtocol = defaultBTProtocol | ||
252 | , hsReserved = encodeExts $ allowedExtensions $ clientSession | ||
253 | , hsInfoHash = tInfoHash torrentMeta | ||
254 | , hsPeerId = clientPeerId $ clientSession | ||
255 | } | ||
256 | |||
257 | {----------------------------------------------------------------------- | ||
258 | Peer sessions throttling | ||
259 | -----------------------------------------------------------------------} | ||
260 | |||
261 | -- | The number of threads suitable for a typical BT client. | ||
262 | defaultThreadCount :: ThreadCount | ||
263 | defaultThreadCount = 1000 | ||
264 | |||
265 | enterSwarm :: SwarmSession -> IO () | ||
266 | enterSwarm SwarmSession {..} = do | ||
267 | MSem.wait (activeThreads clientSession) | ||
268 | MSem.wait vacantPeers | ||
269 | |||
270 | leaveSwarm :: SwarmSession -> IO () | ||
271 | leaveSwarm SwarmSession {..} = do | ||
272 | MSem.signal vacantPeers | ||
273 | MSem.signal (activeThreads clientSession) | ||
274 | |||
275 | waitVacancy :: SwarmSession -> IO () -> IO () | ||
276 | waitVacancy se = bracket (enterSwarm se) (const (leaveSwarm se)) . const | ||
277 | |||
278 | forkThrottle :: SwarmSession -> IO () -> IO ThreadId | ||
279 | forkThrottle se action = do | ||
280 | enterSwarm se | ||
281 | (forkIO $ do | ||
282 | action `finally` leaveSwarm se) | ||
283 | `onException` leaveSwarm se | ||
284 | |||
285 | -- TODO: check content files location; | ||
286 | validateLocation :: TorrentLoc -> IO Torrent | ||
287 | validateLocation = fromFile . metafilePath | ||
288 | |||
289 | registerTorrent :: TVar TorrentMap -> TorrentLoc -> IO () | ||
290 | registerTorrent = error "registerTorrent" | ||
291 | |||
292 | unregisterTorrent :: TVar TorrentMap -> InfoHash -> IO () | ||
293 | unregisterTorrent = error "unregisterTorrent" | ||
294 | |||
295 | torrentSwarm :: ClientSession -> InfoHash -> TorrentPresence -> IO SwarmSession | ||
296 | torrentSwarm _ _ (Active sws) = return sws | ||
297 | torrentSwarm cs _ (Registered loc) = newSeeder cs =<< validateLocation loc | ||
298 | torrentSwarm _ ih Unknown = throw $ UnknownTorrent ih | ||
299 | |||
300 | lookupSwarm :: ClientSession -> InfoHash -> IO SwarmSession | ||
301 | lookupSwarm cs ih = torrentSwarm cs ih =<< torrentPresence cs ih | ||
302 | |||
303 | {----------------------------------------------------------------------- | ||
304 | Peer session creation | ||
305 | ------------------------------------------------------------------------ | ||
306 | The peer session cycle looks like: | ||
307 | |||
308 | * acquire vacant session and vacant thread slot; | ||
309 | * (fork could be here, but not necessary) | ||
310 | * establish peer connection; | ||
311 | * register peer session; | ||
312 | * ... exchange process ... | ||
313 | * unregister peer session; | ||
314 | * close peer connection; | ||
315 | * release acquired session and thread slot. | ||
316 | |||
317 | TODO: explain why this order | ||
318 | TODO: thread throttling | ||
319 | TODO: check if it connected yet peer | ||
320 | TODO: utilize peer Id. | ||
321 | TODO: use STM semaphore | ||
322 | -----------------------------------------------------------------------} | ||
323 | |||
324 | openSession :: SwarmSession -> PeerAddr -> Handshake -> IO PeerSession | ||
325 | openSession ss @ SwarmSession {..} addr Handshake {..} = do | ||
326 | let clientCaps = encodeExts $ allowedExtensions $ clientSession | ||
327 | let enabled = decodeExts (enabledCaps clientCaps hsReserved) | ||
328 | ps <- PeerSession addr ss enabled | ||
329 | <$> registerTimeout (eventManager clientSession) maxIncomingTime (return ()) | ||
330 | <*> registerTimeout (eventManager clientSession) maxOutcomingTime (return ()) | ||
331 | <*> atomically (dupTChan broadcastMessages) | ||
332 | <*> (newIORef . initialSessionState . totalCount =<< readTVarIO clientBitfield) | ||
333 | -- TODO we could implement more interesting throtling scheme | ||
334 | -- using connected peer information | ||
335 | atomically $ modifyTVar' connectedPeers (S.insert ps) | ||
336 | return ps | ||
337 | |||
338 | closeSession :: PeerSession -> IO () | ||
339 | closeSession ps @ PeerSession {..} = do | ||
340 | atomically $ modifyTVar' (connectedPeers swarmSession) (S.delete ps) | ||
341 | |||
342 | type PeerConn = (Socket, PeerSession) | ||
343 | type Exchange = PeerConn -> IO () | ||
344 | |||
345 | sendClientStatus :: PeerConn -> IO () | ||
346 | sendClientStatus (sock, PeerSession {..}) = do | ||
347 | cbf <- readTVarIO $ clientBitfield $ swarmSession | ||
348 | sendAll sock $ encode $ Bitfield cbf | ||
349 | |||
350 | port <- dhtPort $ clientSession swarmSession | ||
351 | when (ExtDHT `elem` enabledExtensions) $ do | ||
352 | sendAll sock $ encode $ Port port | ||
353 | |||
354 | -- | Exchange action depends on session and socket, whereas session depends | ||
355 | -- on socket: | ||
356 | -- | ||
357 | -- socket------>-----exchange | ||
358 | -- | | | ||
359 | -- \-->--session-->--/ | ||
360 | -- | ||
361 | -- To handle exceptions properly we double bracket socket and session | ||
362 | -- then joining the resources and also ignoring session local exceptions. | ||
363 | -- | ||
364 | runSession :: IO Socket -> (Socket -> IO PeerSession) -> Exchange -> IO () | ||
365 | runSession connector opener action = | ||
366 | handle isSessionException $ | ||
367 | bracket connector close $ \sock -> | ||
368 | bracket (opener sock) closeSession $ \ses -> | ||
369 | action (sock, ses) | ||
370 | |||
371 | -- | Used then the client want to connect to a peer. | ||
372 | initiatePeerSession :: SwarmSession -> PeerAddr -> Exchange -> IO () | ||
373 | initiatePeerSession ss @ SwarmSession {..} addr | ||
374 | = runSession (connectToPeer addr) initiated | ||
375 | where | ||
376 | initiated sock = do | ||
377 | phs <- handshake sock (swarmHandshake ss) | ||
378 | ps <- openSession ss addr phs | ||
379 | sendClientStatus (sock, ps) | ||
380 | return ps | ||
381 | |||
382 | -- | Used the a peer want to connect to the client. | ||
383 | acceptPeerSession :: ClientSession -> PeerAddr -> Socket -> Exchange -> IO () | ||
384 | acceptPeerSession cs@ClientSession {..} addr s = runSession (pure s) accepted | ||
385 | where | ||
386 | accepted sock = do | ||
387 | phs <- recvHandshake sock | ||
388 | swarm <- lookupSwarm cs $ hsInfoHash phs | ||
389 | ps <- openSession swarm addr phs | ||
390 | sendHandshake sock $ Handshake { | ||
391 | hsProtocol = defaultBTProtocol | ||
392 | , hsReserved = encodeExts $ enabledExtensions ps | ||
393 | , hsInfoHash = hsInfoHash phs | ||
394 | , hsPeerId = clientPeerId | ||
395 | } | ||
396 | sendClientStatus (sock, ps) | ||
397 | return ps | ||
398 | |||
399 | listener :: ClientSession -> Exchange -> PortNumber -> IO () | ||
400 | listener cs action serverPort = bracket openListener close loop | ||
401 | where | ||
402 | loop sock = forever $ handle isIOError $ do | ||
403 | (conn, addr) <- accept sock | ||
404 | case addr of | ||
405 | SockAddrInet port host -> do | ||
406 | acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
407 | _ -> return () | ||
408 | |||
409 | isIOError :: IOError -> IO () | ||
410 | isIOError _ = return () | ||
411 | |||
412 | openListener = do | ||
413 | sock <- socket AF_INET Stream defaultProtocol | ||
414 | bindSocket sock (SockAddrInet serverPort 0) | ||
415 | listen sock 1 | ||
416 | return sock | ||
417 | |||
418 | |||
419 | {----------------------------------------------------------------------- | ||
420 | Keepalives | ||
421 | ------------------------------------------------------------------------ | ||
422 | TODO move to exchange | ||
423 | -----------------------------------------------------------------------} | ||
424 | |||
425 | sec :: Int | ||
426 | sec = 1000 * 1000 | ||
427 | |||
428 | maxIncomingTime :: Int | ||
429 | maxIncomingTime = 120 * sec | ||
430 | |||
431 | maxOutcomingTime :: Int | ||
432 | maxOutcomingTime = 1 * sec | ||
433 | |||
434 | -- | Should be called after we have received any message from a peer. | ||
435 | updateIncoming :: PeerSession -> IO () | ||
436 | updateIncoming PeerSession {..} = do | ||
437 | updateTimeout (eventManager (clientSession swarmSession)) | ||
438 | incomingTimeout maxIncomingTime | ||
439 | |||
440 | -- | Should be called before we have send any message to a peer. | ||
441 | updateOutcoming :: PeerSession -> IO () | ||
442 | updateOutcoming PeerSession {..} = | ||
443 | updateTimeout (eventManager (clientSession swarmSession)) | ||
444 | outcomingTimeout maxOutcomingTime | ||
445 | |||
446 | sendKA :: Socket -> IO () | ||
447 | sendKA sock = return () | ||