summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-12-27 01:20:59 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-01 23:28:00 -0500
commitd8a7ad88bfdb76b7c481c0ce89de63528a06e734 (patch)
tree621f3145cb3b08b5133229372501bef8c84a7cb6
parent9f33d972b60959d69318e5f243ffae4252d6d3f5 (diff)
Made the BucketRefresher state accessible from CommonAPI.
-rw-r--r--dht/examples/dhtd.hs26
-rw-r--r--kad/src/Network/Kademlia/Bootstrap.hs44
-rw-r--r--kad/src/Network/Kademlia/CommonAPI.hs8
-rw-r--r--kad/src/Network/Kademlia/Persistence.hs2
-rw-r--r--kad/src/Network/Kademlia/Search.hs9
5 files changed, 65 insertions, 24 deletions
diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs
index ef3f6bd4..4f83beb2 100644
--- a/dht/examples/dhtd.hs
+++ b/dht/examples/dhtd.hs
@@ -551,7 +551,7 @@ clientSession s@Session{..} sock cnum h = do
551 551
552 ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts 552 ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts
553 -> cmd0 $ do 553 -> cmd0 $ do
554 bkts <- atomically $ readTVar dhtBuckets 554 bkts <- atomically $ readTVar (refreshBuckets dhtBuckets)
555 let r = reportTable bkts 555 let r = reportTable bkts
556 hPutClient h $ 556 hPutClient h $
557 showReport $ 557 showReport $
@@ -562,7 +562,7 @@ clientSession s@Session{..} sock cnum h = do
562 ("r", s) | Just DHT{dhtQuery,dhtBuckets} <- Map.lookup netname dhts 562 ("r", s) | Just DHT{dhtQuery,dhtBuckets} <- Map.lookup netname dhts
563 , Just DHTQuery{qsearch} <- Map.lookup "node" dhtQuery 563 , Just DHTQuery{qsearch} <- Map.lookup "node" dhtQuery
564 -> cmd0 $ do 564 -> cmd0 $ do
565 ni <- atomically $ thisNode <$> readTVar dhtBuckets 565 ni <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets)
566 let kad = searchSpace qsearch 566 let kad = searchSpace qsearch
567 nid = kademliaLocation kad ni 567 nid = kademliaLocation kad ni
568 b = case readMaybe $ strp s of 568 b = case readMaybe $ strp s of
@@ -768,7 +768,7 @@ clientSession s@Session{..} sock cnum h = do
768 -- arguments: method 768 -- arguments: method
769 -- nid 769 -- nid
770 -- (optional dest-ni) 770 -- (optional dest-ni)
771 self <- atomically $ thisNode <$> readTVar dhtBuckets 771 self <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets)
772 let (method,xs) = break isSpace $ dropWhile isSpace s 772 let (method,xs) = break isSpace $ dropWhile isSpace s
773 (nidstr,ys) = break isSpace $ dropWhile isSpace xs 773 (nidstr,ys) = break isSpace $ dropWhile isSpace xs
774 destination = dropWhile isSpace ys 774 destination = dropWhile isSpace ys
@@ -819,7 +819,7 @@ clientSession s@Session{..} sock cnum h = do
819 -- data (jid or key) data 819 -- data (jid or key) data
820 -- dest-rendezvous(r) token 820 -- dest-rendezvous(r) token
821 -- (optional extra-text) (optional dest-ni) 821 -- (optional extra-text) (optional dest-ni)
822 self <- atomically $ thisNode <$> readTVar dhtBuckets 822 self <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets)
823 let (method,xs) = break isSpace $ dropWhile isSpace s 823 let (method,xs) = break isSpace $ dropWhile isSpace s
824 (dtastr,ys) = break isSpace $ dropWhile isSpace xs 824 (dtastr,ys) = break isSpace $ dropWhile isSpace xs
825 (tokenstr,zs) = break isSpace $ dropWhile isSpace ys 825 (tokenstr,zs) = break isSpace $ dropWhile isSpace ys
@@ -941,7 +941,7 @@ clientSession s@Session{..} sock cnum h = do
941 (\nid -> R.kclosest (searchSpace qsearch) 941 (\nid -> R.kclosest (searchSpace qsearch)
942 (searchK qsearch) 942 (searchK qsearch)
943 nid 943 nid
944 <$> readTVar dhtBuckets) 944 <$> readTVar (refreshBuckets dhtBuckets))
945 (announceTarget dta) 945 (announceTarget dta)
946 announceInterval) 946 announceInterval)
947 dta 947 dta
@@ -967,7 +967,7 @@ clientSession s@Session{..} sock cnum h = do
967 (\nid -> R.kclosest (searchSpace qsearch) 967 (\nid -> R.kclosest (searchSpace qsearch)
968 (searchK qsearch) 968 (searchK qsearch)
969 nid 969 nid
970 <$> readTVar dhtBuckets) 970 <$> readTVar (refreshBuckets dhtBuckets))
971 (announceTarget dta) 971 (announceTarget dta)
972 announceInterval) 972 announceInterval)
973 dta 973 dta
@@ -1009,7 +1009,7 @@ clientSession s@Session{..} sock cnum h = do
1009 join $ atomically $ do 1009 join $ atomically $ do
1010 schs <- readTVar dhtSearches 1010 schs <- readTVar dhtSearches
1011 case Map.lookup (method,nid) schs of 1011 case Map.lookup (method,nid) schs of
1012 Nothing -> do forkSearch method nid qry dhtSearches dhtBuckets tid kvar 1012 Nothing -> do forkSearch method nid qry dhtSearches (refreshBuckets dhtBuckets) tid kvar
1013 return $ presentSearches 1013 return $ presentSearches
1014 Just sch -> do writeTVar kvar (Just $ return ()) 1014 Just sch -> do writeTVar kvar (Just $ return ())
1015 return $ reportSearchResults method h sch 1015 return $ reportSearchResults method h sch
@@ -1614,7 +1614,7 @@ initTox opts ssvar keysdb mbxmpp invc = case porttox opts of
1614 tcpclient = tcpKademliaClient $ Tox.toxOnionRoutes tox 1614 tcpclient = tcpKademliaClient $ Tox.toxOnionRoutes tox
1615 tcpRefresher = tcpBucketRefresher $ Tox.toxOnionRoutes tox 1615 tcpRefresher = tcpBucketRefresher $ Tox.toxOnionRoutes tox
1616 tcpDHT = DHT 1616 tcpDHT = DHT
1617 { dhtBuckets = refreshBuckets tcpRefresher 1617 { dhtBuckets = tcpRefresher
1618 , dhtSecretKey = return $ Just $ transportSecret (Tox.toxCryptoKeys tox) 1618 , dhtSecretKey = return $ Just $ transportSecret (Tox.toxCryptoKeys tox)
1619 , dhtPing = Map.singleton "ping" DHTPing 1619 , dhtPing = Map.singleton "ping" DHTPing
1620 { pingQuery = noArgPing $ TCP.tcpPing (TCP.tcpClient tcpclient) 1620 { pingQuery = noArgPing $ TCP.tcpPing (TCP.tcpClient tcpclient)
@@ -1639,9 +1639,9 @@ initTox opts ssvar keysdb mbxmpp invc = case porttox opts of
1639 , dhtShowHexId = Just Tox.showHexId 1639 , dhtShowHexId = Just Tox.showHexId
1640 } 1640 }
1641 dhts = Map.fromList $ 1641 dhts = Map.fromList $
1642 ("tox4", toxDHT Tox.routing4 Want_IP4) 1642 ("tox4", toxDHT Tox.refresher4 Want_IP4)
1643 : (if ip6tox opts 1643 : (if ip6tox opts
1644 then ( ("tox6", toxDHT Tox.routing6 Want_IP6) :) 1644 then ( ("tox6", toxDHT Tox.refresher6 Want_IP6) :)
1645 else id) 1645 else id)
1646 (if enableTCPDHT opts 1646 (if enableTCPDHT opts
1647 then [ ("toxtcp", tcpDHT) ] 1647 then [ ("toxtcp", tcpDHT) ]
@@ -1674,7 +1674,7 @@ initJabber opts ssvar announcer mbtox toxchat = case portxmpp opts of
1674 let lookupBkts :: String -> Map.Map String DHT -> Maybe (String,TVar (BucketList Tox.NodeInfo)) 1674 let lookupBkts :: String -> Map.Map String DHT -> Maybe (String,TVar (BucketList Tox.NodeInfo))
1675 lookupBkts name m = case Map.lookup name m of 1675 lookupBkts name m = case Map.lookup name m of
1676 Nothing -> Nothing 1676 Nothing -> Nothing
1677 Just DHT{dhtBuckets} -> cast (name, dhtBuckets) 1677 Just DHT{dhtBuckets} -> cast (name, refreshBuckets dhtBuckets)
1678 sv <- xmppServer Tcp.noCleanUp (Just sport) 1678 sv <- xmppServer Tcp.noCleanUp (Just sport)
1679 tcp <- xmppConnections sv -- :: IO ( Manager TCPStatus T.Text ) 1679 tcp <- xmppConnections sv -- :: IO ( Manager TCPStatus T.Text )
1680 let tman = toxman ssvar announcer <$> mbtox 1680 let tman = toxman ssvar announcer <$> mbtox
@@ -1795,9 +1795,9 @@ main = do
1795 , dhtShowHexId = Nothing 1795 , dhtShowHexId = Nothing
1796 } 1796 }
1797 dhts = Map.fromList $ 1797 dhts = Map.fromList $
1798 ("bt4", mainlineDHT Mainline.routing4 Want_IP4) 1798 ("bt4", mainlineDHT Mainline.refresher4 Want_IP4)
1799 : if ip6bt opts 1799 : if ip6bt opts
1800 then [ ("bt6", mainlineDHT Mainline.routing6 Want_IP6) ] 1800 then [ ("bt6", mainlineDHT Mainline.refresher6 Want_IP6) ]
1801 else [] 1801 else []
1802 ips :: IO [SockAddr] 1802 ips :: IO [SockAddr]
1803 ips = readExternals Mainline.nodeAddr 1803 ips = readExternals Mainline.nodeAddr
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs
index 08ba3318..c07b3c6c 100644
--- a/kad/src/Network/Kademlia/Bootstrap.hs
+++ b/kad/src/Network/Kademlia/Bootstrap.hs
@@ -16,6 +16,8 @@
16module Network.Kademlia.Bootstrap where 16module Network.Kademlia.Bootstrap where
17 17
18import Data.Function 18import Data.Function
19import qualified Data.IntMap.Strict as IntMap
20 ;import Data.IntMap.Strict (IntMap)
19import Data.Maybe 21import Data.Maybe
20import qualified Data.Set as Set 22import qualified Data.Set as Set
21import Data.Time.Clock.POSIX (getPOSIXTime) 23import Data.Time.Clock.POSIX (getPOSIXTime)
@@ -71,10 +73,12 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
71 , -- | Timestamp of last bucket event. 73 , -- | Timestamp of last bucket event.
72 refreshLastTouch :: TVar POSIXTime 74 refreshLastTouch :: TVar POSIXTime
73 , -- | This variable indicates whether or not we are in bootstrapping mode. 75 , -- | This variable indicates whether or not we are in bootstrapping mode.
74 bootstrapMode :: TVar Bool 76 bootstrapMode :: TVar Bool
75 , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on 77 , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on
76 -- every finished refresh. 78 -- every finished refresh.
77 bootstrapCountdown :: TVar (Maybe Int) 79 bootstrapCountdown :: TVar (Maybe Int)
80 -- | Internal state of background searches. Exposed for debugging purposes.
81 , refreshState :: TVar (IntMap [BucketSearch nid ni])
78 } 82 }
79 83
80newBucketRefresher :: ( Ord addr, Hashable addr 84newBucketRefresher :: ( Ord addr, Hashable addr
@@ -91,6 +95,7 @@ newBucketRefresher bkts sch ping = do
91 lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... 95 lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas...
92 bootstrapVar <- newTVar True -- Start in bootstrapping mode. 96 bootstrapVar <- newTVar True -- Start in bootstrapping mode.
93 bootstrapCnt <- newTVar Nothing 97 bootstrapCnt <- newTVar Nothing
98 st <- newTVar IntMap.empty
94 return BucketRefresher 99 return BucketRefresher
95 { refreshInterval = 15 * 60 100 { refreshInterval = 15 * 60
96 , refreshQueue = sched 101 , refreshQueue = sched
@@ -100,6 +105,7 @@ newBucketRefresher bkts sch ping = do
100 , refreshLastTouch = lasttouch 105 , refreshLastTouch = lasttouch
101 , bootstrapMode = bootstrapVar 106 , bootstrapMode = bootstrapVar
102 , bootstrapCountdown = bootstrapCnt 107 , bootstrapCountdown = bootstrapCnt
108 , refreshState = st
103 } 109 }
104 110
105-- | This was added to avoid the compile error "Record update for 111-- | This was added to avoid the compile error "Record update for
@@ -118,6 +124,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
118 , refreshLastTouch = refreshLastTouch 124 , refreshLastTouch = refreshLastTouch
119 , bootstrapMode = bootstrapMode 125 , bootstrapMode = bootstrapMode
120 , bootstrapCountdown = bootstrapCountdown 126 , bootstrapCountdown = bootstrapCountdown
127 , refreshState = refreshState
121 } 128 }
122 129
123-- | Fork a refresh loop. Kill the returned thread to terminate it. 130-- | Fork a refresh loop. Kill the returned thread to terminate it.
@@ -228,10 +235,29 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown
228 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." 235 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
229 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) 236 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)
230 237
238data BucketSearch nid ni = forall addr tok. BucketSearch
239 { bucketSample :: nid
240 , bucketResults :: TVar (Set.Set ni)
241 , bucketFinFlag :: TVar Bool
242 , bucketState :: SearchState nid addr tok ni ni
243 , bucketThread :: ThreadId
244 }
245
246removeBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni]
247removeBucketState bst Nothing = Nothing
248removeBucketState bst (Just xs) = case filter (\b -> bucketThread b /= bucketThread bst) xs of
249 [] -> Nothing
250 ys -> Just ys
251
252insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni]
253insertBucketState bst Nothing = Just [bst]
254insertBucketState bst (Just xs) = Just (bst : xs)
255
231refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => 256refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
232 BucketRefresher nid ni -> Int -> IO Int 257 BucketRefresher nid ni -> Int -> IO Int
233refreshBucket r@BucketRefresher{ refreshSearch = sch 258refreshBucket r@BucketRefresher{ refreshSearch = sch
234 , refreshBuckets = var } 259 , refreshBuckets = var
260 , refreshState = rstate }
235 n = do 261 n = do
236 tbl <- atomically (readTVar var) 262 tbl <- atomically (readTVar var)
237 let count = bktCount tbl 263 let count = bktCount tbl
@@ -248,13 +274,21 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch
248 dput XRefresh $ "Start refresh " ++ show (n,sample) 274 dput XRefresh $ "Start refresh " ++ show (n,sample)
249 275
250 -- Set 15 minute timeout in order to avoid overlapping refreshes. 276 -- Set 15 minute timeout in order to avoid overlapping refreshes.
251 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount 277 (s,thread) <- search sch tbl sample $ if n+1 == R.defaultBucketCount
252 then const $ return True -- Never short-circuit the last bucket. 278 then const $ return True -- Never short-circuit the last bucket.
253 else checkBucketFull (searchSpace sch) var resultCounter fin n 279 else checkBucketFull (searchSpace sch) var resultCounter fin n
280 let bstate = BucketSearch sample resultCounter fin s thread
281 atomically $ modifyTVar' rstate $ IntMap.alter (insertBucketState bstate) n
254 _ <- timeout (15*60*1000000) $ do 282 _ <- timeout (15*60*1000000) $ do
255 atomically $ searchIsFinished s >>= check 283 atomically $ searchIsFinished s >>= check
256 atomically $ searchCancel s 284 atomically $ searchCancel s
257 dput XDHT $ "Finish refresh " ++ show (n,sample) 285 dput XDHT $ "Finish refresh " ++ show (n,sample)
286 bg <- forkIO $ do
287 atomically $ do
288 searchIsFinished s >>= check
289 modifyTVar' rstate $ IntMap.alter (removeBucketState bstate) n
290
291 labelThread bg ("backgrounded." ++ show n ++ "." ++ show sample)
258 now <- getPOSIXTime 292 now <- getPOSIXTime
259 join $ atomically $ onFinishedRefresh r n now 293 join $ atomically $ onFinishedRefresh r n now
260 rcount <- atomically $ do 294 rcount <- atomically $ do
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs
index 4714cecc..6d3fd16c 100644
--- a/kad/src/Network/Kademlia/CommonAPI.hs
+++ b/kad/src/Network/Kademlia/CommonAPI.hs
@@ -1,5 +1,8 @@
1{-# LANGUAGE ExistentialQuantification #-} 1{-# LANGUAGE ExistentialQuantification #-}
2module Network.Kademlia.CommonAPI where 2module Network.Kademlia.CommonAPI
3 ( module Network.Kademlia.CommonAPI
4 , refreshBuckets
5 ) where
3 6
4 7
5import Control.Concurrent 8import Control.Concurrent
@@ -12,6 +15,7 @@ import qualified Data.Set as Set
12import Data.Time.Clock.POSIX 15import Data.Time.Clock.POSIX
13import Data.Typeable 16import Data.Typeable
14 17
18import Network.Kademlia.Bootstrap
15import Network.Kademlia.Search 19import Network.Kademlia.Search
16import Network.Kademlia.Routing as R 20import Network.Kademlia.Routing as R
17import Crypto.Tox (SecretKey,PublicKey) 21import Crypto.Tox (SecretKey,PublicKey)
@@ -29,7 +33,7 @@ data DHT = forall nid ni. ( Show ni
29 , S.Serialize nid 33 , S.Serialize nid
30 ) => 34 ) =>
31 DHT 35 DHT
32 { dhtBuckets :: TVar (BucketList ni) 36 { dhtBuckets :: BucketRefresher nid ni
33 , dhtSecretKey :: STM (Maybe SecretKey) 37 , dhtSecretKey :: STM (Maybe SecretKey)
34 , dhtPing :: Map.Map String (DHTPing ni) 38 , dhtPing :: Map.Map String (DHTPing ni)
35 , dhtQuery :: Map.Map String (DHTQuery nid ni) 39 , dhtQuery :: Map.Map String (DHTQuery nid ni)
diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs
index 32ec169d..f89287fe 100644
--- a/kad/src/Network/Kademlia/Persistence.hs
+++ b/kad/src/Network/Kademlia/Persistence.hs
@@ -16,7 +16,7 @@ import System.IO.Error
16 16
17saveNodes :: String -> DHT -> IO () 17saveNodes :: String -> DHT -> IO ()
18saveNodes netname DHT{dhtBuckets} = do 18saveNodes netname DHT{dhtBuckets} = do
19 bkts <- atomically $ readTVar dhtBuckets 19 bkts <- atomically $ readTVar (refreshBuckets dhtBuckets)
20 let ns = map fst $ concat $ R.toList bkts 20 let ns = map fst $ concat $ R.toList bkts
21 bs = J.encode ns 21 bs = J.encode ns
22 fname = nodesFileName netname 22 fname = nodesFileName netname
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
index 5b60c303..856a7cfc 100644
--- a/kad/src/Network/Kademlia/Search.hs
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -194,12 +194,15 @@ search ::
194 , PSQKey nid 194 , PSQKey nid
195 , PSQKey ni 195 , PSQKey ni
196 , Show nid 196 , Show nid
197 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) 197 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId)
198search sch buckets target result = do 198search sch buckets target result = do
199 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets 199 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
200 st <- atomically $ newSearch sch target ns 200 st <- atomically $ newSearch sch target ns
201 t <- forkIO $ searchLoop sch target result st 201 v <- newTVarIO False
202 return st 202 t <- forkIO $ atomically (check =<< readTVar v) >> searchLoop sch target result st
203 labelThread t ("search.pending." ++ show target)
204 atomically $ writeTVar v True
205 return (st,t)
203 206
204searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) 207searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni )
205 => Search nid addr tok ni r -- ^ Query and distance methods. 208 => Search nid addr tok ni r -- ^ Query and distance methods.