From 5d707daf963bdd13c1e3c16a2abf2006764ed170 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Thu, 2 Jan 2014 17:51:58 +0400 Subject: Use optTimeout option in each rpc call --- src/Network/BitTorrent/DHT/Session.hs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) (limited to 'src/Network/BitTorrent/DHT') diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index ac285bdc..a76880d7 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -48,6 +48,7 @@ import Control.Monad.Reader import Control.Monad.Trans.Control import Control.Monad.Trans.Resource import Data.Default +import Data.Fixed import Data.Hashable import Data.List as L import Data.Monoid @@ -57,6 +58,7 @@ import Data.Time import Data.Time.Clock.POSIX import System.Log.FastLogger import System.Random (randomIO) +import System.Timeout.Lifted import Text.PrettyPrint as PP hiding ((<>)) import Text.PrettyPrint.Class @@ -101,6 +103,11 @@ instance Default Options where , optTimeout = 5 -- seconds } +milliseconds :: NominalDiffTime -> Int +milliseconds dt = fromEnum millis + where + millis = realToFrac dt :: Milli + {----------------------------------------------------------------------- -- Tokens policy -----------------------------------------------------------------------} @@ -185,11 +192,9 @@ runDHT handlers opts naddr action = runResourceT $ do -- Routing -----------------------------------------------------------------------} --- TODO fork? routing :: Address ip => Routing ip a -> DHT ip (Maybe a) routing = runRouting ping refreshNodes getTimestamp --- TODO add timeout ping :: Address ip => NodeAddr ip -> DHT ip Bool ping addr = do $(logDebugS) "routing.questionable_node" (T.pack (render (pretty addr))) @@ -250,6 +255,7 @@ getTable = do var <- asks routingTable liftIO (readMVar var) +-- FIXME no blocking getNodeId :: DHT ip NodeId getNodeId = thisId <$> getTable @@ -305,6 +311,7 @@ getPeerList ih = do -- Messaging -----------------------------------------------------------------------} +-- | Throws exception if node is not responding. (<@>) :: forall a b ip. Address ip => KRPC (Query a) (Response b) => a -> NodeAddr ip -> DHT ip b q <@> addr = do @@ -313,11 +320,19 @@ q <@> addr = do let Method name = method :: Method (Query a) (Response b) let signature = T.decodeUtf8 name <> " @ " <> T.pack (render (pretty addr)) $(logDebugS) "queryNode" $ "Query sent | " <> signature - Response remoteId r <- query (toSockAddr addr) (Query nid q) - $(logDebugS) "queryNode" $ "Query recv | " <> signature - insertNode (NodeInfo remoteId addr) - return r + interval <- asks (milliseconds . optTimeout . options) + result <- timeout interval $ query (toSockAddr addr) (Query nid q) + case result of + Nothing -> do + $(logWarnS) "queryNode" $ "not responding @ " + <> T.pack (show (pretty addr)) + throwIO $ KError GenericError "timeout expired" "" + + Just (Response remoteId r) -> do + $(logDebugS) "queryNode" $ "Query recv | " <> signature + insertNode (NodeInfo remoteId addr) + return r type NodeHandler ip = Handler (DHT ip) -- cgit v1.2.3