diff options
author | joe <joe@jerkface.net> | 2017-01-05 12:18:43 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-01-05 12:18:43 -0500 |
commit | 5e2f43d967aa2d07368b7d5552f65a69b3979ab5 (patch) | |
tree | 7c84933442cdcd4b6e52c644842e64e34ae906b0 /src/Network/BitTorrent/DHT/Query.hs | |
parent | 990296703f511efe2bc2899d514dbe2a20247c88 (diff) |
Routing Table : use STM and per-bucket ping queues
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 7f20ad6d..99d8cdaf 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -73,6 +73,7 @@ import Network.BitTorrent.Address | |||
73 | import Network.BitTorrent.DHT.Message | 73 | import Network.BitTorrent.DHT.Message |
74 | import Network.BitTorrent.DHT.Routing as R | 74 | import Network.BitTorrent.DHT.Routing as R |
75 | import Network.BitTorrent.DHT.Session | 75 | import Network.BitTorrent.DHT.Session |
76 | import Control.Concurrent.STM | ||
76 | 77 | ||
77 | {----------------------------------------------------------------------- | 78 | {----------------------------------------------------------------------- |
78 | -- Handlers | 79 | -- Handlers |
@@ -104,7 +105,10 @@ findNodeH = nodeHandler $ \ _ (FindNode nid) -> do | |||
104 | -- | Default 'GetPeers' handler. | 105 | -- | Default 'GetPeers' handler. |
105 | getPeersH :: Address ip => NodeHandler ip | 106 | getPeersH :: Address ip => NodeHandler ip |
106 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do | 107 | getPeersH = nodeHandler $ \ naddr (GetPeers ih) -> do |
107 | GotPeers <$> getPeerList ih <*> grantToken naddr | 108 | ps <- getPeerList ih |
109 | tok <- grantToken naddr | ||
110 | $(logDebugS) "getPeersH" $ "INFO-HASH " <> T.pack (show (ih,fmap fromAddr naddr :: NodeAddr (Maybe IP))) | ||
111 | return $ GotPeers ps tok | ||
108 | 112 | ||
109 | -- | Default 'Announce' handler. | 113 | -- | Default 'Announce' handler. |
110 | announceH :: Address ip => NodeHandler ip | 114 | announceH :: Address ip => NodeHandler ip |
@@ -236,20 +240,33 @@ refreshNodes nid = do | |||
236 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId | 240 | insertNode :: Address ip => NodeInfo ip -> DHT ip ThreadId |
237 | insertNode info = fork $ do | 241 | insertNode info = fork $ do |
238 | var <- asks routingTable | 242 | var <- asks routingTable |
239 | t <- takeMVar var | 243 | tm <- getTimestamp |
240 | t' <- do -- modifyMVar_ var $ \ t -> do | 244 | let showTable = do |
241 | result <- routing (R.insert info t) | 245 | t <- liftIO $ atomically $ readTVar var |
242 | case result of | 246 | let logMsg = "Routing table: " <> pPrint t |
243 | Nothing -> do | 247 | $(logDebugS) "insertNode" (T.pack (render logMsg)) |
244 | $(logDebugS) "insertNode" $ "Routing table is full: " | 248 | t <- liftIO $ atomically $ readTVar var |
245 | <> T.pack (show (pPrint t)) | 249 | let arrival = TryInsert info |
246 | return t | 250 | arrival4 = TryInsert (fmap fromAddr info) :: Event (Maybe IPv4) |
247 | Just t' -> do | 251 | $(logDebugS) "insertNode" $ T.pack (show arrival4) |
248 | let logMsg = "Routing table updated: " | 252 | ps <- liftIO $ atomically $ do |
249 | <> pPrint t <> " -> " <> pPrint t' | 253 | t <- readTVar var |
250 | $(logDebugS) "insertNode" (T.pack (render logMsg)) | 254 | (ps,t') <- R.insert tm arrival t |
251 | return t' | 255 | writeTVar var t' |
252 | putMVar var t' | 256 | return ps |
257 | showTable | ||
258 | fork $ forM_ ps $ \(CheckPing ns)-> do | ||
259 | forM_ ns $ \n -> do | ||
260 | alive <- PingResult n <$> probeNode (nodeAddr n) | ||
261 | let PingResult _ b = alive | ||
262 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) | ||
263 | tm <- getTimestamp | ||
264 | liftIO $ atomically $ do | ||
265 | t <- readTVar var | ||
266 | (_,t') <- R.insert tm alive t | ||
267 | writeTVar var t' | ||
268 | showTable | ||
269 | return () | ||
253 | 270 | ||
254 | -- | Throws exception if node is not responding. | 271 | -- | Throws exception if node is not responding. |
255 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) | 272 | queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) |
@@ -268,9 +285,4 @@ q <@> addr = snd <$> queryNode addr q | |||
268 | {-# INLINE (<@>) #-} | 285 | {-# INLINE (<@>) #-} |
269 | 286 | ||
270 | restoreTable :: Address ip => Table ip -> DHT ip () | 287 | restoreTable :: Address ip => Table ip -> DHT ip () |
271 | restoreTable tbl = do | 288 | restoreTable tbl = mapM_ (insertNode . fst) $ L.concat $ R.toList tbl |
272 | tblvar <- asks routingTable | ||
273 | tbl0 <- liftIO $ takeMVar tblvar | ||
274 | mb <- routing $ merge tbl tbl0 | ||
275 | maybe (return ()) (liftIO . putMVar tblvar) mb | ||
276 | |||