From 7be32f085a25f847b6eb07a085b4db8b3d3f6da3 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Fri, 21 Feb 2014 22:24:47 +0400 Subject: Add UDP tracker listener --- src/Network/BitTorrent/Tracker/RPC/UDP.hs | 138 +++++++++++++++++++++--------- 1 file changed, 97 insertions(+), 41 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/src/Network/BitTorrent/Tracker/RPC/UDP.hs index bc4f9dd0..499b6d37 100644 --- a/src/Network/BitTorrent/Tracker/RPC/UDP.hs +++ b/src/Network/BitTorrent/Tracker/RPC/UDP.hs @@ -29,9 +29,9 @@ module Network.BitTorrent.Tracker.RPC.UDP ) where import Control.Applicative +import Control.Concurrent import Control.Exception import Control.Monad -import Data.ByteString (ByteString) import Data.Default import Data.IORef import Data.List as L @@ -46,7 +46,7 @@ import Data.Time.Clock.POSIX import Data.Typeable import Data.Word import Text.Read (readMaybe) -import Network.Socket hiding (Connected, connect) +import Network.Socket hiding (Connected, connect, listen) import Network.Socket.ByteString as BS import Network.URI import System.Entropy @@ -56,7 +56,7 @@ import Numeric import Network.BitTorrent.Tracker.Message {----------------------------------------------------------------------- --- Manager +-- Options -----------------------------------------------------------------------} sec :: Int @@ -89,21 +89,54 @@ instance Default Options where , optMaxTimeout = defMaxTimeout } +{----------------------------------------------------------------------- +-- Manager state +-----------------------------------------------------------------------} + +type PendingTransactions = Map TransactionId (MVar Response) + data Manager = Manager { options :: !Options , sock :: !Socket -- , dnsCache :: !(IORef (Map URI SockAddr)) , connectionCache :: !(IORef (Map SockAddr Connection)) --- , pendingResps :: !(IORef (Map Connection [MessageId])) + , pendingResps :: !(MVar (Map SockAddr PendingTransactions)) + , listenerThread :: !(MVar ThreadId) } -newManager :: Options -> IO Manager -newManager opts = Manager opts +initManager :: Options -> IO Manager +initManager opts = Manager opts <$> socket AF_INET Datagram defaultProtocol <*> newIORef M.empty + <*> newMVar M.empty + <*> newEmptyMVar + +resetState :: Manager -> IO () +resetState Manager {..} = do + writeIORef connectionCache err + _ <-swapMVar pendingResps err + m <- tryTakeMVar listenerThread + case m of + Nothing -> return () -- thread killed by 'closeManager' + Just _ -> return () -- thread killed by exception from 'listen' + return () + where + err = error "UDP tracker manager closed" + +newManager :: Options -> IO Manager +newManager opts = do + mgr <- initManager opts + tid <- forkIO (listen mgr `finally` resetState mgr) + putMVar (listenerThread mgr) tid + return mgr closeManager :: Manager -> IO () -closeManager Manager {..} = close sock +closeManager Manager {..} = do + close sock + mtid <- tryTakeMVar listenerThread + case mtid of + Nothing -> return () + Just tid -> killThread tid withManager :: Options -> (Manager -> IO a) -> IO a withManager opts = bracket (newManager opts) closeManager @@ -122,21 +155,6 @@ data RpcException -- | Tracker exists but not responding for specific number of seconds. | TrackerNotResponding Int - -- | Source\/destination socket address mismatch. - -- - -- WARNING: This is a BUG and will be fixed! - -- - | UnexpectedSource - - -- | Source\/destination transaction id mismatch. - -- - -- WARNING: This is a BUG and will be fixed! - -- - | TransactionFailed - - -- | Unable to decode tracker response; - | ParserFailure String - -- | Tracker respond with unexpected message type. | UnexpectedResponse { expectedMsg :: String @@ -192,10 +210,9 @@ instance Show ConnectionId where initialConnectionId :: ConnectionId initialConnectionId = ConnectionId 0x41727101980 --- TODO rename -- | Transaction Id is used within a UDP RPC. newtype TransactionId = TransactionId Word32 - deriving (Eq, Serialize) + deriving (Eq, Ord, Serialize) instance Show TransactionId where showsPrec _ (TransactionId tid) = showString "0x" <> showHex tid @@ -343,27 +360,66 @@ isExpired Connection {..} = do return $ timeDiff > connectionLifetime {----------------------------------------------------------------------- --- Basic transaction +-- Transactions -----------------------------------------------------------------------} -call :: Manager -> SockAddr -> ByteString -> IO ByteString -call Manager {..} addr arg = do - BS.sendAllTo sock arg addr - (res, addr') <- BS.recvFrom sock (optMaxPacketSize options) - unless (addr' == addr) $ do - throwIO $ UnexpectedSource - return res +allocTransaction :: Manager -> SockAddr -> MVar Response -> IO TransactionId +allocTransaction Manager {..} addr ares = modifyMVar pendingResps bindId + where + bindId m = do + tid <- genTransactionId + case M.lookup tid =<< M.lookup addr m of + Just _ -> bindId m -- already used, retry + Nothing -> return (M.alter (insertId tid) addr m, tid) + + insertId tid Nothing = Just (M.singleton tid ares) + insertId tid (Just m) = Just (M.insert tid ares m) + +commitTransaction :: Manager -> SockAddr -> TransactionId -> Response -> IO () +commitTransaction Manager {..} addr tid resp = + modifyMVar_ pendingResps $ \ m -> do + case M.lookup tid =<< M.lookup addr m of + Nothing -> return m -- tracker responded after 'cancelTransaction' fired + Just ares -> do + putMVar ares resp + return $ M.update deleteId addr m + where + deleteId m + | M.null m' = Nothing + | otherwise = Just m' + where + m' = M.delete tid m + +cancelTransaction :: Manager -> SockAddr -> TransactionId -> IO () +cancelTransaction Manager {..} addr tid = + modifyMVar_ pendingResps $ \m -> + return $ M.update deleteId addr m + where + deleteId m + | M.null m' = Nothing + | otherwise = Just m' + where + m' = M.delete tid m + +listen :: Manager -> IO () +listen mgr @ Manager {..} = do + forever $ do + (bs, addr) <- BS.recvFrom sock (optMaxPacketSize options) + case decode bs of + Left _ -> return () + Right (TransactionR {..}) -> commitTransaction mgr addr transIdR response transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response -transaction m addr conn request = do - tid <- genTransactionId - let trans = TransactionQ (connectionId conn) tid request - res <- call m addr (encode trans) - case decode res of - Right (TransactionR {..}) - | tid == transIdR -> return response - | otherwise -> throwIO $ TransactionFailed - Left msg -> throwIO $ ParserFailure msg +transaction mgr @ Manager {..} addr conn request = do + ares <- newEmptyMVar + tid <- allocTransaction mgr addr ares + performTransaction tid ares + `onException` cancelTransaction mgr addr tid + where + performTransaction tid ares = do + let trans = TransactionQ (connectionId conn) tid request + BS.sendAllTo sock (encode trans) addr + takeMVar ares {----------------------------------------------------------------------- -- Connection cache -- cgit v1.2.3