diff options
Diffstat (limited to 'bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs')
-rw-r--r-- | bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs | 454 |
1 files changed, 454 insertions, 0 deletions
diff --git a/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs new file mode 100644 index 00000000..31b6b870 --- /dev/null +++ b/bittorrent/src/Network/BitTorrent/Tracker/RPC/UDP.hs | |||
@@ -0,0 +1,454 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013-2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : provisional | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module implement UDP tracker protocol. | ||
9 | -- | ||
10 | -- For protocol details and uri scheme see: | ||
11 | -- <http://www.bittorrent.org/beps/bep_0015.html>, | ||
12 | -- <https://www.iana.org/assignments/uri-schemes/prov/udp> | ||
13 | -- | ||
14 | {-# LANGUAGE RecordWildCards #-} | ||
15 | {-# LANGUAGE FlexibleInstances #-} | ||
16 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
17 | {-# LANGUAGE DeriveDataTypeable #-} | ||
18 | module Network.BitTorrent.Tracker.RPC.UDP | ||
19 | ( -- * Manager | ||
20 | Options (..) | ||
21 | , Manager | ||
22 | , newManager | ||
23 | , closeManager | ||
24 | , withManager | ||
25 | |||
26 | -- * RPC | ||
27 | , RpcException (..) | ||
28 | , announce | ||
29 | , scrape | ||
30 | ) where | ||
31 | |||
32 | import Control.Applicative | ||
33 | import Control.Concurrent | ||
34 | import Control.Exception | ||
35 | import Control.Monad | ||
36 | import Data.Default | ||
37 | import Data.IORef | ||
38 | import Data.List as L | ||
39 | import Data.Map as M | ||
40 | import Data.Maybe | ||
41 | import Data.Serialize | ||
42 | import Data.Text as T | ||
43 | import Data.Time | ||
44 | import Data.Time.Clock.POSIX | ||
45 | import Data.Traversable | ||
46 | import Data.Typeable | ||
47 | import Text.Read (readMaybe) | ||
48 | import Network.Socket hiding (Connected, connect, listen) | ||
49 | import Network.Socket.ByteString as BS | ||
50 | import Network.URI | ||
51 | import System.Timeout | ||
52 | |||
53 | import Network.BitTorrent.Tracker.Message | ||
54 | |||
55 | {----------------------------------------------------------------------- | ||
56 | -- Options | ||
57 | -----------------------------------------------------------------------} | ||
58 | |||
59 | -- | 'System.Timeout.timeout' specific. | ||
60 | sec :: Int | ||
61 | sec = 1000000 | ||
62 | |||
63 | -- | See <http://www.bittorrent.org/beps/bep_0015.html#time-outs> | ||
64 | defMinTimeout :: Int | ||
65 | defMinTimeout = 15 | ||
66 | |||
67 | -- | See <http://www.bittorrent.org/beps/bep_0015.html#time-outs> | ||
68 | defMaxTimeout :: Int | ||
69 | defMaxTimeout = 15 * 2 ^ (8 :: Int) | ||
70 | |||
71 | -- | See: <http://www.bittorrent.org/beps/bep_0015.html#time-outs> | ||
72 | defMultiplier :: Int | ||
73 | defMultiplier = 2 | ||
74 | |||
75 | -- TODO why 98? | ||
76 | defMaxPacketSize :: Int | ||
77 | defMaxPacketSize = 98 | ||
78 | |||
79 | -- | Manager configuration. | ||
80 | data Options = Options | ||
81 | { -- | Max size of a /response/ packet. | ||
82 | -- | ||
83 | -- 'optMaxPacketSize' /must/ be a positive value. | ||
84 | -- | ||
85 | optMaxPacketSize :: {-# UNPACK #-} !Int | ||
86 | |||
87 | -- | Starting timeout interval in seconds. If a response is not | ||
88 | -- received after 'optMinTimeout' then 'Manager' repeat RPC with | ||
89 | -- timeout interval multiplied by 'optMultiplier' and so on until | ||
90 | -- timeout interval reach 'optMaxTimeout'. | ||
91 | -- | ||
92 | -- 'optMinTimeout' /must/ be a positive value. | ||
93 | -- | ||
94 | , optMinTimeout :: {-# UNPACK #-} !Int | ||
95 | |||
96 | -- | Final timeout interval in seconds. After 'optMaxTimeout' | ||
97 | -- reached and tracker still not responding both 'announce' and | ||
98 | -- 'scrape' functions will throw 'TimeoutExpired' exception. | ||
99 | -- | ||
100 | -- 'optMaxTimeout' /must/ be greater than 'optMinTimeout'. | ||
101 | -- | ||
102 | , optMaxTimeout :: {-# UNPACK #-} !Int | ||
103 | |||
104 | -- | 'optMultiplier' /must/ be a positive value. | ||
105 | , optMultiplier :: {-# UNPACK #-} !Int | ||
106 | } deriving (Show, Eq) | ||
107 | |||
108 | -- | Options suitable for bittorrent client. | ||
109 | instance Default Options where | ||
110 | def = Options | ||
111 | { optMaxPacketSize = defMaxPacketSize | ||
112 | , optMinTimeout = defMinTimeout | ||
113 | , optMaxTimeout = defMaxTimeout | ||
114 | , optMultiplier = defMultiplier | ||
115 | } | ||
116 | |||
117 | checkOptions :: Options -> IO () | ||
118 | checkOptions Options {..} = do | ||
119 | unless (optMaxPacketSize > 0) $ do | ||
120 | throwIO $ userError "optMaxPacketSize must be positive" | ||
121 | |||
122 | unless (optMinTimeout > 0) $ do | ||
123 | throwIO $ userError "optMinTimeout must be positive" | ||
124 | |||
125 | unless (optMaxTimeout > 0) $ do | ||
126 | throwIO $ userError "optMaxTimeout must be positive" | ||
127 | |||
128 | unless (optMultiplier > 0) $ do | ||
129 | throwIO $ userError "optMultiplier must be positive" | ||
130 | |||
131 | unless (optMaxTimeout > optMinTimeout) $ do | ||
132 | throwIO $ userError "optMaxTimeout must be greater than optMinTimeout" | ||
133 | |||
134 | |||
135 | {----------------------------------------------------------------------- | ||
136 | -- Manager state | ||
137 | -----------------------------------------------------------------------} | ||
138 | |||
139 | type ConnectionCache = Map SockAddr Connection | ||
140 | |||
141 | type PendingResponse = MVar (Either RpcException Response) | ||
142 | type PendingTransactions = Map TransactionId PendingResponse | ||
143 | type PendingQueries = Map SockAddr PendingTransactions | ||
144 | |||
145 | -- | UDP tracker manager. | ||
146 | data Manager = Manager | ||
147 | { options :: !Options | ||
148 | , sock :: !Socket | ||
149 | -- , dnsCache :: !(IORef (Map URI SockAddr)) | ||
150 | , connectionCache :: !(IORef ConnectionCache) | ||
151 | , pendingResps :: !(MVar PendingQueries) | ||
152 | , listenerThread :: !(MVar ThreadId) | ||
153 | } | ||
154 | |||
155 | initManager :: Options -> IO Manager | ||
156 | initManager opts = Manager opts | ||
157 | <$> socket AF_INET Datagram defaultProtocol | ||
158 | <*> newIORef M.empty | ||
159 | <*> newMVar M.empty | ||
160 | <*> newEmptyMVar | ||
161 | |||
162 | unblockAll :: PendingQueries -> IO () | ||
163 | unblockAll m = traverse (traverse unblockCall) m >> return () | ||
164 | where | ||
165 | unblockCall ares = putMVar ares (Left ManagerClosed) | ||
166 | |||
167 | resetState :: Manager -> IO () | ||
168 | resetState Manager {..} = do | ||
169 | writeIORef connectionCache err | ||
170 | m <- swapMVar pendingResps err | ||
171 | unblockAll m | ||
172 | mtid <- tryTakeMVar listenerThread | ||
173 | case mtid of | ||
174 | Nothing -> return () -- thread killed by 'closeManager' | ||
175 | Just _ -> return () -- thread killed by exception from 'listen' | ||
176 | return () | ||
177 | where | ||
178 | err = error "UDP tracker manager closed" | ||
179 | |||
180 | -- | This function will throw 'IOException' on invalid 'Options'. | ||
181 | newManager :: Options -> IO Manager | ||
182 | newManager opts = do | ||
183 | checkOptions opts | ||
184 | mgr <- initManager opts | ||
185 | tid <- forkIO (listen mgr `finally` resetState mgr) | ||
186 | putMVar (listenerThread mgr) tid | ||
187 | return mgr | ||
188 | |||
189 | -- | Unblock all RPCs by throwing 'ManagerClosed' exception. No rpc | ||
190 | -- calls should be performed after manager becomes closed. | ||
191 | closeManager :: Manager -> IO () | ||
192 | closeManager Manager {..} = do | ||
193 | close sock | ||
194 | mtid <- tryTakeMVar listenerThread | ||
195 | case mtid of | ||
196 | Nothing -> return () | ||
197 | Just tid -> killThread tid | ||
198 | |||
199 | -- | Normally you need to use 'Control.Monad.Trans.Resource.allocate'. | ||
200 | withManager :: Options -> (Manager -> IO a) -> IO a | ||
201 | withManager opts = bracket (newManager opts) closeManager | ||
202 | |||
203 | {----------------------------------------------------------------------- | ||
204 | -- Exceptions | ||
205 | -----------------------------------------------------------------------} | ||
206 | |||
207 | data RpcException | ||
208 | -- | Unable to lookup hostname; | ||
209 | = HostUnknown | ||
210 | |||
211 | -- | Unable to lookup hostname; | ||
212 | | HostLookupFailed | ||
213 | |||
214 | -- | Expecting 'udp:', but some other scheme provided. | ||
215 | | UnrecognizedScheme String | ||
216 | |||
217 | -- | Tracker exists but not responding for specific number of seconds. | ||
218 | | TimeoutExpired Int | ||
219 | |||
220 | -- | Tracker responded with unexpected message type. | ||
221 | | UnexpectedResponse | ||
222 | { expectedMsg :: String | ||
223 | , actualMsg :: String | ||
224 | } | ||
225 | |||
226 | -- | RPC succeed, but tracker responded with error code. | ||
227 | | QueryFailed Text | ||
228 | |||
229 | -- | RPC manager closed while waiting for response. | ||
230 | | ManagerClosed | ||
231 | deriving (Eq, Show, Typeable) | ||
232 | |||
233 | instance Exception RpcException | ||
234 | |||
235 | {----------------------------------------------------------------------- | ||
236 | -- Host Addr resolution | ||
237 | -----------------------------------------------------------------------} | ||
238 | |||
239 | setPort :: PortNumber -> SockAddr -> SockAddr | ||
240 | setPort p (SockAddrInet _ h) = SockAddrInet p h | ||
241 | setPort p (SockAddrInet6 _ f h s) = SockAddrInet6 p f h s | ||
242 | setPort _ addr = addr | ||
243 | |||
244 | resolveURI :: URI -> IO SockAddr | ||
245 | resolveURI URI { uriAuthority = Just (URIAuth {..}) } = do | ||
246 | infos <- getAddrInfo Nothing (Just uriRegName) Nothing | ||
247 | let port = fromMaybe 0 (readMaybe (L.drop 1 uriPort) :: Maybe Int) | ||
248 | case infos of | ||
249 | AddrInfo {..} : _ -> return $ setPort (fromIntegral port) addrAddress | ||
250 | _ -> throwIO HostLookupFailed | ||
251 | resolveURI _ = throwIO HostUnknown | ||
252 | |||
253 | -- TODO caching? | ||
254 | getTrackerAddr :: Manager -> URI -> IO SockAddr | ||
255 | getTrackerAddr _ uri | ||
256 | | uriScheme uri == "udp:" = resolveURI uri | ||
257 | | otherwise = throwIO (UnrecognizedScheme (uriScheme uri)) | ||
258 | |||
259 | {----------------------------------------------------------------------- | ||
260 | Connection | ||
261 | -----------------------------------------------------------------------} | ||
262 | |||
263 | connectionLifetime :: NominalDiffTime | ||
264 | connectionLifetime = 60 | ||
265 | |||
266 | data Connection = Connection | ||
267 | { connectionId :: ConnectionId | ||
268 | , connectionTimestamp :: UTCTime | ||
269 | } deriving Show | ||
270 | |||
271 | -- placeholder for the first 'connect' | ||
272 | initialConnection :: Connection | ||
273 | initialConnection = Connection initialConnectionId (posixSecondsToUTCTime 0) | ||
274 | |||
275 | establishedConnection :: ConnectionId -> IO Connection | ||
276 | establishedConnection cid = Connection cid <$> getCurrentTime | ||
277 | |||
278 | isExpired :: Connection -> IO Bool | ||
279 | isExpired Connection {..} = do | ||
280 | currentTime <- getCurrentTime | ||
281 | let timeDiff = diffUTCTime currentTime connectionTimestamp | ||
282 | return $ timeDiff > connectionLifetime | ||
283 | |||
284 | {----------------------------------------------------------------------- | ||
285 | -- Transactions | ||
286 | -----------------------------------------------------------------------} | ||
287 | |||
288 | -- | Sometimes 'genTransactionId' may return already used transaction | ||
289 | -- id. We use a good entropy source but the issue /still/ (with very | ||
290 | -- small probabality) may happen. If the collision happen then this | ||
291 | -- function tries to find nearest unused slot, otherwise pending | ||
292 | -- transactions table is full. | ||
293 | firstUnused :: SockAddr -> TransactionId -> PendingQueries -> TransactionId | ||
294 | firstUnused addr rid m = do | ||
295 | case M.splitLookup rid <$> M.lookup addr m of | ||
296 | Nothing -> rid | ||
297 | Just (_ , Nothing, _ ) -> rid | ||
298 | Just (lt, Just _ , gt) -> | ||
299 | case backwardHole (keys lt) rid <|> forwardHole rid (keys gt) of | ||
300 | Nothing -> error "firstUnused: table is full" -- impossible | ||
301 | Just tid -> tid | ||
302 | where | ||
303 | forwardHole a [] | ||
304 | | a == maxBound = Nothing | ||
305 | | otherwise = Just (succ a) | ||
306 | forwardHole a (b : xs) | ||
307 | | succ a == b = forwardHole b xs | ||
308 | | otherwise = Just (succ a) | ||
309 | |||
310 | backwardHole [] a | ||
311 | | a == minBound = Nothing | ||
312 | | otherwise = Just (pred a) | ||
313 | backwardHole (b : xs) a | ||
314 | | b == pred a = backwardHole xs b | ||
315 | | otherwise = Just (pred a) | ||
316 | |||
317 | register :: SockAddr -> TransactionId -> PendingResponse | ||
318 | -> PendingQueries -> PendingQueries | ||
319 | register addr tid ares = M.alter insertId addr | ||
320 | where | ||
321 | insertId Nothing = Just (M.singleton tid ares) | ||
322 | insertId (Just m) = Just (M.insert tid ares m) | ||
323 | |||
324 | unregister :: SockAddr -> TransactionId | ||
325 | -> PendingQueries -> PendingQueries | ||
326 | unregister addr tid = M.update deleteId addr | ||
327 | where | ||
328 | deleteId m | ||
329 | | M.null m' = Nothing | ||
330 | | otherwise = Just m' | ||
331 | where | ||
332 | m' = M.delete tid m | ||
333 | |||
334 | -- | Generate a new unused transaction id and register as pending. | ||
335 | allocTransaction :: Manager -> SockAddr -> PendingResponse -> IO TransactionId | ||
336 | allocTransaction Manager {..} addr ares = | ||
337 | modifyMVar pendingResps $ \ m -> do | ||
338 | rndId <- genTransactionId | ||
339 | let tid = firstUnused addr rndId m | ||
340 | return (register addr tid ares m, tid) | ||
341 | |||
342 | -- | Wake up blocked thread and return response back. | ||
343 | commitTransaction :: Manager -> SockAddr -> TransactionId -> Response -> IO () | ||
344 | commitTransaction Manager {..} addr tid resp = | ||
345 | modifyMVarMasked_ pendingResps $ \ m -> do | ||
346 | case M.lookup tid =<< M.lookup addr m of | ||
347 | Nothing -> return m -- tracker responded after 'cancelTransaction' fired | ||
348 | Just ares -> do | ||
349 | putMVar ares (Right resp) | ||
350 | return $ unregister addr tid m | ||
351 | |||
352 | -- | Abort transaction forcefully. | ||
353 | cancelTransaction :: Manager -> SockAddr -> TransactionId -> IO () | ||
354 | cancelTransaction Manager {..} addr tid = | ||
355 | modifyMVarMasked_ pendingResps $ \m -> | ||
356 | return $ unregister addr tid m | ||
357 | |||
358 | -- | Handle responses from trackers. | ||
359 | listen :: Manager -> IO () | ||
360 | listen mgr @ Manager {..} = do | ||
361 | forever $ do | ||
362 | (bs, addr) <- BS.recvFrom sock (optMaxPacketSize options) | ||
363 | case decode bs of | ||
364 | Left _ -> return () -- parser failed, ignoring | ||
365 | Right (TransactionR {..}) -> commitTransaction mgr addr transIdR response | ||
366 | |||
367 | -- | Perform RPC transaction. If the action interrupted transaction | ||
368 | -- will be aborted. | ||
369 | transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response | ||
370 | transaction mgr @ Manager {..} addr conn request = do | ||
371 | ares <- newEmptyMVar | ||
372 | tid <- allocTransaction mgr addr ares | ||
373 | performTransaction tid ares | ||
374 | `onException` cancelTransaction mgr addr tid | ||
375 | where | ||
376 | performTransaction tid ares = do | ||
377 | let trans = TransactionQ (connectionId conn) tid request | ||
378 | BS.sendAllTo sock (encode trans) addr | ||
379 | takeMVar ares >>= either throwIO return | ||
380 | |||
381 | {----------------------------------------------------------------------- | ||
382 | -- Connection cache | ||
383 | -----------------------------------------------------------------------} | ||
384 | |||
385 | connect :: Manager -> SockAddr -> Connection -> IO ConnectionId | ||
386 | connect m addr conn = do | ||
387 | resp <- transaction m addr conn Connect | ||
388 | case resp of | ||
389 | Connected cid -> return cid | ||
390 | Failed msg -> throwIO $ QueryFailed msg | ||
391 | _ -> throwIO $ UnexpectedResponse "connected" (responseName resp) | ||
392 | |||
393 | newConnection :: Manager -> SockAddr -> IO Connection | ||
394 | newConnection m addr = do | ||
395 | connId <- connect m addr initialConnection | ||
396 | establishedConnection connId | ||
397 | |||
398 | refreshConnection :: Manager -> SockAddr -> Connection -> IO Connection | ||
399 | refreshConnection mgr addr conn = do | ||
400 | expired <- isExpired conn | ||
401 | if expired | ||
402 | then do | ||
403 | connId <- connect mgr addr conn | ||
404 | establishedConnection connId | ||
405 | else do | ||
406 | return conn | ||
407 | |||
408 | withCache :: Manager -> SockAddr | ||
409 | -> (Maybe Connection -> IO Connection) -> IO Connection | ||
410 | withCache mgr addr action = do | ||
411 | cache <- readIORef (connectionCache mgr) | ||
412 | conn <- action (M.lookup addr cache) | ||
413 | writeIORef (connectionCache mgr) (M.insert addr conn cache) | ||
414 | return conn | ||
415 | |||
416 | getConnection :: Manager -> SockAddr -> IO Connection | ||
417 | getConnection mgr addr = withCache mgr addr $ | ||
418 | maybe (newConnection mgr addr) (refreshConnection mgr addr) | ||
419 | |||
420 | {----------------------------------------------------------------------- | ||
421 | -- RPC | ||
422 | -----------------------------------------------------------------------} | ||
423 | |||
424 | retransmission :: Options -> IO a -> IO a | ||
425 | retransmission Options {..} action = go optMinTimeout | ||
426 | where | ||
427 | go curTimeout | ||
428 | | curTimeout > optMaxTimeout = throwIO $ TimeoutExpired curTimeout | ||
429 | | otherwise = do | ||
430 | r <- timeout (curTimeout * sec) action | ||
431 | maybe (go (optMultiplier * curTimeout)) return r | ||
432 | |||
433 | queryTracker :: Manager -> URI -> Request -> IO Response | ||
434 | queryTracker mgr uri req = do | ||
435 | addr <- getTrackerAddr mgr uri | ||
436 | retransmission (options mgr) $ do | ||
437 | conn <- getConnection mgr addr | ||
438 | transaction mgr addr conn req | ||
439 | |||
440 | -- | This function can throw 'RpcException'. | ||
441 | announce :: Manager -> URI -> AnnounceQuery -> IO AnnounceInfo | ||
442 | announce mgr uri q = do | ||
443 | resp <- queryTracker mgr uri (Announce q) | ||
444 | case resp of | ||
445 | Announced info -> return info | ||
446 | _ -> throwIO $ UnexpectedResponse "announce" (responseName resp) | ||
447 | |||
448 | -- | This function can throw 'RpcException'. | ||
449 | scrape :: Manager -> URI -> ScrapeQuery -> IO ScrapeInfo | ||
450 | scrape mgr uri ihs = do | ||
451 | resp <- queryTracker mgr uri (Scrape ihs) | ||
452 | case resp of | ||
453 | Scraped info -> return $ L.zip ihs info | ||
454 | _ -> throwIO $ UnexpectedResponse "scrape" (responseName resp) | ||