diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2013-12-29 08:06:09 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2013-12-29 08:06:09 +0400 |
commit | a7aa11359b5964bbd984b5e3aa66cf78664035b1 (patch) | |
tree | 50de4a5f72157e2e69a205ebff497fb0e1787f22 /src/Network/BitTorrent/DHT/Session.hs | |
parent | 4a46766d5fb0882151e80f9137983a8c2dfb7869 (diff) |
Keep routing table in MVar.
Since table updates is not atomic operations and may perform IO we should
be able to gain /exclusive/ access to the table. Consider insertNode
function:
1) Thread A read table_1;
2) Thread B read table_1;
3) Thread B ping node Z and it does not respond and get removed;
4) Thread B remove node Z from table_1 and get table_2;
5) Thread B put table_2;
6) Thread A insert a new node and get table_3;
4) Thread A put table_3;
The problem is that final table_3 do have the removed node. At the
moment, exclusive access solves this problem.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 37 |
1 files changed, 19 insertions, 18 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 9db5947a..4ac1bee9 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -33,6 +33,7 @@ module Network.BitTorrent.DHT.Session | |||
33 | 33 | ||
34 | import Control.Applicative | 34 | import Control.Applicative |
35 | import Control.Concurrent.STM | 35 | import Control.Concurrent.STM |
36 | import Control.Concurrent.Lifted | ||
36 | import Control.Exception.Lifted hiding (Handler) | 37 | import Control.Exception.Lifted hiding (Handler) |
37 | import Control.Monad.Base | 38 | import Control.Monad.Base |
38 | import Control.Monad.Logger | 39 | import Control.Monad.Logger |
@@ -93,7 +94,7 @@ invalidateTokens curTime ts @ SessionTokens {..} | |||
93 | 94 | ||
94 | data Node ip = Node | 95 | data Node ip = Node |
95 | { manager :: !(Manager (DHT ip)) | 96 | { manager :: !(Manager (DHT ip)) |
96 | , routingTable :: !(TVar (Table ip)) | 97 | , routingTable :: !(MVar (Table ip)) |
97 | , contactInfo :: !(TVar (PeerStore ip)) | 98 | , contactInfo :: !(TVar (PeerStore ip)) |
98 | , sessionTokens :: !(TVar SessionTokens) | 99 | , sessionTokens :: !(TVar SessionTokens) |
99 | , loggerFun :: !(Loc -> LogSource -> LogLevel -> LogStr -> IO ()) | 100 | , loggerFun :: !(Loc -> LogSource -> LogLevel -> LogStr -> IO ()) |
@@ -134,7 +135,7 @@ runDHT naddr handlers action = runResourceT $ do | |||
134 | (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager | 135 | (_, m) <- allocate (newManager (toSockAddr naddr) handlers) closeManager |
135 | myId <- liftIO genNodeId | 136 | myId <- liftIO genNodeId |
136 | node <- liftIO $ Node m | 137 | node <- liftIO $ Node m |
137 | <$> newTVarIO (nullTable myId) | 138 | <$> newMVar (nullTable myId) |
138 | <*> newTVarIO def | 139 | <*> newTVarIO def |
139 | <*> (newTVarIO =<< nullSessionTokens) | 140 | <*> (newTVarIO =<< nullSessionTokens) |
140 | <*> pure logger | 141 | <*> pure logger |
@@ -204,12 +205,7 @@ checkToken addr questionableToken = do | |||
204 | getTable :: DHT ip (Table ip) | 205 | getTable :: DHT ip (Table ip) |
205 | getTable = do | 206 | getTable = do |
206 | var <- asks routingTable | 207 | var <- asks routingTable |
207 | liftIO (readTVarIO var) | 208 | liftIO (readMVar var) |
208 | |||
209 | putTable :: Table ip -> DHT ip () | ||
210 | putTable table = do | ||
211 | var <- asks routingTable | ||
212 | liftIO (atomically (writeTVar var table)) | ||
213 | 209 | ||
214 | getNodeId :: DHT ip NodeId | 210 | getNodeId :: DHT ip NodeId |
215 | getNodeId = thisId <$> getTable | 211 | getNodeId = thisId <$> getTable |
@@ -220,16 +216,21 @@ getClosest nid = kclosest 8 nid <$> getTable | |||
220 | getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip] | 216 | getClosestHash :: Eq ip => InfoHash -> DHT ip [NodeInfo ip] |
221 | getClosestHash ih = kclosestHash 8 ih <$> getTable | 217 | getClosestHash ih = kclosestHash 8 ih <$> getTable |
222 | 218 | ||
223 | insertNode :: Address ip => NodeInfo ip -> DHT ip () | 219 | -- FIXME some nodes can be ommited |
224 | insertNode info = do | 220 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId |
225 | t <- getTable | 221 | insertNode info = fork $ do |
226 | mt <- routing (R.insert info t) | 222 | var <- asks routingTable |
227 | case mt of | 223 | modifyMVar_ var $ \ t -> do |
228 | Nothing -> $(logDebugS) "insertNode" "Routing table is full" | 224 | result <- routing (R.insert info t) |
229 | Just t' -> do | 225 | case result of |
230 | putTable t' | 226 | Nothing -> do |
231 | let logMsg = "Routing table updated: " <> pretty t <> " -> " <> pretty t' | 227 | $(logDebugS) "insertNode" $ "Routing table is full: " |
232 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | 228 | <> T.pack (show (pretty t)) |
229 | return t | ||
230 | Just t' -> do | ||
231 | let logMsg = "Routing table updated: " <> pretty t <> " -> " <> pretty t' | ||
232 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | ||
233 | return t' | ||
233 | 234 | ||
234 | {----------------------------------------------------------------------- | 235 | {----------------------------------------------------------------------- |
235 | -- Peer storage | 236 | -- Peer storage |