From 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sat, 28 Sep 2019 13:43:29 -0400 Subject: Factor out some new libraries word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search --- src/Network/Kademlia/Bootstrap.hs | 437 ------------------- src/Network/Kademlia/CommonAPI.hs | 84 ---- src/Network/Kademlia/Persistence.hs | 51 --- src/Network/Kademlia/Routing.hs | 808 ------------------------------------ src/Network/Kademlia/Search.hs | 236 ----------- 5 files changed, 1616 deletions(-) delete mode 100644 src/Network/Kademlia/Bootstrap.hs delete mode 100644 src/Network/Kademlia/CommonAPI.hs delete mode 100644 src/Network/Kademlia/Persistence.hs delete mode 100644 src/Network/Kademlia/Routing.hs delete mode 100644 src/Network/Kademlia/Search.hs (limited to 'src/Network/Kademlia') diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs deleted file mode 100644 index 1324ae77..00000000 --- a/src/Network/Kademlia/Bootstrap.hs +++ /dev/null @@ -1,437 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE ConstraintKinds #-} -{-# LANGUAGE DeriveFunctor #-} -{-# LANGUAGE DeriveTraversable #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE PartialTypeSignatures #-} -{-# LANGUAGE PatternSynonyms #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE ScopedTypeVariables #-} -module Network.Kademlia.Bootstrap where - -import Data.Function -import Data.Maybe -import qualified Data.Set as Set -import Data.Time.Clock.POSIX (getPOSIXTime) -import Network.Kademlia.Routing as R -#ifdef THREAD_DEBUG -import Control.Concurrent.Lifted.Instrument -#else -import Control.Concurrent.Lifted -import GHC.Conc (labelThread) -#endif -import Control.Concurrent.STM -import Control.Monad -import Data.Hashable -import Data.Time.Clock.POSIX (POSIXTime) -import Data.Ord -import System.Entropy -import System.Timeout -import DPut -import DebugTag - -import qualified Data.Wrapper.PSQInt as Int - ;import Data.Wrapper.PSQInt (pattern (:->)) -import Network.Address (bucketRange) -import Network.Kademlia.Search -import Control.Concurrent.Tasks -import Network.Kademlia - -type SensibleNodeId nid ni = - ( Show nid - , Ord nid - , Ord ni - , Hashable nid - , Hashable ni ) - -data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher - { -- | A staleness threshold (if a bucket goes this long without being - -- touched, a refresh will be triggered). - refreshInterval :: POSIXTime - -- | A TVar with the time-to-refresh schedule for each bucket. - -- - -- To "touch" a bucket and prevent it from being refreshed, reschedule - -- its refresh time to some time into the future by modifying its - -- priority in this priority search queue. - , refreshQueue :: TVar (Int.PSQ POSIXTime) - -- | This is the kademlia node search specification. - , refreshSearch :: Search nid addr tok ni ni - -- | The current kademlia routing table buckets. - , refreshBuckets :: TVar (R.BucketList ni) - -- | Action to ping a node. This is used only during initial bootstrap - -- to get some nodes in our table. A 'True' result is interpreted as a a - -- pong, where 'False' is a non-response. - , refreshPing :: ni -> IO Bool - , -- | Timestamp of last bucket event. - refreshLastTouch :: TVar POSIXTime - , -- | This variable indicates whether or not we are in bootstrapping mode. - bootstrapMode :: TVar Bool - , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on - -- every finished refresh. - bootstrapCountdown :: TVar (Maybe Int) - } - -newBucketRefresher :: ( Ord addr, Hashable addr - , SensibleNodeId nid ni ) - => TVar (R.BucketList ni) - -> Search nid addr tok ni ni - -> (ni -> IO Bool) - -> STM (BucketRefresher nid ni) -newBucketRefresher bkts sch ping = do - let spc = searchSpace sch - nodeId = kademliaLocation spc - -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount - sched <- newTVar Int.empty - lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... - bootstrapVar <- newTVar True -- Start in bootstrapping mode. - bootstrapCnt <- newTVar Nothing - return BucketRefresher - { refreshInterval = 15 * 60 - , refreshQueue = sched - , refreshSearch = sch - , refreshBuckets = bkts - , refreshPing = ping - , refreshLastTouch = lasttouch - , bootstrapMode = bootstrapVar - , bootstrapCountdown = bootstrapCnt - } - --- | This was added to avoid the compile error "Record update for --- insufficiently polymorphic field" when trying to update the existentially --- quantified field 'refreshSearch'. -updateRefresherIO :: Ord addr - => Search nid addr tok ni ni - -> (ni -> IO Bool) - -> BucketRefresher nid ni -> BucketRefresher nid ni -updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher - { refreshSearch = sch - , refreshPing = ping - , refreshInterval = refreshInterval - , refreshBuckets = refreshBuckets - , refreshQueue = refreshQueue - , refreshLastTouch = refreshLastTouch - , bootstrapMode = bootstrapMode - , bootstrapCountdown = bootstrapCountdown - } - --- | Fork a refresh loop. Kill the returned thread to terminate it. -forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId -forkPollForRefresh r@BucketRefresher{ refreshInterval - , refreshQueue - , refreshBuckets - , refreshSearch } = fork $ do - myThreadId >>= flip labelThread "pollForRefresh" - fix $ \again -> do - join $ atomically $ do - nextup <- Int.findMin <$> readTVar refreshQueue - maybe retry (return . go again) nextup - where - refresh :: Int -> IO Int - refresh n = do - -- dput XRefresh $ "Refresh time! "++ show n - refreshBucket r n - - go again ( bktnum :-> refresh_time ) = do - now <- getPOSIXTime - case fromEnum (refresh_time - now) of - x | x <= 0 -> do -- Refresh time! - -- Move it to the back of the refresh queue. - atomically $ do - interval <- effectiveRefreshInterval r bktnum - modifyTVar' refreshQueue - $ Int.insert bktnum (now + interval) - -- Now fork the refresh operation. - -- TODO: We should probably propogate the kill signal to this thread. - fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) - _ <- refresh bktnum - return () - return () - picoseconds -> do - -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum - threadDelay ( picoseconds `div` 10^6 ) - again - - --- | This is a helper to 'refreshBucket' which does some book keeping to decide --- whether or not a bucket is sufficiently refreshed or not. It will return --- false when we can terminate a node search. -checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. - -> TVar (BucketList ni) -- ^ The current routing table. - -> TVar (Set.Set ni) -- ^ In-range nodes found so far. - -> TVar Bool -- ^ The result will also be written here. - -> Int -- ^ The bucket number of interest. - -> ni -- ^ A newly found node. - -> STM Bool -checkBucketFull space var resultCounter fin n found_node = do - let fullcount = R.defaultBucketSize - saveit True = writeTVar fin True >> return True - saveit _ = return False - tbl <- readTVar var - let counts = R.shape tbl - nid = kademliaLocation space found_node - -- Update the result set with every found node that is in the - -- bucket of interest. - when (n == R.bucketNumber space nid tbl) - $ modifyTVar' resultCounter (Set.insert found_node) - resultCount <- readTVar resultCounter - saveit $ case drop (n - 1) counts of - (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going - _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going - _ -> False -- okay, good enough, let's quit. - --- | Called from 'refreshBucket' with the current time when a refresh of the --- supplied bucket number finishes. -onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) -onFinishedRefresh BucketRefresher { bootstrapCountdown - , bootstrapMode - , refreshQueue - , refreshBuckets } num now = do - bootstrapping <- readTVar bootstrapMode - if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num - else do - tbl <- readTVar refreshBuckets - action <- - if num /= R.bktCount tbl - 1 - then do modifyTVar' bootstrapCountdown (fmap pred) - return $ return () -- dput XRefresh $ "BOOTSTRAP decrement" - else do - -- The last bucket finished. - cnt <- readTVar bootstrapCountdown - case cnt of - Nothing -> do - let fullsize = R.defaultBucketSize - notfull (n,len) | n==num = False - | len>=fullsize = False - | otherwise = True - unfull = case filter notfull $ zip [0..] (R.shape tbl) of - [] -> [(0,0)] -- Schedule at least 1 more refresh. - xs -> xs - forM_ unfull $ \(n,_) -> do - -- Schedule immediate refresh for unfull buckets (other than this one). - modifyTVar' refreshQueue $ Int.insert n (now - 1) - writeTVar bootstrapCountdown $! Just $! length unfull - return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull - Just n -> do writeTVar bootstrapCountdown $! Just $! pred n - return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)" - cnt <- readTVar bootstrapCountdown - if (cnt == Just 0) - then do - -- Boostrap finished! - writeTVar bootstrapMode False - writeTVar bootstrapCountdown Nothing - return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." - else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) - -refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => - BucketRefresher nid ni -> Int -> IO Int -refreshBucket r@BucketRefresher{ refreshSearch = sch - , refreshBuckets = var } - n = do - tbl <- atomically (readTVar var) - let count = bktCount tbl - nid = kademliaLocation (searchSpace sch) (thisNode tbl) - sample <- if n+1 >= count -- Is this the last bucket? - then return nid -- Yes? Search our own id. - else kademliaSample (searchSpace sch) -- No? Generate a random id. - getEntropy - nid - (bucketRange n (n + 1 < count)) - fin <- atomically $ newTVar False - resultCounter <- atomically $ newTVar Set.empty - - dput XRefresh $ "Start refresh " ++ show (n,sample) - - -- Set 15 minute timeout in order to avoid overlapping refreshes. - s <- search sch tbl sample $ if n+1 == R.defaultBucketCount - then const $ return True -- Never short-circuit the last bucket. - else checkBucketFull (searchSpace sch) var resultCounter fin n - _ <- timeout (15*60*1000000) $ do - atomically $ searchIsFinished s >>= check - atomically $ searchCancel s - dput XDHT $ "Finish refresh " ++ show (n,sample) - now <- getPOSIXTime - join $ atomically $ onFinishedRefresh r n now - rcount <- atomically $ do - c <- Set.size <$> readTVar resultCounter - b <- readTVar fin - return $ if b then 1 else c - return rcount - -refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () -refreshLastBucket r@BucketRefresher { refreshBuckets - , refreshQueue } = do - - now <- getPOSIXTime - atomically $ do - cnt <- bktCount <$> readTVar refreshBuckets - -- Schedule immediate refresh. - modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) - -restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => - BucketRefresher nid ni -> STM (IO ()) -restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do - unchanged <- readTVar bootstrapMode - writeTVar bootstrapMode True - writeTVar bootstrapCountdown Nothing - if not unchanged then return $ do - dput XRefresh "BOOTSTRAP entered bootstrap mode" - refreshLastBucket r - else return $ dput XRefresh "BOOTSTRAP already bootstrapping" - -bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => - BucketRefresher nid ni - -> t1 ni -- ^ Nodes to bootstrap from. - -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. - -> IO () -bootstrap r@BucketRefresher { refreshSearch = sch - , refreshBuckets = var - , refreshPing = ping - , bootstrapMode } ns ns0 = do - gotPing <- atomically $ newTVar False - - -- First, ping the given nodes so that they are added to - -- our routing table. - withTaskGroup "bootstrap.resume" 20 $ \g -> do - forM_ ns $ \n -> do - let lbl = show $ kademliaLocation (searchSpace sch) n - forkTask g lbl $ do - b <- ping n - when b $ atomically $ writeTVar gotPing True - - -- We resort to the hardcoded fallback nodes only when we got no - -- responses. This is to lesson the burden on well-known boostrap - -- nodes. - fallback <- atomically (readTVar gotPing) >>= return . when . not - fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do - forM_ ns0 $ \n -> do - forkTask g (show $ kademliaLocation (searchSpace sch) n) - (void $ ping n) - dput XDHT "Finished bootstrap pings." - -- Now search our own Id by entering bootstrap mode from non-bootstrap mode. - join $ atomically $ do - writeTVar bootstrapMode False - restartBootstrap r - -- - -- Hopefully 'forkPollForRefresh' was invoked and can take over - -- maintenance. - - -effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime -effectiveRefreshInterval BucketRefresher{ refreshInterval - , refreshBuckets - , bootstrapMode } num = do - tbl <- readTVar refreshBuckets - bootstrapping <- readTVar bootstrapMode - case bootstrapping of - False -> return refreshInterval - True -> do - -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds. - let fullcount = R.defaultBucketSize - count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl - if count == fullcount - then return refreshInterval - else return 15 -- seconds - - - --- | Reschedule a bucket's refresh-time. It should be called whenever a bucket --- changes. This will typically be invoked from 'tblTransition'. --- --- From BEP 05: --- --- > Each bucket should maintain a "last changed" property to indicate how --- > "fresh" the contents are. --- --- We will use a "time to next refresh" property instead and store it in --- a priority search queue. --- --- In detail using an expository (not actually implemented) type --- 'BucketTouchEvent'... --- --- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus --- >>> bucketEvents = --- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, --- >>> --- >>> , Stranger :--> Accepted -- or a node is added to a bucket, --- >>> --- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced --- >>> , Applicant :--> Accepted -- with another node, --- >>> ] --- --- the bucket's last changed property should be updated. Buckets that have not --- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." --- This is done by picking a random ID in the range of the bucket and --- performing a find_nodes search on it. --- --- The only other possible BucketTouchEvents are as follows: --- --- >>> not_handled = --- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: --- >>> -- (Applicant :--> Stranger) --- >>> -- (Applicant :--> Accepted) --- >>> , Accepted :--> Applicant -- Never happens --- >>> ] --- --- Because this BucketTouchEvent type is not actually implemented and we only --- receive notifications of a node's new state, it suffices to reschedule the --- bucket refresh 'touchBucket' on every transition to a state other than --- 'Applicant'. --- --- XXX: Unfortunately, this means redundantly triggering twice upon every node --- replacement because we do not currently distinguish between standalone --- insertion/deletion events and an insertion/deletion pair constituting --- replacement. --- --- It might also be better to pass the timestamp of the transition here and --- keep the refresh queue in better sync with the routing table by updating it --- within the STM monad. --- --- We embed the result in the STM monad but currently, no STM state changes --- occur until the returned IO action is invoked. TODO: simplify? -touchBucket :: SensibleNodeId nid ni - => BucketRefresher nid ni - -> RoutingTransition ni -- ^ What happened to the bucket? - -> STM (IO ()) -touchBucket r@BucketRefresher{ refreshSearch - , refreshInterval - , refreshBuckets - , refreshQueue - , refreshLastTouch - , bootstrapMode - , bootstrapCountdown } - RoutingTransition{ transitionedTo - , transitioningNode } - = case transitionedTo of - Applicant -> return $ return () -- Ignore transition to applicant. - _ -> return $ do -- Reschedule for any other transition. - now <- getPOSIXTime - join $ atomically $ do - let space = searchSpace refreshSearch - nid = kademliaLocation space transitioningNode - tbl <- readTVar refreshBuckets - let num = R.bucketNumber space nid tbl - stamp <- readTVar refreshLastTouch - action <- case stamp /= 0 && (now - stamp > 60) of - True -> do - -- It's been one minute since any bucket has been touched, re-enter bootstrap mode. - restartBootstrap r - False -> return $ return () - interval <- effectiveRefreshInterval r num - modifyTVar' refreshQueue $ Int.insert num (now + interval) - writeTVar refreshLastTouch now - return action - -refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni -refreshKademlia r@BucketRefresher { refreshSearch = sch - , refreshPing = ping - , refreshBuckets = bkts - } - = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping) - { tblTransition = \tr -> do - io <- touchBucket r tr - return io - } diff --git a/src/Network/Kademlia/CommonAPI.hs b/src/Network/Kademlia/CommonAPI.hs deleted file mode 100644 index 601be5d8..00000000 --- a/src/Network/Kademlia/CommonAPI.hs +++ /dev/null @@ -1,84 +0,0 @@ -{-# LANGUAGE ExistentialQuantification #-} -module Network.Kademlia.CommonAPI where - - -import Control.Concurrent -import Control.Concurrent.STM -import Data.Aeson as J (FromJSON, ToJSON) -import Data.Hashable -import qualified Data.Map as Map -import Data.Serialize as S -import qualified Data.Set as Set -import Data.Time.Clock.POSIX -import Data.Typeable - -import Network.Kademlia.Search -import Network.Kademlia.Routing as R -import Crypto.Tox (SecretKey,PublicKey) - -data DHT = forall nid ni. ( Show ni - , Read ni - , ToJSON ni - , FromJSON ni - , Ord ni - , Hashable ni - , Show nid - , Ord nid - , Hashable nid - , Typeable ni - , S.Serialize nid - ) => - DHT - { dhtBuckets :: TVar (BucketList ni) - , dhtSecretKey :: STM (Maybe SecretKey) - , dhtPing :: Map.Map String (DHTPing ni) - , dhtQuery :: Map.Map String (DHTQuery nid ni) - , dhtAnnouncables :: Map.Map String (DHTAnnouncable nid) - , dhtParseId :: String -> Either String nid - , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni)) - , dhtFallbackNodes :: IO [ni] - , dhtBootstrap :: [ni] -> [ni] -> IO () - } - -data DHTQuery nid ni = forall addr r tok. - ( Ord addr - , Typeable r - , Typeable tok - , Typeable ni - ) => DHTQuery - { qsearch :: Search nid addr tok ni r - , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. - , qshowR :: r -> String - , qshowTok :: tok -> Maybe String - } - -data DHTAnnouncable nid = forall dta tok ni r. - ( Show r - , Typeable dta -- information being announced - , Typeable tok -- token - , Typeable r -- search result - , Typeable ni -- node - ) => DHTAnnouncable - { announceParseData :: String -> Either String dta - , announceParseToken :: dta -> String -> Either String tok - , announceParseAddress :: String -> Either String ni - , announceSendData :: Either ( String {- search name -} - , String -> Either String r - , PublicKey {- me -} -> dta -> r -> IO ()) - (dta -> tok -> Maybe ni -> IO (Maybe r)) - , announceInterval :: POSIXTime - , announceTarget :: dta -> nid - } - -data DHTSearch nid ni = forall addr tok r. DHTSearch - { searchThread :: ThreadId - , searchState :: SearchState nid addr tok ni r - , searchShowTok :: tok -> Maybe String - , searchResults :: TVar (Set.Set String) - } - -data DHTPing ni = forall r. DHTPing - { pingQuery :: [String] -> ni -> IO (Maybe r) - , pingShowResult :: r -> String - } - diff --git a/src/Network/Kademlia/Persistence.hs b/src/Network/Kademlia/Persistence.hs deleted file mode 100644 index d7431671..00000000 --- a/src/Network/Kademlia/Persistence.hs +++ /dev/null @@ -1,51 +0,0 @@ -{-# LANGUAGE NamedFieldPuns #-} -module Network.Kademlia.Persistence where - -import Network.Kademlia.CommonAPI -import Network.Kademlia.Routing as R - -import Control.Concurrent.STM -import qualified Data.Aeson as J - ;import Data.Aeson as J (FromJSON) -import qualified Data.ByteString.Lazy as L -import qualified Data.HashMap.Strict as HashMap -import Data.List -import qualified Data.Vector as V -import System.IO.Error - -saveNodes :: String -> DHT -> IO () -saveNodes netname DHT{dhtBuckets} = do - bkts <- atomically $ readTVar dhtBuckets - let ns = map fst $ concat $ R.toList bkts - bs = J.encode ns - fname = nodesFileName netname - L.writeFile fname bs - -loadNodes :: FromJSON ni => String -> IO [ni] -loadNodes netname = do - let fname = nodesFileName netname - attempt <- tryIOError $ do - J.decode <$> L.readFile fname - >>= maybe (ioError $ userError "Nothing") return - either (const $ fallbackLoad fname) return attempt - -nodesFileName :: String -> String -nodesFileName netname = netname ++ "-nodes.json" - -fallbackLoad :: FromJSON t => FilePath -> IO [t] -fallbackLoad fname = do - attempt <- tryIOError $ do - J.decode <$> L.readFile fname - >>= maybe (ioError $ userError "Nothing") return - let go r = do - let m = HashMap.lookup "nodes" (r :: J.Object) - ns0 = case m of Just (J.Array v) -> V.toList v - Nothing -> [] - ns1 = zip (map J.fromJSON ns0) ns0 - issuc (J.Error _,_) = False - issuc _ = True - (ss,fs) = partition issuc ns1 - ns = map (\(J.Success n,_) -> n) ss - mapM_ print (map snd fs) >> return ns - either (const $ return []) go attempt - diff --git a/src/Network/Kademlia/Routing.hs b/src/Network/Kademlia/Routing.hs deleted file mode 100644 index a52cca73..00000000 --- a/src/Network/Kademlia/Routing.hs +++ /dev/null @@ -1,808 +0,0 @@ --- | --- Copyright : (c) Sam Truzjan 2013 --- (c) Joe Crayne 2017 --- License : BSD3 --- Maintainer : pxqr.sta@gmail.com --- Stability : experimental --- Portability : portable --- --- Every node maintains a routing table of known good nodes. The --- nodes in the routing table are used as starting points for --- queries in the DHT. Nodes from the routing table are returned in --- response to queries from other nodes. --- --- For more info see: --- --- -{-# LANGUAGE CPP #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE ViewPatterns #-} -{-# LANGUAGE TypeOperators #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE DeriveFunctor #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TupleSections #-} -{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} -{-# OPTIONS_GHC -fno-warn-orphans #-} -module Network.Kademlia.Routing - {- - ( -- * BucketList - BucketList - , Info(..) - - -- * Attributes - , BucketCount - , defaultBucketCount - , BucketSize - , defaultBucketSize - , NodeCount - - -- * Query - , Network.Kademlia.Routing.null - , Network.Kademlia.Routing.full - , thisId - , shape - , Network.Kademlia.Routing.size - , Network.Kademlia.Routing.depth - , compatibleNodeId - - -- * Lookup - , K - , defaultK - , TableKey (..) - , kclosest - - -- * Construction - , Network.Kademlia.Routing.nullTable - , Event(..) - , CheckPing(..) - , Network.Kademlia.Routing.insert - - -- * Conversion - , Network.Kademlia.Routing.TableEntry - , Network.Kademlia.Routing.toList - - -- * Routing - , Timestamp - , getTimestamp - ) -} where - -import Control.Applicative as A -import Control.Arrow -import Control.Monad -import Data.Function -import Data.Functor.Contravariant -import Data.Functor.Identity -import Data.List as L hiding (insert) -import Data.Maybe -import Data.Monoid -import Data.Wrapper.PSQ as PSQ -import Data.Serialize as S hiding (Result, Done) -import qualified Data.Sequence as Seq -import Data.Time -import Data.Time.Clock.POSIX -import Data.Word -import GHC.Generics -import Text.PrettyPrint as PP hiding ((<>)) -import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) -import qualified Data.ByteString as BS -import Data.Bits -import Data.Ord -import Data.Reflection -import Network.Address -import Data.Typeable -import Data.Coerce -import Data.Hashable - - --- | Last time the node was responding to our queries. --- --- Not all nodes that we learn about are equal. Some are \"good\" and --- some are not. Many nodes using the DHT are able to send queries --- and receive responses, but are not able to respond to queries --- from other nodes. It is important that each node's routing table --- must contain only known good nodes. A good node is a node has --- responded to one of our queries within the last 15 minutes. A --- node is also good if it has ever responded to one of our queries --- and has sent us a query within the last 15 minutes. After 15 --- minutes of inactivity, a node becomes questionable. Nodes become --- bad when they fail to respond to multiple queries in a row. Nodes --- that we know are good are given priority over nodes with unknown --- status. --- -type Timestamp = POSIXTime - -getTimestamp :: IO Timestamp -getTimestamp = do - utcTime <- getCurrentTime - return $ utcTimeToPOSIXSeconds utcTime - - - -{----------------------------------------------------------------------- - Bucket ------------------------------------------------------------------------} --- --- When a k-bucket is full and a new node is discovered for that --- k-bucket, the least recently seen node in the k-bucket is --- PINGed. If the node is found to be still alive, the new node is --- place in a secondary list, a replacement cache. The replacement --- cache is used only if a node in the k-bucket stops responding. In --- other words: new nodes are used only when older nodes disappear. - --- | Timestamp - last time this node is pinged. -type NodeEntry ni = Binding ni Timestamp - - --- | Maximum number of 'NodeInfo's stored in a bucket. Most clients --- use this value. -defaultBucketSize :: Int -defaultBucketSize = 8 - -data QueueMethods m elem fifo = QueueMethods - { pushBack :: elem -> fifo -> m fifo - , popFront :: fifo -> m (Maybe elem, fifo) - , emptyQueue :: m fifo - } - -{- -fromQ :: Functor m => - ( a -> b ) - -> ( b -> a ) - -> QueueMethods m elem a - -> QueueMethods m elem b -fromQ embed project QueueMethods{..} = - QueueMethods { pushBack = \e -> fmap embed . pushBack e . project - , popFront = fmap (second embed) . popFront . project - , emptyQueue = fmap embed emptyQueue - } --} - -seqQ :: QueueMethods Identity ni (Seq.Seq ni) -seqQ = QueueMethods - { pushBack = \e fifo -> pure (fifo Seq.|> e) - , popFront = \fifo -> case Seq.viewl fifo of - e Seq.:< fifo' -> pure (Just e, fifo') - Seq.EmptyL -> pure (Nothing, Seq.empty) - , emptyQueue = pure Seq.empty - } - -type BucketQueue ni = Seq.Seq ni - -bucketQ :: QueueMethods Identity ni (BucketQueue ni) -bucketQ = seqQ - - -data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int) - -contramapC :: (b -> a) -> Compare a -> Compare b -contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b)) - (\s x -> hsh s (f x)) - -newtype Ordered' s a = Ordered a - deriving (Show) - --- | Hack to avoid UndecidableInstances -newtype Shrink a = Shrink a - deriving (Show) - -type Ordered s a = Ordered' s (Shrink a) - -instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where - a == b = (compare a b == EQ) - -instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where - compare a b = cmp (coerce a) (coerce b) - where Compare cmp _ = reflect (Proxy :: Proxy s) - -instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where - hashWithSalt salt x = hash salt (coerce x) - where Compare _ hash = reflect (Proxy :: Proxy s) - --- | Bucket is also limited in its length — thus it's called k-bucket. --- When bucket becomes full, we should split it in two lists by --- current span bit. Span bit is defined by depth in the routing --- table tree. Size of the bucket should be choosen such that it's --- very unlikely that all nodes in bucket fail within an hour of --- each other. -data Bucket s ni = Bucket - { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes - , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs - } deriving (Generic) - -#define CAN_SHOW_BUCKET 0 - -#if CAN_SHOW_BUCKET -deriving instance Show ni => Show (Bucket s ni) -#endif - -bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni -bucketCompare _ = reflect (Proxy :: Proxy s) - -mapBucket :: ( Reifies s (Compare a) - , Reifies t (Compare ni) - ) => (a -> ni) -> Bucket s a -> Bucket t ni -mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns) - (fmap (second f) q) - where f' = coerce . f . coerce - - -#if 0 - -{- -getGenericNode :: ( Serialize (NodeId) - , Serialize ip - , Serialize u - ) => Get (NodeInfo) -getGenericNode = do - nid <- get - naddr <- get - u <- get - return NodeInfo - { nodeId = nid - , nodeAddr = naddr - , nodeAnnotation = u - } - -putGenericNode :: ( Serialize (NodeId) - , Serialize ip - , Serialize u - ) => NodeInfo -> Put -putGenericNode (NodeInfo nid naddr u) = do - put nid - put naddr - put u - -instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where - get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) - put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes --} - -#endif - -psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p -psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs - -psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)] -psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq - --- | Update interval, in seconds. -delta :: NominalDiffTime -delta = 15 * 60 - --- | Should maintain a set of stable long running nodes. --- --- Note: pings are triggerd only when a bucket is full. -updateBucketForInbound :: ( Coercible t1 t - , Alternative f - , Reifies s (Compare t1) - ) => NominalDiffTime -> t1 -> Bucket s t1 -> f ([t], Bucket s t1) -updateBucketForInbound curTime info bucket - -- Just update timestamp if a node is already in bucket. - -- - -- Note PingResult events should only occur for nodes we requested a ping for, - -- and those will always already be in the routing queue and will get their - -- timestamp updated here, since 'TryInsert' is called on every inbound packet, - -- including ping results. - | already_have - = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime ) - -- bucket is good, but not full => we can insert a new node - | PSQ.size (bktNodes bucket) < defaultBucketSize - = pure ( [], map_ns $ PSQ.insert (coerce info) curTime ) - -- If there are any questionable nodes in the bucket have not been - -- seen in the last 15 minutes, the least recently seen node is - -- pinged. If any nodes in the bucket are known to have become bad, - -- then one is replaced by the new node in the next insertBucket - -- iteration. - | not (L.null stales) - = pure ( stales - , bucket { -- Update timestamps so that we don't redundantly ping. - bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket - -- Update queue with the pending NodeInfo in case of ping fail. - , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } ) - -- When the bucket is full of good nodes, the new node is simply discarded. - -- We must return 'A.empty' here to ensure that bucket splitting happens - -- inside 'modifyBucket'. - | otherwise = A.empty - where - -- We (take 1) to keep a 1-to-1 correspondence between pending pings and - -- waiting nodes in the bktQ. This way, we don't have to worry about what - -- to do with failed pings for which there is no ready replacements. - stales = -- One stale: - do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket) - guard (t < curTime - delta) - return $ coerce n - -- All stale: - -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket - - already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket) - - map_ns f = bucket { bktNodes = f (bktNodes bucket) } - -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) } - -updateBucketForPingResult :: (Applicative f, Reifies s (Compare a)) => - a -> Bool -> Bucket s a -> f ([(a, Maybe (Timestamp, a))], Bucket s a) -updateBucketForPingResult bad_node got_response bucket - = pure ( map (,Nothing) forgotten - ++ map (second Just) replacements - , Bucket (foldr replace - (bktNodes bucket) - replacements) - popped - ) - where - (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) - - -- Dropped from accepted, replaced by pending. - replacements | got_response = [] -- Timestamp was already updated by TryInsert. - | Just info <- top = do - -- Insert only if there's a removal. - _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket) - return (bad_node, info) - | otherwise = [] - - -- Dropped from the pending queue without replacing. - forgotten | got_response = maybeToList $ fmap snd top - | otherwise = [] - - - replace (bad_node, (tm, info)) = - PSQ.insert (coerce info) tm - . PSQ.delete (coerce bad_node) - - -updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp -updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales - -type BitIx = Word - -partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b) -partitionQ imp test q0 = do - pass0 <- emptyQueue imp - fail0 <- emptyQueue imp - let flipfix a b f = fix f a b - flipfix q0 (pass0,fail0) $ \rec q qs -> do - (mb,q') <- popFront imp q - case mb of - Nothing -> return qs - Just e -> do qs' <- select (pushBack imp e) qs - rec q' qs' - where - select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) - select f = if test e then \(a,b) -> flip (,) b <$> f a - else \(a,b) -> (,) a <$> f b - - - -split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => - forall ni s. ( Reifies s (Compare ni) ) => - (ni -> Word -> Bool) - -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni) -split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs) - where - (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b - (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b - - spanBit :: ni -> Bool - spanBit entry = testNodeIdBit entry i - - -{----------------------------------------------------------------------- --- BucketList ------------------------------------------------------------------------} - -defaultBucketCount :: Int -defaultBucketCount = 20 - -defaultMaxBucketCount :: Word -defaultMaxBucketCount = 24 - -data Info ni nid = Info - { myBuckets :: BucketList ni - , myNodeId :: nid - , myAddress :: SockAddr - } - deriving Generic - -deriving instance (Eq ni, Eq nid) => Eq (Info ni nid) -deriving instance (Show ni, Show nid) => Show (Info ni nid) - --- instance (Eq ip, Serialize ip) => Serialize (Info ip) - --- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ --- 160. The routing table is subdivided into 'Bucket's that each cover --- a portion of the space. An empty table has one bucket with an ID --- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" --- is inserted into the table, it is placed within the bucket that has --- @min <= N < max@. An empty table has only one bucket so any node --- must fit within it. Each bucket can only hold 'K' nodes, currently --- eight, before becoming 'Full'. When a bucket is full of known good --- nodes, no more nodes may be added unless our own 'NodeId' falls --- within the range of the 'Bucket'. In that case, the bucket is --- replaced by two new buckets each with half the range of the old --- bucket and the nodes from the old bucket are distributed among the --- two new ones. For a new table with only one bucket, the full bucket --- is always split into two new buckets covering the ranges @0..2 ^ --- 159@ and @2 ^ 159..2 ^ 160@. --- -data BucketList ni = forall s. Reifies s (Compare ni) => - BucketList { thisNode :: !ni - -- | Non-empty list of buckets. - , buckets :: [Bucket s ni] - } - -mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b -mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts) - $ \p -> BucketList - { thisNode = f self - , buckets = map (resolve p . mapBucket f) bkts - } - where - resolve :: Proxy s -> Bucket s ni -> Bucket s ni - resolve = const id - -instance (Eq ni) => Eq (BucketList ni) where - (==) = (==) `on` Network.Kademlia.Routing.toList - -#if 0 - -instance Serialize NominalDiffTime where - put = putWord32be . fromIntegral . fromEnum - get = (toEnum . fromIntegral) <$> getWord32be - -#endif - -#if CAN_SHOW_BUCKET -deriving instance (Show ni) => Show (BucketList ni) -#else -instance Show ni => Show (BucketList ni) where - showsPrec d (BucketList self bkts) = - mappend "BucketList " - . showsPrec (d+1) self - . mappend " (fromList " - . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts) - . mappend ") " -#endif - -#if 0 - --- | Normally, routing table should be saved between invocations of --- the client software. Note that you don't need to store /this/ --- 'NodeId' since it is already included in routing table. -instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList) - -#endif - --- | Shape of the table. -instance Pretty (BucketList ni) where - pPrint t - | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss - | otherwise = brackets $ - PP.int (L.sum ss) <> " nodes, " <> - PP.int bucketCount <> " buckets" - where - bucketCount = L.length ss - ss = shape t - --- | Empty table with specified /spine/ node id. --- --- XXX: The comparison function argument is awkward here. -nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni -nullTable cmp hsh ni n = - reify (Compare cmp hsh) - $ \p -> BucketList - ni - [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)] - where - empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp - empty = const $ PSQ.empty - -#if 0 - --- | Test if table is empty. In this case DHT should start --- bootstrapping process until table becomes 'full'. -null :: BucketList -> Bool -null (Tip _ _ b) = PSQ.null $ bktNodes b -null _ = False - --- | Test if table have maximum number of nodes. No more nodes can be --- 'insert'ed, except old ones becomes bad. -full :: BucketList -> Bool -full (Tip _ n _) = n == 0 -full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t -full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t - --- | Get the /spine/ node id. -thisId :: BucketList -> NodeId -thisId (Tip nid _ _) = nid -thisId (Zero table _) = thisId table -thisId (One _ table) = thisId table - --- | Number of nodes in a bucket or a table. -type NodeCount = Int - -#endif - --- | Internally, routing table is similar to list of buckets or a --- /matrix/ of nodes. This function returns the shape of the matrix. -shape :: BucketList ni -> [Int] -shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl - -#if 0 - --- | Get number of nodes in the table. -size :: BucketList -> NodeCount -size = L.sum . shape - --- | Get number of buckets in the table. -depth :: BucketList -> BucketCount -depth = L.length . shape - -#endif - -lookupBucket :: forall ni nid x. - ( -- FiniteBits nid - Ord nid - ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x -lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts - where - d = kademliaXor space nid (kademliaLocation space self) - - go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] - go i bs (bucket : buckets) - | kademliaTestBit space d i = bucket : buckets ++ bs - | otherwise = go (succ i) (bucket:bs) buckets - go _ bs [] = bs - -bucketNumber :: forall ni nid. - KademliaSpace nid ni -> nid -> BucketList ni -> Int -bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts - where - d = kademliaXor space nid (kademliaLocation space self) - - go :: Word -> [Bucket s ni] -> Word - go i (bucket : buckets) - | kademliaTestBit space d i = i - | otherwise = go (succ i) buckets - go i [] = i - - -compatibleNodeId :: forall ni nid. - ( Serialize nid, FiniteBits nid) => - (ni -> nid) -> BucketList ni -> IO nid -compatibleNodeId nodeId tbl = genBucketSample prefix br - where - br = bucketRange (L.length (shape tbl) - 1) True - nodeIdSize = finiteBitSize (undefined :: nid) `div` 8 - bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0 - prefix = either error id $ S.decode bs - -tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8] -tablePrefix testbit = map (packByte . take 8 . (++repeat False)) - . chunksOf 8 - . tableBits testbit - where - packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0] - bitmask ix True = bit ix - bitmask _ _ = 0 - -tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool] -tableBits testbit (BucketList self bkts) = - zipWith const (map (testbit self) [0..]) - bkts - -selfNode :: BucketList ni -> ni -selfNode (BucketList self _) = self - -chunksOf :: Int -> [e] -> [[e]] -chunksOf i ls = map (take i) (build (splitter ls)) where - splitter :: [e] -> ([e] -> a -> a) -> a -> a - splitter [] _ n = n - splitter l c n = l `c` splitter (drop i l) c n - -build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a] -build g = g (:) [] - - - --- | Count of closest nodes in find_node reply. -type K = Int - --- | Default 'K' is equal to 'defaultBucketSize'. -defaultK :: K -defaultK = 8 - -#if 0 -class TableKey dht k where - toNodeId :: k -> NodeId - -instance TableKey dht (NodeId) where - toNodeId = id - -#endif - --- | In Kademlia, the distance metric is XOR and the result is --- interpreted as an unsigned integer. -newtype NodeDistance nodeid = NodeDistance nodeid - deriving (Eq, Ord) - --- | distance(A,B) = |A xor B| Smaller values are closer. -distance :: Bits nid => nid -> nid -> NodeDistance nid -distance a b = NodeDistance $ xor a b - --- | Order by closeness: nearest nodes first. -rank :: ( Ord nid - ) => KademliaSpace nid ni -> nid -> [ni] -> [ni] -rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space)) - - --- | Get a list of /K/ closest nodes using XOR metric. Used in --- 'find_node' and 'get_peers' queries. -kclosest :: ( -- FiniteBits nid - Ord nid - ) => - KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni] -kclosest space k nid tbl = take k $ rank space nid (L.concat bucket) - ++ rank space nid (L.concat everyone) - where - (bucket,everyone) = - L.splitAt 1 - . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) - $ tbl - - - -{----------------------------------------------------------------------- --- Routing ------------------------------------------------------------------------} - -splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => - ( Reifies s (Compare ni) ) => - (ni -> Word -> Bool) - -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ] -splitTip testNodeBit ni i bucket - | testNodeBit ni i = [zeros , ones ] - | otherwise = [ones , zeros ] - where - (ones, zeros) = split testNodeBit i bucket - --- | Used in each query. --- --- TODO: Kademlia non-empty subtrees should should split if they have less than --- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia --- paper. The rule requiring additional splits is in section 2.4. -modifyBucket - :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => - forall ni nid xs. - KademliaSpace nid ni - -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni) -modifyBucket space nid f (BucketList self bkts) - = second (BucketList self) <$> go (0 :: BitIx) bkts - where - d = kademliaXor space nid (kademliaLocation space self) - - -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni]) - - go !i (bucket : buckets@(_:_)) - | kademliaTestBit space d i = second (: buckets) <$> f bucket - | otherwise = second (bucket :) <$> go (succ i) buckets - - go !i [bucket] = second (: []) <$> f bucket <|> gosplit - where - gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space - . kademliaLocation space ) - self - i - bucket) - | otherwise = Nothing -- Limit the number of buckets. - - -bktCount :: BucketList ni -> Int -bktCount (BucketList _ bkts) = L.length bkts - --- | Triggering event for atomic table update -data Event ni = TryInsert { foreignNode :: ni } - | PingResult { foreignNode :: ni , ponged :: Bool } - -#if 0 -deriving instance Eq (NodeId) => Eq (Event) -deriving instance ( Show ip - , Show (NodeId) - , Show u - ) => Show (Event) - -#endif - -eventId :: (ni -> nid) -> Event ni -> nid -eventId nodeId (TryInsert ni) = nodeId ni -eventId nodeId (PingResult ni _) = nodeId ni - - --- | Actions requested by atomic table update -data CheckPing ni = CheckPing [ni] - -#if 0 - -deriving instance Eq (NodeId) => Eq (CheckPing) -deriving instance ( Show ip - , Show (NodeId) - , Show u - ) => Show (CheckPing) - -#endif - - --- | Call on every inbound packet (including requested ping results). --- Returns a triple (was_inserted, to_ping, tbl') where --- --- [ /was_inserted/ ] True if the node was added to the routing table. --- --- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'. --- This will be empty if /was_inserted/, but a non-inserted node --- may be added to a replacement queue and will be inserted if --- one of the items in this list time out. --- --- [ /tbl'/ ] The updated routing 'BucketList'. --- -updateForInbound :: - KademliaSpace nid ni - -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni) -updateForInbound space tm ni tbl@(BucketList _ bkts) = - maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl')) - $ modifyBucket space - (kademliaLocation space ni) - (updateBucketForInbound tm ni) - tbl - --- | Update the routing table with the results of a ping. --- --- Each (a,(tm,b)) in the returned list indicates that the node /a/ was deleted from the --- routing table and the node /b/, with timestamp /tm/, has taken its place. -updateForPingResult :: - KademliaSpace nid ni - -> ni -- ^ The pinged node. - -> Bool -- ^ True if we got a reply, False if it timed out. - -> BucketList ni -- ^ The routing table. - -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni ) -updateForPingResult space ni got_reply tbl = - fromMaybe ([],tbl) - $ modifyBucket space - (kademliaLocation space ni) - (updateBucketForPingResult ni got_reply) - tbl - - -{----------------------------------------------------------------------- --- Conversion ------------------------------------------------------------------------} - -type TableEntry ni = (ni, Timestamp) - -tableEntry :: NodeEntry ni -> TableEntry ni -tableEntry (a :-> b) = (a, b) - -toList :: BucketList ni -> [[TableEntry ni]] -toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts - -data KademliaSpace nid ni = KademliaSpace - { -- | Given a node record (probably including IP address), yields a - -- kademlia xor-metric location. - kademliaLocation :: ni -> nid - -- | Used when comparing locations. This is similar to - -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so - -- that 0 is the most significant bit. - , kademliaTestBit :: nid -> Word -> Bool - -- | The Kademlia xor-metric. - , kademliaXor :: nid -> nid -> nid - - , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid - } - -instance Contravariant (KademliaSpace nid) where - contramap f ks = ks - { kademliaLocation = kademliaLocation ks . f - } - diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs deleted file mode 100644 index 1be1afc1..00000000 --- a/src/Network/Kademlia/Search.hs +++ /dev/null @@ -1,236 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE PatternSynonyms #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE LambdaCase #-} -module Network.Kademlia.Search where - -import Control.Concurrent.Tasks -import Control.Concurrent.STM -import Control.Monad -import Data.Function -import Data.Maybe -import qualified Data.Set as Set - ;import Data.Set (Set) -import Data.Hashable (Hashable(..)) -- for type sigs -import System.IO.Error - -import qualified Data.MinMaxPSQ as MM - ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') -import qualified Data.Wrapper.PSQ as PSQ - ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey) -import Network.Kademlia.Routing as R -#ifdef THREAD_DEBUG -import Control.Concurrent.Lifted.Instrument -#else -import Control.Concurrent.Lifted -import GHC.Conc (labelThread) -#endif - -data Search nid addr tok ni r = Search - { searchSpace :: KademliaSpace nid ni - , searchNodeAddress :: ni -> addr - , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok))) - (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ()) - , searchAlpha :: Int -- α = 8 - -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on - -- how fast the queries are. For Tox's much slower onion-routed queries, we - -- need to ensure that closer non-responding queries don't completely push out - -- farther away queries. - -- - -- For BitTorrent, setting them both 8 was not an issue, but that is no longer - -- supported because now the number of remembered informants is now the - -- difference between these two numbers. So, if searchK = 16 and searchAlpha = - -- 4, then the number of remembered query responses is 12. - , searchK :: Int -- K = 16 - } - -data SearchState nid addr tok ni r = SearchState - { -- | The number of pending queries. Incremented before any query is sent - -- and decremented when we get a reply. - searchPendingCount :: TVar Int - -- | Nodes scheduled to be queried (roughly at most K). - , searchQueued :: TVar (MinMaxPSQ ni nid) - -- | The nearest (K - α) nodes that issued a reply. - -- - -- α is the maximum number of simultaneous queries. - , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok)) - -- | This tracks already-queried addresses so we avoid bothering them - -- again. XXX: We could probably keep only the pending queries in this - -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha - -- should limit the number of outstanding queries. - , searchVisited :: TVar (Set addr) - , searchSpec :: Search nid addr tok ni r - } - - -newSearch :: ( Ord addr - , PSQKey nid - , PSQKey ni - ) => - {- - KademliaSpace nid ni - -> (ni -> addr) - -> (ni -> IO ([ni], [r])) -- the query action. - -> (r -> STM Bool) -- receives search results. - -> nid -- target of search - -} - Search nid addr tok ni r - -> nid - -> [ni] -- Initial nodes to query. - -> STM (SearchState nid addr tok ni r) -newSearch s@(Search space nAddr qry _ _) target ns = do - c <- newTVar 0 - q <- newTVar $ MM.fromList - $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) - $ ns - i <- newTVar MM.empty - v <- newTVar Set.empty - return -- (Search space nAddr qry) , r , target - ( SearchState c q i v s ) - --- | Discard a value from a key-priority-value tuple. This is useful for --- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". -stripValue :: Binding' k p v -> Binding k p -stripValue (Binding ni _ nid) = (ni :-> nid) - --- | Reset a 'SearchState' object to ready it for a repeated search. -reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => - (nid -> STM [ni]) - -> Search nid addr1 tok1 ni r1 - -> nid - -> SearchState nid addr tok ni r - -> STM (SearchState nid addr tok ni r) -reset nearestNodes qsearch target st = do - searchIsFinished st >>= check -- Wait for a search to finish before resetting. - bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) - <$> nearestNodes target - priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) - writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes - writeTVar (searchInformant st) MM.empty - writeTVar (searchVisited st) Set.empty - writeTVar (searchPendingCount st) 0 - return st - -sendAsyncQuery :: forall addr nid tok ni r. - ( Ord addr - , PSQKey nid - , PSQKey ni - , Show nid - ) => - Search nid addr tok ni r - -> nid - -> (r -> STM Bool) -- ^ return False to terminate the search. - -> SearchState nid addr tok ni r - -> Binding ni nid - -> TaskGroup - -> IO () -sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g = - case searchQuery of - Left blockingQuery -> - forkTask g "searchQuery" $ do - myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) - reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing) - atomically $ do - modifyTVar searchPendingCount pred - maybe (return ()) go reply - Right nonblockingQuery -> do - nonblockingQuery searchTarget ni $ \reply -> - atomically $ do - modifyTVar searchPendingCount pred - maybe (return ()) go reply - where - go (ns,rs,tok) = do - vs <- readTVar searchVisited - -- We only queue a node if it is not yet visited - let insertFoundNode :: Int - -> ni - -> MinMaxPSQ ni nid - -> MinMaxPSQ ni nid - insertFoundNode k n q - | searchNodeAddress n `Set.member` vs - = q - | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget - $ kademliaLocation searchSpace n ) - q - - qsize0 <- MM.size <$> readTVar searchQueued - let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow - -- only when there's fewer than - -- K elements. - modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns - modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d - flip fix rs $ \loop -> \case - r:rs' -> do - wanting <- searchResult r - if wanting then loop rs' - else searchCancel sch - [] -> return () - - -searchIsFinished :: ( PSQKey nid - , PSQKey ni - ) => SearchState nid addr tok ni r -> STM Bool -searchIsFinished SearchState{..} = do - q <- readTVar searchQueued - cnt <- readTVar searchPendingCount - informants <- readTVar searchInformant - return $ cnt == 0 - && ( MM.null q - || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) - && ( PSQ.prio (fromJust $ MM.findMax informants) - <= PSQ.prio (fromJust $ MM.findMin q)))) - -searchCancel :: SearchState nid addr tok ni r -> STM () -searchCancel SearchState{..} = do - writeTVar searchPendingCount 0 - writeTVar searchQueued MM.empty - -search :: - ( Ord r - , Ord addr - , PSQKey nid - , PSQKey ni - , Show nid - ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) -search sch buckets target result = do - let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets - st <- atomically $ newSearch sch target ns - forkIO $ searchLoop sch target result st - return st - -searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) - => Search nid addr tok ni r -- ^ Query and distance methods. - -> nid -- ^ The target we are searching for. - -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. - -> SearchState nid addr tok ni r -- ^ Search-related state. - -> IO () -searchLoop sch@Search{..} target result s@SearchState{..} = do - myThreadId >>= flip labelThread ("search."++show target) - withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do - join $ atomically $ do - cnt <- readTVar $ searchPendingCount - check (cnt <= 8) -- Only 8 pending queries at a time. - informants <- readTVar searchInformant - found <- MM.minView <$> readTVar searchQueued - case found of - Just (ni :-> d, q) - | -- If there's fewer than /k - α/ informants and there's any - -- node we haven't yet got a response from. - (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) - -- Or there's no informants yet at all. - || MM.null informants - -- Or if the closest scheduled node is nearer than the - -- nearest /k/ informants. - || (d < PSQ.prio (fromJust $ MM.findMax informants)) - -> -- Then the search continues, send a query. - do writeTVar searchQueued q - modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) - modifyTVar searchPendingCount succ - return $ do - sendAsyncQuery sch target result s (ni :-> d) g - again - _ -> -- Otherwise, we are finished. - do check (cnt == 0) - return $ return () -- cgit v1.2.3