diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/Tracker/RPC/UDP.hs | 138 |
1 files changed, 97 insertions, 41 deletions
diff --git a/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/src/Network/BitTorrent/Tracker/RPC/UDP.hs index bc4f9dd0..499b6d37 100644 --- a/src/Network/BitTorrent/Tracker/RPC/UDP.hs +++ b/src/Network/BitTorrent/Tracker/RPC/UDP.hs | |||
@@ -29,9 +29,9 @@ module Network.BitTorrent.Tracker.RPC.UDP | |||
29 | ) where | 29 | ) where |
30 | 30 | ||
31 | import Control.Applicative | 31 | import Control.Applicative |
32 | import Control.Concurrent | ||
32 | import Control.Exception | 33 | import Control.Exception |
33 | import Control.Monad | 34 | import Control.Monad |
34 | import Data.ByteString (ByteString) | ||
35 | import Data.Default | 35 | import Data.Default |
36 | import Data.IORef | 36 | import Data.IORef |
37 | import Data.List as L | 37 | import Data.List as L |
@@ -46,7 +46,7 @@ import Data.Time.Clock.POSIX | |||
46 | import Data.Typeable | 46 | import Data.Typeable |
47 | import Data.Word | 47 | import Data.Word |
48 | import Text.Read (readMaybe) | 48 | import Text.Read (readMaybe) |
49 | import Network.Socket hiding (Connected, connect) | 49 | import Network.Socket hiding (Connected, connect, listen) |
50 | import Network.Socket.ByteString as BS | 50 | import Network.Socket.ByteString as BS |
51 | import Network.URI | 51 | import Network.URI |
52 | import System.Entropy | 52 | import System.Entropy |
@@ -56,7 +56,7 @@ import Numeric | |||
56 | import Network.BitTorrent.Tracker.Message | 56 | import Network.BitTorrent.Tracker.Message |
57 | 57 | ||
58 | {----------------------------------------------------------------------- | 58 | {----------------------------------------------------------------------- |
59 | -- Manager | 59 | -- Options |
60 | -----------------------------------------------------------------------} | 60 | -----------------------------------------------------------------------} |
61 | 61 | ||
62 | sec :: Int | 62 | sec :: Int |
@@ -89,21 +89,54 @@ instance Default Options where | |||
89 | , optMaxTimeout = defMaxTimeout | 89 | , optMaxTimeout = defMaxTimeout |
90 | } | 90 | } |
91 | 91 | ||
92 | {----------------------------------------------------------------------- | ||
93 | -- Manager state | ||
94 | -----------------------------------------------------------------------} | ||
95 | |||
96 | type PendingTransactions = Map TransactionId (MVar Response) | ||
97 | |||
92 | data Manager = Manager | 98 | data Manager = Manager |
93 | { options :: !Options | 99 | { options :: !Options |
94 | , sock :: !Socket | 100 | , sock :: !Socket |
95 | -- , dnsCache :: !(IORef (Map URI SockAddr)) | 101 | -- , dnsCache :: !(IORef (Map URI SockAddr)) |
96 | , connectionCache :: !(IORef (Map SockAddr Connection)) | 102 | , connectionCache :: !(IORef (Map SockAddr Connection)) |
97 | -- , pendingResps :: !(IORef (Map Connection [MessageId])) | 103 | , pendingResps :: !(MVar (Map SockAddr PendingTransactions)) |
104 | , listenerThread :: !(MVar ThreadId) | ||
98 | } | 105 | } |
99 | 106 | ||
100 | newManager :: Options -> IO Manager | 107 | initManager :: Options -> IO Manager |
101 | newManager opts = Manager opts | 108 | initManager opts = Manager opts |
102 | <$> socket AF_INET Datagram defaultProtocol | 109 | <$> socket AF_INET Datagram defaultProtocol |
103 | <*> newIORef M.empty | 110 | <*> newIORef M.empty |
111 | <*> newMVar M.empty | ||
112 | <*> newEmptyMVar | ||
113 | |||
114 | resetState :: Manager -> IO () | ||
115 | resetState Manager {..} = do | ||
116 | writeIORef connectionCache err | ||
117 | _ <-swapMVar pendingResps err | ||
118 | m <- tryTakeMVar listenerThread | ||
119 | case m of | ||
120 | Nothing -> return () -- thread killed by 'closeManager' | ||
121 | Just _ -> return () -- thread killed by exception from 'listen' | ||
122 | return () | ||
123 | where | ||
124 | err = error "UDP tracker manager closed" | ||
125 | |||
126 | newManager :: Options -> IO Manager | ||
127 | newManager opts = do | ||
128 | mgr <- initManager opts | ||
129 | tid <- forkIO (listen mgr `finally` resetState mgr) | ||
130 | putMVar (listenerThread mgr) tid | ||
131 | return mgr | ||
104 | 132 | ||
105 | closeManager :: Manager -> IO () | 133 | closeManager :: Manager -> IO () |
106 | closeManager Manager {..} = close sock | 134 | closeManager Manager {..} = do |
135 | close sock | ||
136 | mtid <- tryTakeMVar listenerThread | ||
137 | case mtid of | ||
138 | Nothing -> return () | ||
139 | Just tid -> killThread tid | ||
107 | 140 | ||
108 | withManager :: Options -> (Manager -> IO a) -> IO a | 141 | withManager :: Options -> (Manager -> IO a) -> IO a |
109 | withManager opts = bracket (newManager opts) closeManager | 142 | withManager opts = bracket (newManager opts) closeManager |
@@ -122,21 +155,6 @@ data RpcException | |||
122 | -- | Tracker exists but not responding for specific number of seconds. | 155 | -- | Tracker exists but not responding for specific number of seconds. |
123 | | TrackerNotResponding Int | 156 | | TrackerNotResponding Int |
124 | 157 | ||
125 | -- | Source\/destination socket address mismatch. | ||
126 | -- | ||
127 | -- WARNING: This is a BUG and will be fixed! | ||
128 | -- | ||
129 | | UnexpectedSource | ||
130 | |||
131 | -- | Source\/destination transaction id mismatch. | ||
132 | -- | ||
133 | -- WARNING: This is a BUG and will be fixed! | ||
134 | -- | ||
135 | | TransactionFailed | ||
136 | |||
137 | -- | Unable to decode tracker response; | ||
138 | | ParserFailure String | ||
139 | |||
140 | -- | Tracker respond with unexpected message type. | 158 | -- | Tracker respond with unexpected message type. |
141 | | UnexpectedResponse | 159 | | UnexpectedResponse |
142 | { expectedMsg :: String | 160 | { expectedMsg :: String |
@@ -192,10 +210,9 @@ instance Show ConnectionId where | |||
192 | initialConnectionId :: ConnectionId | 210 | initialConnectionId :: ConnectionId |
193 | initialConnectionId = ConnectionId 0x41727101980 | 211 | initialConnectionId = ConnectionId 0x41727101980 |
194 | 212 | ||
195 | -- TODO rename | ||
196 | -- | Transaction Id is used within a UDP RPC. | 213 | -- | Transaction Id is used within a UDP RPC. |
197 | newtype TransactionId = TransactionId Word32 | 214 | newtype TransactionId = TransactionId Word32 |
198 | deriving (Eq, Serialize) | 215 | deriving (Eq, Ord, Serialize) |
199 | 216 | ||
200 | instance Show TransactionId where | 217 | instance Show TransactionId where |
201 | showsPrec _ (TransactionId tid) = showString "0x" <> showHex tid | 218 | showsPrec _ (TransactionId tid) = showString "0x" <> showHex tid |
@@ -343,27 +360,66 @@ isExpired Connection {..} = do | |||
343 | return $ timeDiff > connectionLifetime | 360 | return $ timeDiff > connectionLifetime |
344 | 361 | ||
345 | {----------------------------------------------------------------------- | 362 | {----------------------------------------------------------------------- |
346 | -- Basic transaction | 363 | -- Transactions |
347 | -----------------------------------------------------------------------} | 364 | -----------------------------------------------------------------------} |
348 | 365 | ||
349 | call :: Manager -> SockAddr -> ByteString -> IO ByteString | 366 | allocTransaction :: Manager -> SockAddr -> MVar Response -> IO TransactionId |
350 | call Manager {..} addr arg = do | 367 | allocTransaction Manager {..} addr ares = modifyMVar pendingResps bindId |
351 | BS.sendAllTo sock arg addr | 368 | where |
352 | (res, addr') <- BS.recvFrom sock (optMaxPacketSize options) | 369 | bindId m = do |
353 | unless (addr' == addr) $ do | 370 | tid <- genTransactionId |
354 | throwIO $ UnexpectedSource | 371 | case M.lookup tid =<< M.lookup addr m of |
355 | return res | 372 | Just _ -> bindId m -- already used, retry |
373 | Nothing -> return (M.alter (insertId tid) addr m, tid) | ||
374 | |||
375 | insertId tid Nothing = Just (M.singleton tid ares) | ||
376 | insertId tid (Just m) = Just (M.insert tid ares m) | ||
377 | |||
378 | commitTransaction :: Manager -> SockAddr -> TransactionId -> Response -> IO () | ||
379 | commitTransaction Manager {..} addr tid resp = | ||
380 | modifyMVar_ pendingResps $ \ m -> do | ||
381 | case M.lookup tid =<< M.lookup addr m of | ||
382 | Nothing -> return m -- tracker responded after 'cancelTransaction' fired | ||
383 | Just ares -> do | ||
384 | putMVar ares resp | ||
385 | return $ M.update deleteId addr m | ||
386 | where | ||
387 | deleteId m | ||
388 | | M.null m' = Nothing | ||
389 | | otherwise = Just m' | ||
390 | where | ||
391 | m' = M.delete tid m | ||
392 | |||
393 | cancelTransaction :: Manager -> SockAddr -> TransactionId -> IO () | ||
394 | cancelTransaction Manager {..} addr tid = | ||
395 | modifyMVar_ pendingResps $ \m -> | ||
396 | return $ M.update deleteId addr m | ||
397 | where | ||
398 | deleteId m | ||
399 | | M.null m' = Nothing | ||
400 | | otherwise = Just m' | ||
401 | where | ||
402 | m' = M.delete tid m | ||
403 | |||
404 | listen :: Manager -> IO () | ||
405 | listen mgr @ Manager {..} = do | ||
406 | forever $ do | ||
407 | (bs, addr) <- BS.recvFrom sock (optMaxPacketSize options) | ||
408 | case decode bs of | ||
409 | Left _ -> return () | ||
410 | Right (TransactionR {..}) -> commitTransaction mgr addr transIdR response | ||
356 | 411 | ||
357 | transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response | 412 | transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response |
358 | transaction m addr conn request = do | 413 | transaction mgr @ Manager {..} addr conn request = do |
359 | tid <- genTransactionId | 414 | ares <- newEmptyMVar |
360 | let trans = TransactionQ (connectionId conn) tid request | 415 | tid <- allocTransaction mgr addr ares |
361 | res <- call m addr (encode trans) | 416 | performTransaction tid ares |
362 | case decode res of | 417 | `onException` cancelTransaction mgr addr tid |
363 | Right (TransactionR {..}) | 418 | where |
364 | | tid == transIdR -> return response | 419 | performTransaction tid ares = do |
365 | | otherwise -> throwIO $ TransactionFailed | 420 | let trans = TransactionQ (connectionId conn) tid request |
366 | Left msg -> throwIO $ ParserFailure msg | 421 | BS.sendAllTo sock (encode trans) addr |
422 | takeMVar ares | ||
367 | 423 | ||
368 | {----------------------------------------------------------------------- | 424 | {----------------------------------------------------------------------- |
369 | -- Connection cache | 425 | -- Connection cache |