From a7aa11359b5964bbd984b5e3aa66cf78664035b1 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 29 Dec 2013 08:06:09 +0400 Subject: Keep routing table in MVar. Since table updates is not atomic operations and may perform IO we should be able to gain /exclusive/ access to the table. Consider insertNode function: 1) Thread A read table_1; 2) Thread B read table_1; 3) Thread B ping node Z and it does not respond and get removed; 4) Thread B remove node Z from table_1 and get table_2; 5) Thread B put table_2; 6) Thread A insert a new node and get table_3; 4) Thread A put table_3; The problem is that final table_3 do have the removed node. At the moment, exclusive access solves this problem. --- src/Network/BitTorrent/DHT/Session.hs | 37 ++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 9db5947a..4ac1bee9 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -33,6 +33,7 @@ module Network.BitTorrent.DHT.Session import Control.Applicative import Control.Concurrent.STM +import Control.Concurrent.Lifted import Control.Exception.Lifted hiding (Handler) import Control.Monad.Base import Control.Monad.Logger @@ -93,7 +94,7 @@ invalidateTokens curTime ts @ SessionTokens {..} data Node ip = Node { manager :: !(Manager (DHT ip)) - , routingTable :: !(TVar (Table ip)) + , routingTable :: !(MVar (Table ip)) , contactInfo :: !(TVar (PeerStore ip)) , sessionTokens :: !(TVar SessionTokens) , loggerFun :: !(Loc -> LogSource -> LogLevel -> LogStr -> IO ()) @@ -134,7 +135,7 @@ runDHT naddr handlers action = runResourceT $ do (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager myId <- liftIO genNodeId node <- liftIO $ Node m - <$> newTVarIO (nullTable myId) + <$> newMVar (nullTable myId) <*> newTVarIO def <*> (newTVarIO =<< nullSessionTokens) <*> pure logger @@ -204,12 +205,7 @@ checkToken addr questionableToken = do getTable :: DHT ip (Table ip) getTable = do var <- asks routingTable - liftIO (readTVarIO var) - -putTable :: Table ip -> DHT ip () -putTable table = do - var <- asks routingTable - liftIO (atomically (writeTVar var table)) + liftIO (readMVar var) getNodeId :: DHT ip NodeId getNodeId = thisId <$> getTable @@ -220,16 +216,21 @@ getClosest nid = kclosest 8 nid <$> getTable getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip] getClosestHash ih = kclosestHash 8 ih <$> getTable -insertNode :: Address ip => NodeInfo ip -> DHT ip () -insertNode info = do - t <- getTable - mt <- routing (R.insert info t) - case mt of - Nothing -> $(logDebugS) "insertNode" "Routing table is full" - Just t' -> do - putTable t' - let logMsg = "Routing table updated: " <> pretty t <> " -> " <> pretty t' - $(logDebugS) "insertNode" (T.pack (render logMsg)) +-- FIXME some nodes can be ommited +insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId +insertNode info = fork $ do + var <- asks routingTable + modifyMVar_ var $ \ t -> do + result <- routing (R.insert info t) + case result of + Nothing -> do + $(logDebugS) "insertNode" $ "Routing table is full: " + <> T.pack (show (pretty t)) + return t + Just t' -> do + let logMsg = "Routing table updated: " <> pretty t <> " -> " <> pretty t' + $(logDebugS) "insertNode" (T.pack (render logMsg)) + return t' {----------------------------------------------------------------------- -- Peer storage -- cgit v1.2.3