summaryrefslogtreecommitdiff
path: root/bittorrent/src/Network/BitTorrent/Tracker/RPC
diff options
context:
space:
mode:
Diffstat (limited to 'bittorrent/src/Network/BitTorrent/Tracker/RPC')
-rw-r--r--bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs191
-rw-r--r--bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs454
2 files changed, 645 insertions, 0 deletions
diff --git a/bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs b/bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs
new file mode 100644
index 00000000..9b6e056a
--- /dev/null
+++ b/bittorrent/src/Network/BitTorrent/Tracker/RPC/HTTP.hs
@@ -0,0 +1,191 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : provisional
6-- Portability : portable
7--
8-- This module implement HTTP tracker protocol.
9--
10-- For more information see:
11-- <https://wiki.theory.org/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol>
12--
13{-# LANGUAGE DeriveDataTypeable #-}
14module Network.BitTorrent.Tracker.RPC.HTTP
15 ( -- * Manager
16 Options (..)
17 , Manager
18 , newManager
19 , closeManager
20 , withManager
21
22 -- * RPC
23 , RpcException (..)
24 , announce
25 , scrape
26 , scrapeOne
27 ) where
28
29import Control.Applicative
30import Control.Exception
31import Control.Monad
32import Control.Monad.Trans.Resource
33import Data.BEncode as BE
34import Data.ByteString as BS
35import Data.ByteString.Char8 as BC
36import Data.ByteString.Lazy as BL
37import Data.Default
38import Data.List as L
39import Data.Monoid
40import Data.Typeable hiding (Proxy)
41import Network.URI
42import Network.HTTP.Conduit hiding
43 (Manager, newManager, closeManager, withManager)
44import Network.HTTP.Client (defaultManagerSettings)
45import Network.HTTP.Client.Internal (setUri)
46import qualified Network.HTTP.Conduit as HTTP
47import Network.HTTP.Types.Header (hUserAgent)
48import Network.HTTP.Types.URI (SimpleQuery, renderSimpleQuery)
49
50import Data.Torrent (InfoHash)
51import Network.Address (libUserAgent)
52import Network.BitTorrent.Tracker.Message hiding (Request, Response)
53
54{-----------------------------------------------------------------------
55-- Exceptions
56-----------------------------------------------------------------------}
57
58data RpcException
59 = RequestFailed HttpException -- ^ failed HTTP request.
60 | ParserFailure String -- ^ unable to decode tracker response;
61 | ScrapelessTracker -- ^ tracker do not support scraping;
62 | BadScrape -- ^ unable to find info hash in response dict;
63 deriving (Show, Typeable)
64
65instance Exception RpcException
66
67packHttpException :: IO a -> IO a
68packHttpException m = try m >>= either (throwIO . RequestFailed) return
69
70{-----------------------------------------------------------------------
71-- Manager
72-----------------------------------------------------------------------}
73
74-- | HTTP tracker specific RPC options.
75data Options = Options
76 { -- | Global HTTP announce query preferences.
77 optAnnouncePrefs :: !AnnouncePrefs
78
79 -- | Whether to use HTTP proxy for HTTP tracker requests.
80 , optHttpProxy :: !(Maybe Proxy)
81
82 -- | Value to put in HTTP user agent header.
83 , optUserAgent :: !BS.ByteString
84
85 -- | HTTP manager options.
86 , optHttpOptions :: !ManagerSettings
87 }
88
89instance Default Options where
90 def = Options
91 { optAnnouncePrefs = def
92 , optHttpProxy = Nothing
93 , optUserAgent = BC.pack libUserAgent
94 , optHttpOptions = defaultManagerSettings
95 }
96
97-- | HTTP tracker manager.
98data Manager = Manager
99 { options :: !Options
100 , httpMgr :: !HTTP.Manager
101 }
102
103-- |
104newManager :: Options -> IO Manager
105newManager opts = Manager opts <$> HTTP.newManager (optHttpOptions opts)
106
107-- |
108closeManager :: Manager -> IO ()
109closeManager Manager {..} = HTTP.closeManager httpMgr
110
111-- | Normally you need to use 'Control.Monad.Trans.Resource.allocate'.
112withManager :: Options -> (Manager -> IO a) -> IO a
113withManager opts = bracket (newManager opts) closeManager
114
115{-----------------------------------------------------------------------
116-- Queries
117-----------------------------------------------------------------------}
118
119fillRequest :: Options -> SimpleQuery -> Request -> Request
120fillRequest Options {..} q r = r
121 { queryString = joinQuery (queryString r) (renderSimpleQuery False q)
122 , requestHeaders = (hUserAgent, optUserAgent) : requestHeaders r
123 , proxy = optHttpProxy
124 }
125 where
126 joinQuery a b
127 | BS.null a = b
128 | otherwise = a <> "&" <> b
129
130httpTracker :: BEncode a => Manager -> URI -> SimpleQuery -> IO a
131httpTracker Manager {..} uri q = packHttpException $ do
132 request <- fillRequest options q <$> setUri def {- http-client instance for Request -} uri
133 response <- runResourceT $ httpLbs request httpMgr
134 case BE.decode $ BL.toStrict $ responseBody response of
135 Left msg -> throwIO (ParserFailure msg)
136 Right info -> return info
137
138{-----------------------------------------------------------------------
139-- RPC
140-----------------------------------------------------------------------}
141
142-- | Send request and receive response from the tracker specified in
143-- announce list.
144--
145-- This function can throw 'RpcException'.
146--
147announce :: Manager -> URI -> AnnounceQuery -> IO AnnounceInfo
148announce mgr uri q = httpTracker mgr uri (renderAnnounceRequest uriQ)
149 where
150 uriQ = AnnounceRequest
151 { announceQuery = q
152 , announcePrefs = optAnnouncePrefs (options mgr)
153 }
154
155-- | Trying to convert /announce/ URL to /scrape/ URL. If 'scrapeURL'
156-- gives 'Nothing' then tracker do not support scraping.
157--
158scrapeURL :: URI -> Maybe URI
159scrapeURL uri = do
160 newPath <- replace (BC.pack (uriPath uri))
161 return uri { uriPath = BC.unpack newPath }
162 where
163 replace p = do
164 let ps = BC.splitWith (== '/') p
165 guard (not (L.null ps))
166 guard ("announce" `BS.isPrefixOf` L.last ps)
167 let newSuff = "scrape" <> BS.drop (BS.length "announce") (L.last ps)
168 return (BS.intercalate "/" (L.init ps ++ [newSuff]))
169
170-- | For each 'InfoHash' of torrents request scrape info from the tracker.
171-- However if the info hash list is 'null', the tracker should list
172-- all available torrents.
173--
174-- This function can throw 'RpcException'.
175--
176scrape :: Manager -> URI -> ScrapeQuery -> IO ScrapeInfo
177scrape m u q = do
178 case scrapeURL u of
179 Nothing -> throwIO ScrapelessTracker
180 Just uri -> httpTracker m uri (renderScrapeQuery q)
181
182-- | More particular version of 'scrape', just for one torrent.
183--
184-- This function can throw 'RpcException'.
185--
186scrapeOne :: Manager -> URI -> InfoHash -> IO ScrapeEntry
187scrapeOne m uri ih = do
188 xs <- scrape m uri [ih]
189 case L.lookup ih xs of
190 Nothing -> throwIO BadScrape
191 Just a -> return a
diff --git a/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs
new file mode 100644
index 00000000..31b6b870
--- /dev/null
+++ b/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs
@@ -0,0 +1,454 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013-2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : provisional
6-- Portability : portable
7--
8-- This module implement UDP tracker protocol.
9--
10-- For protocol details and uri scheme see:
11-- <http://www.bittorrent.org/beps/bep_0015.html>,
12-- <https://www.iana.org/assignments/uri-schemes/prov/udp>
13--
14{-# LANGUAGE RecordWildCards #-}
15{-# LANGUAGE FlexibleInstances #-}
16{-# LANGUAGE GeneralizedNewtypeDeriving #-}
17{-# LANGUAGE DeriveDataTypeable #-}
18module Network.BitTorrent.Tracker.RPC.UDP
19 ( -- * Manager
20 Options (..)
21 , Manager
22 , newManager
23 , closeManager
24 , withManager
25
26 -- * RPC
27 , RpcException (..)
28 , announce
29 , scrape
30 ) where
31
32import Control.Applicative
33import Control.Concurrent
34import Control.Exception
35import Control.Monad
36import Data.Default
37import Data.IORef
38import Data.List as L
39import Data.Map as M
40import Data.Maybe
41import Data.Serialize
42import Data.Text as T
43import Data.Time
44import Data.Time.Clock.POSIX
45import Data.Traversable
46import Data.Typeable
47import Text.Read (readMaybe)
48import Network.Socket hiding (Connected, connect, listen)
49import Network.Socket.ByteString as BS
50import Network.URI
51import System.Timeout
52
53import Network.BitTorrent.Tracker.Message
54
55{-----------------------------------------------------------------------
56-- Options
57-----------------------------------------------------------------------}
58
59-- | 'System.Timeout.timeout' specific.
60sec :: Int
61sec = 1000000
62
63-- | See <http://www.bittorrent.org/beps/bep_0015.html#time-outs>
64defMinTimeout :: Int
65defMinTimeout = 15
66
67-- | See <http://www.bittorrent.org/beps/bep_0015.html#time-outs>
68defMaxTimeout :: Int
69defMaxTimeout = 15 * 2 ^ (8 :: Int)
70
71-- | See: <http://www.bittorrent.org/beps/bep_0015.html#time-outs>
72defMultiplier :: Int
73defMultiplier = 2
74
75-- TODO why 98?
76defMaxPacketSize :: Int
77defMaxPacketSize = 98
78
79-- | Manager configuration.
80data Options = Options
81 { -- | Max size of a /response/ packet.
82 --
83 -- 'optMaxPacketSize' /must/ be a positive value.
84 --
85 optMaxPacketSize :: {-# UNPACK #-} !Int
86
87 -- | Starting timeout interval in seconds. If a response is not
88 -- received after 'optMinTimeout' then 'Manager' repeat RPC with
89 -- timeout interval multiplied by 'optMultiplier' and so on until
90 -- timeout interval reach 'optMaxTimeout'.
91 --
92 -- 'optMinTimeout' /must/ be a positive value.
93 --
94 , optMinTimeout :: {-# UNPACK #-} !Int
95
96 -- | Final timeout interval in seconds. After 'optMaxTimeout'
97 -- reached and tracker still not responding both 'announce' and
98 -- 'scrape' functions will throw 'TimeoutExpired' exception.
99 --
100 -- 'optMaxTimeout' /must/ be greater than 'optMinTimeout'.
101 --
102 , optMaxTimeout :: {-# UNPACK #-} !Int
103
104 -- | 'optMultiplier' /must/ be a positive value.
105 , optMultiplier :: {-# UNPACK #-} !Int
106 } deriving (Show, Eq)
107
108-- | Options suitable for bittorrent client.
109instance Default Options where
110 def = Options
111 { optMaxPacketSize = defMaxPacketSize
112 , optMinTimeout = defMinTimeout
113 , optMaxTimeout = defMaxTimeout
114 , optMultiplier = defMultiplier
115 }
116
117checkOptions :: Options -> IO ()
118checkOptions Options {..} = do
119 unless (optMaxPacketSize > 0) $ do
120 throwIO $ userError "optMaxPacketSize must be positive"
121
122 unless (optMinTimeout > 0) $ do
123 throwIO $ userError "optMinTimeout must be positive"
124
125 unless (optMaxTimeout > 0) $ do
126 throwIO $ userError "optMaxTimeout must be positive"
127
128 unless (optMultiplier > 0) $ do
129 throwIO $ userError "optMultiplier must be positive"
130
131 unless (optMaxTimeout > optMinTimeout) $ do
132 throwIO $ userError "optMaxTimeout must be greater than optMinTimeout"
133
134
135{-----------------------------------------------------------------------
136-- Manager state
137-----------------------------------------------------------------------}
138
139type ConnectionCache = Map SockAddr Connection
140
141type PendingResponse = MVar (Either RpcException Response)
142type PendingTransactions = Map TransactionId PendingResponse
143type PendingQueries = Map SockAddr PendingTransactions
144
145-- | UDP tracker manager.
146data Manager = Manager
147 { options :: !Options
148 , sock :: !Socket
149-- , dnsCache :: !(IORef (Map URI SockAddr))
150 , connectionCache :: !(IORef ConnectionCache)
151 , pendingResps :: !(MVar PendingQueries)
152 , listenerThread :: !(MVar ThreadId)
153 }
154
155initManager :: Options -> IO Manager
156initManager opts = Manager opts
157 <$> socket AF_INET Datagram defaultProtocol
158 <*> newIORef M.empty
159 <*> newMVar M.empty
160 <*> newEmptyMVar
161
162unblockAll :: PendingQueries -> IO ()
163unblockAll m = traverse (traverse unblockCall) m >> return ()
164 where
165 unblockCall ares = putMVar ares (Left ManagerClosed)
166
167resetState :: Manager -> IO ()
168resetState Manager {..} = do
169 writeIORef connectionCache err
170 m <- swapMVar pendingResps err
171 unblockAll m
172 mtid <- tryTakeMVar listenerThread
173 case mtid of
174 Nothing -> return () -- thread killed by 'closeManager'
175 Just _ -> return () -- thread killed by exception from 'listen'
176 return ()
177 where
178 err = error "UDP tracker manager closed"
179
180-- | This function will throw 'IOException' on invalid 'Options'.
181newManager :: Options -> IO Manager
182newManager opts = do
183 checkOptions opts
184 mgr <- initManager opts
185 tid <- forkIO (listen mgr `finally` resetState mgr)
186 putMVar (listenerThread mgr) tid
187 return mgr
188
189-- | Unblock all RPCs by throwing 'ManagerClosed' exception. No rpc
190-- calls should be performed after manager becomes closed.
191closeManager :: Manager -> IO ()
192closeManager Manager {..} = do
193 close sock
194 mtid <- tryTakeMVar listenerThread
195 case mtid of
196 Nothing -> return ()
197 Just tid -> killThread tid
198
199-- | Normally you need to use 'Control.Monad.Trans.Resource.allocate'.
200withManager :: Options -> (Manager -> IO a) -> IO a
201withManager opts = bracket (newManager opts) closeManager
202
203{-----------------------------------------------------------------------
204-- Exceptions
205-----------------------------------------------------------------------}
206
207data RpcException
208 -- | Unable to lookup hostname;
209 = HostUnknown
210
211 -- | Unable to lookup hostname;
212 | HostLookupFailed
213
214 -- | Expecting 'udp:', but some other scheme provided.
215 | UnrecognizedScheme String
216
217 -- | Tracker exists but not responding for specific number of seconds.
218 | TimeoutExpired Int
219
220 -- | Tracker responded with unexpected message type.
221 | UnexpectedResponse
222 { expectedMsg :: String
223 , actualMsg :: String
224 }
225
226 -- | RPC succeed, but tracker responded with error code.
227 | QueryFailed Text
228
229 -- | RPC manager closed while waiting for response.
230 | ManagerClosed
231 deriving (Eq, Show, Typeable)
232
233instance Exception RpcException
234
235{-----------------------------------------------------------------------
236-- Host Addr resolution
237-----------------------------------------------------------------------}
238
239setPort :: PortNumber -> SockAddr -> SockAddr
240setPort p (SockAddrInet _ h) = SockAddrInet p h
241setPort p (SockAddrInet6 _ f h s) = SockAddrInet6 p f h s
242setPort _ addr = addr
243
244resolveURI :: URI -> IO SockAddr
245resolveURI URI { uriAuthority = Just (URIAuth {..}) } = do
246 infos <- getAddrInfo Nothing (Just uriRegName) Nothing
247 let port = fromMaybe 0 (readMaybe (L.drop 1 uriPort) :: Maybe Int)
248 case infos of
249 AddrInfo {..} : _ -> return $ setPort (fromIntegral port) addrAddress
250 _ -> throwIO HostLookupFailed
251resolveURI _ = throwIO HostUnknown
252
253-- TODO caching?
254getTrackerAddr :: Manager -> URI -> IO SockAddr
255getTrackerAddr _ uri
256 | uriScheme uri == "udp:" = resolveURI uri
257 | otherwise = throwIO (UnrecognizedScheme (uriScheme uri))
258
259{-----------------------------------------------------------------------
260 Connection
261-----------------------------------------------------------------------}
262
263connectionLifetime :: NominalDiffTime
264connectionLifetime = 60
265
266data Connection = Connection
267 { connectionId :: ConnectionId
268 , connectionTimestamp :: UTCTime
269 } deriving Show
270
271-- placeholder for the first 'connect'
272initialConnection :: Connection
273initialConnection = Connection initialConnectionId (posixSecondsToUTCTime 0)
274
275establishedConnection :: ConnectionId -> IO Connection
276establishedConnection cid = Connection cid <$> getCurrentTime
277
278isExpired :: Connection -> IO Bool
279isExpired Connection {..} = do
280 currentTime <- getCurrentTime
281 let timeDiff = diffUTCTime currentTime connectionTimestamp
282 return $ timeDiff > connectionLifetime
283
284{-----------------------------------------------------------------------
285-- Transactions
286-----------------------------------------------------------------------}
287
288-- | Sometimes 'genTransactionId' may return already used transaction
289-- id. We use a good entropy source but the issue /still/ (with very
290-- small probabality) may happen. If the collision happen then this
291-- function tries to find nearest unused slot, otherwise pending
292-- transactions table is full.
293firstUnused :: SockAddr -> TransactionId -> PendingQueries -> TransactionId
294firstUnused addr rid m = do
295 case M.splitLookup rid <$> M.lookup addr m of
296 Nothing -> rid
297 Just (_ , Nothing, _ ) -> rid
298 Just (lt, Just _ , gt) ->
299 case backwardHole (keys lt) rid <|> forwardHole rid (keys gt) of
300 Nothing -> error "firstUnused: table is full" -- impossible
301 Just tid -> tid
302 where
303 forwardHole a []
304 | a == maxBound = Nothing
305 | otherwise = Just (succ a)
306 forwardHole a (b : xs)
307 | succ a == b = forwardHole b xs
308 | otherwise = Just (succ a)
309
310 backwardHole [] a
311 | a == minBound = Nothing
312 | otherwise = Just (pred a)
313 backwardHole (b : xs) a
314 | b == pred a = backwardHole xs b
315 | otherwise = Just (pred a)
316
317register :: SockAddr -> TransactionId -> PendingResponse
318 -> PendingQueries -> PendingQueries
319register addr tid ares = M.alter insertId addr
320 where
321 insertId Nothing = Just (M.singleton tid ares)
322 insertId (Just m) = Just (M.insert tid ares m)
323
324unregister :: SockAddr -> TransactionId
325 -> PendingQueries -> PendingQueries
326unregister addr tid = M.update deleteId addr
327 where
328 deleteId m
329 | M.null m' = Nothing
330 | otherwise = Just m'
331 where
332 m' = M.delete tid m
333
334-- | Generate a new unused transaction id and register as pending.
335allocTransaction :: Manager -> SockAddr -> PendingResponse -> IO TransactionId
336allocTransaction Manager {..} addr ares =
337 modifyMVar pendingResps $ \ m -> do
338 rndId <- genTransactionId
339 let tid = firstUnused addr rndId m
340 return (register addr tid ares m, tid)
341
342-- | Wake up blocked thread and return response back.
343commitTransaction :: Manager -> SockAddr -> TransactionId -> Response -> IO ()
344commitTransaction Manager {..} addr tid resp =
345 modifyMVarMasked_ pendingResps $ \ m -> do
346 case M.lookup tid =<< M.lookup addr m of
347 Nothing -> return m -- tracker responded after 'cancelTransaction' fired
348 Just ares -> do
349 putMVar ares (Right resp)
350 return $ unregister addr tid m
351
352-- | Abort transaction forcefully.
353cancelTransaction :: Manager -> SockAddr -> TransactionId -> IO ()
354cancelTransaction Manager {..} addr tid =
355 modifyMVarMasked_ pendingResps $ \m ->
356 return $ unregister addr tid m
357
358-- | Handle responses from trackers.
359listen :: Manager -> IO ()
360listen mgr @ Manager {..} = do
361 forever $ do
362 (bs, addr) <- BS.recvFrom sock (optMaxPacketSize options)
363 case decode bs of
364 Left _ -> return () -- parser failed, ignoring
365 Right (TransactionR {..}) -> commitTransaction mgr addr transIdR response
366
367-- | Perform RPC transaction. If the action interrupted transaction
368-- will be aborted.
369transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response
370transaction mgr @ Manager {..} addr conn request = do
371 ares <- newEmptyMVar
372 tid <- allocTransaction mgr addr ares
373 performTransaction tid ares
374 `onException` cancelTransaction mgr addr tid
375 where
376 performTransaction tid ares = do
377 let trans = TransactionQ (connectionId conn) tid request
378 BS.sendAllTo sock (encode trans) addr
379 takeMVar ares >>= either throwIO return
380
381{-----------------------------------------------------------------------
382-- Connection cache
383-----------------------------------------------------------------------}
384
385connect :: Manager -> SockAddr -> Connection -> IO ConnectionId
386connect m addr conn = do
387 resp <- transaction m addr conn Connect
388 case resp of
389 Connected cid -> return cid
390 Failed msg -> throwIO $ QueryFailed msg
391 _ -> throwIO $ UnexpectedResponse "connected" (responseName resp)
392
393newConnection :: Manager -> SockAddr -> IO Connection
394newConnection m addr = do
395 connId <- connect m addr initialConnection
396 establishedConnection connId
397
398refreshConnection :: Manager -> SockAddr -> Connection -> IO Connection
399refreshConnection mgr addr conn = do
400 expired <- isExpired conn
401 if expired
402 then do
403 connId <- connect mgr addr conn
404 establishedConnection connId
405 else do
406 return conn
407
408withCache :: Manager -> SockAddr
409 -> (Maybe Connection -> IO Connection) -> IO Connection
410withCache mgr addr action = do
411 cache <- readIORef (connectionCache mgr)
412 conn <- action (M.lookup addr cache)
413 writeIORef (connectionCache mgr) (M.insert addr conn cache)
414 return conn
415
416getConnection :: Manager -> SockAddr -> IO Connection
417getConnection mgr addr = withCache mgr addr $
418 maybe (newConnection mgr addr) (refreshConnection mgr addr)
419
420{-----------------------------------------------------------------------
421-- RPC
422-----------------------------------------------------------------------}
423
424retransmission :: Options -> IO a -> IO a
425retransmission Options {..} action = go optMinTimeout
426 where
427 go curTimeout
428 | curTimeout > optMaxTimeout = throwIO $ TimeoutExpired curTimeout
429 | otherwise = do
430 r <- timeout (curTimeout * sec) action
431 maybe (go (optMultiplier * curTimeout)) return r
432
433queryTracker :: Manager -> URI -> Request -> IO Response
434queryTracker mgr uri req = do
435 addr <- getTrackerAddr mgr uri
436 retransmission (options mgr) $ do
437 conn <- getConnection mgr addr
438 transaction mgr addr conn req
439
440-- | This function can throw 'RpcException'.
441announce :: Manager -> URI -> AnnounceQuery -> IO AnnounceInfo
442announce mgr uri q = do
443 resp <- queryTracker mgr uri (Announce q)
444 case resp of
445 Announced info -> return info
446 _ -> throwIO $ UnexpectedResponse "announce" (responseName resp)
447
448-- | This function can throw 'RpcException'.
449scrape :: Manager -> URI -> ScrapeQuery -> IO ScrapeInfo
450scrape mgr uri ihs = do
451 resp <- queryTracker mgr uri (Scrape ihs)
452 case resp of
453 Scraped info -> return $ L.zip ihs info
454 _ -> throwIO $ UnexpectedResponse "scrape" (responseName resp)