diff options
Diffstat (limited to 'src/Network/BitTorrent/Tracker/RPC/UDP.hs')
-rw-r--r-- | src/Network/BitTorrent/Tracker/RPC/UDP.hs | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/src/Network/BitTorrent/Tracker/RPC/UDP.hs index 0e592398..1ea47100 100644 --- a/src/Network/BitTorrent/Tracker/RPC/UDP.hs +++ b/src/Network/BitTorrent/Tracker/RPC/UDP.hs | |||
@@ -43,6 +43,7 @@ import Data.Text as T | |||
43 | import Data.Text.Encoding | 43 | import Data.Text.Encoding |
44 | import Data.Time | 44 | import Data.Time |
45 | import Data.Time.Clock.POSIX | 45 | import Data.Time.Clock.POSIX |
46 | import Data.Traversable | ||
46 | import Data.Typeable | 47 | import Data.Typeable |
47 | import Data.Word | 48 | import Data.Word |
48 | import Text.Read (readMaybe) | 49 | import Text.Read (readMaybe) |
@@ -94,8 +95,10 @@ instance Default Options where | |||
94 | -----------------------------------------------------------------------} | 95 | -----------------------------------------------------------------------} |
95 | 96 | ||
96 | type ConnectionCache = Map SockAddr Connection | 97 | type ConnectionCache = Map SockAddr Connection |
97 | type PendingTransactions = Map TransactionId (MVar Response) | 98 | |
98 | type PendingQueries = Map SockAddr PendingTransactions | 99 | type PendingResponse = MVar (Either RpcException Response) |
100 | type PendingTransactions = Map TransactionId PendingResponse | ||
101 | type PendingQueries = Map SockAddr PendingTransactions | ||
99 | 102 | ||
100 | data Manager = Manager | 103 | data Manager = Manager |
101 | { options :: !Options | 104 | { options :: !Options |
@@ -113,12 +116,18 @@ initManager opts = Manager opts | |||
113 | <*> newMVar M.empty | 116 | <*> newMVar M.empty |
114 | <*> newEmptyMVar | 117 | <*> newEmptyMVar |
115 | 118 | ||
119 | unblockAll :: PendingQueries -> IO () | ||
120 | unblockAll m = traverse (traverse unblock) m >> return () | ||
121 | where | ||
122 | unblock ares = putMVar ares (Left ManagerClosed) | ||
123 | |||
116 | resetState :: Manager -> IO () | 124 | resetState :: Manager -> IO () |
117 | resetState Manager {..} = do | 125 | resetState Manager {..} = do |
118 | writeIORef connectionCache err | 126 | writeIORef connectionCache err |
119 | _ <-swapMVar pendingResps err | 127 | m <- swapMVar pendingResps err |
120 | m <- tryTakeMVar listenerThread | 128 | unblockAll m |
121 | case m of | 129 | mtid <- tryTakeMVar listenerThread |
130 | case mtid of | ||
122 | Nothing -> return () -- thread killed by 'closeManager' | 131 | Nothing -> return () -- thread killed by 'closeManager' |
123 | Just _ -> return () -- thread killed by exception from 'listen' | 132 | Just _ -> return () -- thread killed by exception from 'listen' |
124 | return () | 133 | return () |
@@ -165,6 +174,9 @@ data RpcException | |||
165 | 174 | ||
166 | -- | RPC succeed, but tracker respond with error code. | 175 | -- | RPC succeed, but tracker respond with error code. |
167 | | QueryFailed Text | 176 | | QueryFailed Text |
177 | |||
178 | -- | RPC manager closed while waiting for response. | ||
179 | | ManagerClosed | ||
168 | deriving (Show, Typeable) | 180 | deriving (Show, Typeable) |
169 | 181 | ||
170 | instance Exception RpcException | 182 | instance Exception RpcException |
@@ -365,7 +377,7 @@ isExpired Connection {..} = do | |||
365 | -- Transactions | 377 | -- Transactions |
366 | -----------------------------------------------------------------------} | 378 | -----------------------------------------------------------------------} |
367 | 379 | ||
368 | register :: SockAddr -> TransactionId -> MVar Response | 380 | register :: SockAddr -> TransactionId -> PendingResponse |
369 | -> PendingQueries -> PendingQueries | 381 | -> PendingQueries -> PendingQueries |
370 | register addr tid ares = M.alter insertId addr | 382 | register addr tid ares = M.alter insertId addr |
371 | where | 383 | where |
@@ -399,7 +411,7 @@ commitTransaction Manager {..} addr tid resp = | |||
399 | case M.lookup tid =<< M.lookup addr m of | 411 | case M.lookup tid =<< M.lookup addr m of |
400 | Nothing -> return m -- tracker responded after 'cancelTransaction' fired | 412 | Nothing -> return m -- tracker responded after 'cancelTransaction' fired |
401 | Just ares -> do | 413 | Just ares -> do |
402 | putMVar ares resp | 414 | putMVar ares (Right resp) |
403 | return $ unregister addr tid m | 415 | return $ unregister addr tid m |
404 | 416 | ||
405 | -- | Abort transaction forcefully. | 417 | -- | Abort transaction forcefully. |
@@ -429,7 +441,7 @@ transaction mgr @ Manager {..} addr conn request = do | |||
429 | performTransaction tid ares = do | 441 | performTransaction tid ares = do |
430 | let trans = TransactionQ (connectionId conn) tid request | 442 | let trans = TransactionQ (connectionId conn) tid request |
431 | BS.sendAllTo sock (encode trans) addr | 443 | BS.sendAllTo sock (encode trans) addr |
432 | takeMVar ares | 444 | takeMVar ares >>= either throwIO return |
433 | 445 | ||
434 | {----------------------------------------------------------------------- | 446 | {----------------------------------------------------------------------- |
435 | -- Connection cache | 447 | -- Connection cache |