From 30ba75b71f250d6080268513728df5ba3a2e8d61 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sat, 22 Feb 2014 00:21:15 +0400 Subject: Unblock pending transactions at closeManager --- src/Network/BitTorrent/Tracker/RPC/UDP.hs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'src/Network/BitTorrent/Tracker/RPC/UDP.hs') diff --git a/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/src/Network/BitTorrent/Tracker/RPC/UDP.hs index 0e592398..1ea47100 100644 --- a/src/Network/BitTorrent/Tracker/RPC/UDP.hs +++ b/src/Network/BitTorrent/Tracker/RPC/UDP.hs @@ -43,6 +43,7 @@ import Data.Text as T import Data.Text.Encoding import Data.Time import Data.Time.Clock.POSIX +import Data.Traversable import Data.Typeable import Data.Word import Text.Read (readMaybe) @@ -94,8 +95,10 @@ instance Default Options where -----------------------------------------------------------------------} type ConnectionCache = Map SockAddr Connection -type PendingTransactions = Map TransactionId (MVar Response) -type PendingQueries = Map SockAddr PendingTransactions + +type PendingResponse = MVar (Either RpcException Response) +type PendingTransactions = Map TransactionId PendingResponse +type PendingQueries = Map SockAddr PendingTransactions data Manager = Manager { options :: !Options @@ -113,12 +116,18 @@ initManager opts = Manager opts <*> newMVar M.empty <*> newEmptyMVar +unblockAll :: PendingQueries -> IO () +unblockAll m = traverse (traverse unblock) m >> return () + where + unblock ares = putMVar ares (Left ManagerClosed) + resetState :: Manager -> IO () resetState Manager {..} = do - writeIORef connectionCache err - _ <-swapMVar pendingResps err - m <- tryTakeMVar listenerThread - case m of + writeIORef connectionCache err + m <- swapMVar pendingResps err + unblockAll m + mtid <- tryTakeMVar listenerThread + case mtid of Nothing -> return () -- thread killed by 'closeManager' Just _ -> return () -- thread killed by exception from 'listen' return () @@ -165,6 +174,9 @@ data RpcException -- | RPC succeed, but tracker respond with error code. | QueryFailed Text + + -- | RPC manager closed while waiting for response. + | ManagerClosed deriving (Show, Typeable) instance Exception RpcException @@ -365,7 +377,7 @@ isExpired Connection {..} = do -- Transactions -----------------------------------------------------------------------} -register :: SockAddr -> TransactionId -> MVar Response +register :: SockAddr -> TransactionId -> PendingResponse -> PendingQueries -> PendingQueries register addr tid ares = M.alter insertId addr where @@ -399,7 +411,7 @@ commitTransaction Manager {..} addr tid resp = case M.lookup tid =<< M.lookup addr m of Nothing -> return m -- tracker responded after 'cancelTransaction' fired Just ares -> do - putMVar ares resp + putMVar ares (Right resp) return $ unregister addr tid m -- | Abort transaction forcefully. @@ -429,7 +441,7 @@ transaction mgr @ Manager {..} addr conn request = do performTransaction tid ares = do let trans = TransactionQ (connectionId conn) tid request BS.sendAllTo sock (encode trans) addr - takeMVar ares + takeMVar ares >>= either throwIO return {----------------------------------------------------------------------- -- Connection cache -- cgit v1.2.3