From 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sat, 28 Sep 2019 13:43:29 -0400 Subject: Factor out some new libraries word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search --- kad/CHANGELOG.md | 5 + kad/LICENSE | 30 ++ kad/Setup.hs | 2 + kad/kad.cabal | 76 +++ kad/src/DebugTag.hs | 24 + kad/src/Network/Kademlia.hs | 163 +++++++ kad/src/Network/Kademlia/Bootstrap.hs | 439 +++++++++++++++++ kad/src/Network/Kademlia/CommonAPI.hs | 84 ++++ kad/src/Network/Kademlia/Persistence.hs | 52 ++ kad/src/Network/Kademlia/Routing.hs | 809 ++++++++++++++++++++++++++++++++ kad/src/Network/Kademlia/Search.hs | 236 ++++++++++ 11 files changed, 1920 insertions(+) create mode 100644 kad/CHANGELOG.md create mode 100644 kad/LICENSE create mode 100644 kad/Setup.hs create mode 100644 kad/kad.cabal create mode 100644 kad/src/DebugTag.hs create mode 100644 kad/src/Network/Kademlia.hs create mode 100644 kad/src/Network/Kademlia/Bootstrap.hs create mode 100644 kad/src/Network/Kademlia/CommonAPI.hs create mode 100644 kad/src/Network/Kademlia/Persistence.hs create mode 100644 kad/src/Network/Kademlia/Routing.hs create mode 100644 kad/src/Network/Kademlia/Search.hs (limited to 'kad') diff --git a/kad/CHANGELOG.md b/kad/CHANGELOG.md new file mode 100644 index 00000000..6255a362 --- /dev/null +++ b/kad/CHANGELOG.md @@ -0,0 +1,5 @@ +# Revision history for kad + +## 0.1.0.0 -- YYYY-mm-dd + +* First version. Released on an unsuspecting world. diff --git a/kad/LICENSE b/kad/LICENSE new file mode 100644 index 00000000..e8eaef49 --- /dev/null +++ b/kad/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2019, James Crayne + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of James Crayne nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/kad/Setup.hs b/kad/Setup.hs new file mode 100644 index 00000000..9a994af6 --- /dev/null +++ b/kad/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/kad/kad.cabal b/kad/kad.cabal new file mode 100644 index 00000000..5babda13 --- /dev/null +++ b/kad/kad.cabal @@ -0,0 +1,76 @@ +-- Initial kad.cabal generated by cabal init. For further documentation, +-- see http://haskell.org/cabal/users-guide/ + +name: kad +version: 0.1.0.0 +-- synopsis: +-- description: +license: BSD3 +license-file: LICENSE +author: James Crayne +maintainer: jim.crayne@gmail.com +-- copyright: +-- category: +build-type: Simple +extra-source-files: CHANGELOG.md +cabal-version: >=1.10 + +library + cpp-options: -DTHREAD_DEBUG + exposed-modules: + Network.Kademlia + , Network.Kademlia.Bootstrap + , Network.Kademlia.Routing + , Network.Kademlia.CommonAPI + , Network.Kademlia.Persistence + , Network.Kademlia.Search + other-modules: DebugTag + other-extensions: + CPP + , ConstraintKinds + , DeriveFunctor + , DeriveTraversable + , FlexibleContexts + , GADTs + , KindSignatures + , LambdaCase + , NamedFieldPuns + , PartialTypeSignatures + , PatternSynonyms + , RankNTypes + , ScopedTypeVariables + , RecordWildCards + , BangPatterns + , ViewPatterns + , TypeOperators + , DeriveGeneric + , TupleSections + , StandaloneDeriving + , MultiParamTypeClasses + , FlexibleInstances + , ExistentialQuantification + build-depends: + base + , tox-crypto + , entropy + , lifted-base + , lifted-concurrent + , aeson + , vector + , containers + , unordered-containers + , dput-hslogger + , time + , stm + , pretty + , bytestring + , hashable + , contravariant + , reflection + , psq-wrap + , minmax-psq + , network-addr + , cereal + , tasks + hs-source-dirs: src + default-language: Haskell2010 diff --git a/kad/src/DebugTag.hs b/kad/src/DebugTag.hs new file mode 100644 index 00000000..9ac04bb0 --- /dev/null +++ b/kad/src/DebugTag.hs @@ -0,0 +1,24 @@ +module DebugTag where + +import Data.Typeable + +-- | Debug Tags, add more as needed, but ensure XAnnounce is always first, XMisc last +data DebugTag + = XAnnounce + | XBitTorrent + | XDHT + | XLan + | XMan + | XNetCrypto + | XNetCryptoOut + | XOnion + | XRoutes + | XPing + | XRefresh + | XJabber + | XTCP + | XMisc + | XNodeinfoSearch + | XUnexpected -- Used only for special anomalous errors that we didn't expect to happen. + | XUnused -- Never commit code that uses XUnused. + deriving (Eq, Ord, Show, Read, Enum, Bounded,Typeable) diff --git a/kad/src/Network/Kademlia.hs b/kad/src/Network/Kademlia.hs new file mode 100644 index 00000000..e61afe9b --- /dev/null +++ b/kad/src/Network/Kademlia.hs @@ -0,0 +1,163 @@ +{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE DeriveFunctor, DeriveTraversable #-} +-- {-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PatternSynonyms #-} +module Network.Kademlia where + +import Data.Maybe +import Data.Time.Clock.POSIX +import Network.Kademlia.Routing as R +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif +import Control.Concurrent.STM +import Control.Monad +import Data.Time.Clock.POSIX (POSIXTime) + +-- | The status of a given node with respect to a given routint table. +data RoutingStatus + = Stranger -- ^ The node is unknown to the Kademlia routing table. + | Applicant -- ^ The node may be inserted pending a ping timeout. + | Accepted -- ^ The node has a slot in one of the Kademlia buckets. + deriving (Eq,Ord,Enum,Show,Read) + +-- | A change occured in the kademlia routing table. +data RoutingTransition ni = RoutingTransition + { transitioningNode :: ni + , transitionedTo :: !RoutingStatus + } + deriving (Eq,Ord,Show,Read) + +data InsertionReporter ni = InsertionReporter + { -- | Called on every inbound packet. Accepts: + -- + -- * Origin of packet. + -- + -- * List of nodes to be pinged as a result. + reportArrival :: POSIXTime + -> ni + -> [ni] + -> IO () + -- | Called on every ping probe. Accepts: + -- + -- * Who was pinged. + -- + -- * True Bool value if they ponged. + , reportPingResult :: POSIXTime + -> ni + -> Bool + -> IO () + } + +quietInsertions :: InsertionReporter ni +quietInsertions = InsertionReporter + { reportArrival = \_ _ _ -> return () + , reportPingResult = \_ _ _ -> return () + } + +contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t +contramapIR f ir = InsertionReporter + { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis) + , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b + } + +-- | All the IO operations necessary to maintain a Kademlia routing table. +data TableStateIO ni = TableStateIO + { -- | Write the routing table. Typically 'writeTVar'. + tblWrite :: R.BucketList ni -> STM () + + -- | Read the routing table. Typically 'readTVar'. + , tblRead :: STM (R.BucketList ni) + + -- | Issue a ping to a remote node and report 'True' if the node + -- responded within an acceptable time and 'False' otherwise. + , tblPing :: ni -> IO Bool + + -- | Convenience method provided to assist in maintaining state + -- consistent with the routing table. It will be invoked in the same + -- transaction that 'tblRead'\/'tblWrite' occured but only when there was + -- an interesting change. The returned IO action will be triggered soon + -- afterward. + -- + -- It is not necessary to do anything interesting here. The following + -- trivial implementation is fine: + -- + -- > tblTransition = const $ return $ return () + , tblTransition :: RoutingTransition ni -> STM (IO ()) + } + +vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni +vanillaIO var ping = TableStateIO + { tblRead = readTVar var + , tblWrite = writeTVar var + , tblPing = ping + , tblTransition = const $ return $ return () + } + +-- | Everything necessary to maintain a routing table of /ni/ (node +-- information) entries. +data Kademlia nid ni = Kademlia { kademInsertionReporter :: InsertionReporter ni + , kademSpace :: KademliaSpace nid ni + , kademIO :: TableStateIO ni + } + + +-- Helper to 'insertNode'. +-- +-- Adapt return value from 'updateForPingResult' into a +-- more easily grokked list of transitions. +transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni] +transition (x,m) = + -- Just _ <- m = Node transition: Accepted --> Stranger + -- Nothing <- m = Node transition: Applicant --> Stranger + RoutingTransition x Stranger + : maybeToList (accepted <$> m) + +-- Helper to 'transition' +-- +-- Node transition: Applicant --> Accepted +accepted :: (t,ni) -> RoutingTransition ni +accepted (_,y) = RoutingTransition y Accepted + + +insertNode :: Kademlia nid ni -> ni -> IO () +insertNode (Kademlia reporter space io) node = do + + tm <- getPOSIXTime + + (ps,reaction) <- atomically $ do + tbl <- tblRead io + let (inserted, ps,t') = R.updateForInbound space tm node tbl + tblWrite io t' + reaction <- case ps of + _ | inserted -> -- Node transition: Stranger --> Accepted + tblTransition io $ RoutingTransition node Accepted + (_:_) -> -- Node transition: Stranger --> Applicant + tblTransition io $ RoutingTransition node Applicant + _ -> return $ return () + return (ps, reaction) + + reportArrival reporter tm node ps + reaction + + _ <- fork $ do + myThreadId >>= flip labelThread "pingResults" + forM_ ps $ \n -> do + b <- tblPing io n + reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result + join $ atomically $ do + tbl <- tblRead io + let (replacements, t') = R.updateForPingResult space n b tbl + tblWrite io t' + ios <- sequence $ concatMap + (map (tblTransition io) . transition) + replacements + return $ sequence_ ios + + return () + diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs new file mode 100644 index 00000000..08ba3318 --- /dev/null +++ b/kad/src/Network/Kademlia/Bootstrap.hs @@ -0,0 +1,439 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE NondecreasingIndentation #-} +{-# LANGUAGE PartialTypeSignatures #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +module Network.Kademlia.Bootstrap where + +import Data.Function +import Data.Maybe +import qualified Data.Set as Set +import Data.Time.Clock.POSIX (getPOSIXTime) +import Network.Kademlia.Routing as R +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif +import Control.Concurrent.STM +import Control.Monad +import Data.Hashable +import Data.Time.Clock.POSIX (POSIXTime) +import Data.Ord +import System.Entropy +import System.Timeout +import DPut +import DebugTag + +import qualified Data.Wrapper.PSQInt as Int + ;import Data.Wrapper.PSQInt (pattern (:->)) +import Network.Address (bucketRange) +import Network.Kademlia.Search +import Control.Concurrent.Tasks +import Network.Kademlia + +type SensibleNodeId nid ni = + ( Show nid + , Ord nid + , Ord ni + , Hashable nid + , Hashable ni ) + +data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher + { -- | A staleness threshold (if a bucket goes this long without being + -- touched, a refresh will be triggered). + refreshInterval :: POSIXTime + -- | A TVar with the time-to-refresh schedule for each bucket. + -- + -- To "touch" a bucket and prevent it from being refreshed, reschedule + -- its refresh time to some time into the future by modifying its + -- priority in this priority search queue. + , refreshQueue :: TVar (Int.PSQ POSIXTime) + -- | This is the kademlia node search specification. + , refreshSearch :: Search nid addr tok ni ni + -- | The current kademlia routing table buckets. + , refreshBuckets :: TVar (R.BucketList ni) + -- | Action to ping a node. This is used only during initial bootstrap + -- to get some nodes in our table. A 'True' result is interpreted as a a + -- pong, where 'False' is a non-response. + , refreshPing :: ni -> IO Bool + , -- | Timestamp of last bucket event. + refreshLastTouch :: TVar POSIXTime + , -- | This variable indicates whether or not we are in bootstrapping mode. + bootstrapMode :: TVar Bool + , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on + -- every finished refresh. + bootstrapCountdown :: TVar (Maybe Int) + } + +newBucketRefresher :: ( Ord addr, Hashable addr + , SensibleNodeId nid ni ) + => TVar (R.BucketList ni) + -> Search nid addr tok ni ni + -> (ni -> IO Bool) + -> STM (BucketRefresher nid ni) +newBucketRefresher bkts sch ping = do + let spc = searchSpace sch + nodeId = kademliaLocation spc + -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount + sched <- newTVar Int.empty + lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... + bootstrapVar <- newTVar True -- Start in bootstrapping mode. + bootstrapCnt <- newTVar Nothing + return BucketRefresher + { refreshInterval = 15 * 60 + , refreshQueue = sched + , refreshSearch = sch + , refreshBuckets = bkts + , refreshPing = ping + , refreshLastTouch = lasttouch + , bootstrapMode = bootstrapVar + , bootstrapCountdown = bootstrapCnt + } + +-- | This was added to avoid the compile error "Record update for +-- insufficiently polymorphic field" when trying to update the existentially +-- quantified field 'refreshSearch'. +updateRefresherIO :: Ord addr + => Search nid addr tok ni ni + -> (ni -> IO Bool) + -> BucketRefresher nid ni -> BucketRefresher nid ni +updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher + { refreshSearch = sch + , refreshPing = ping + , refreshInterval = refreshInterval + , refreshBuckets = refreshBuckets + , refreshQueue = refreshQueue + , refreshLastTouch = refreshLastTouch + , bootstrapMode = bootstrapMode + , bootstrapCountdown = bootstrapCountdown + } + +-- | Fork a refresh loop. Kill the returned thread to terminate it. +forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId +forkPollForRefresh r@BucketRefresher{ refreshInterval + , refreshQueue + , refreshBuckets + , refreshSearch } = fork $ do + myThreadId >>= flip labelThread "pollForRefresh" + fix $ \again -> do + join $ atomically $ do + nextup <- Int.findMin <$> readTVar refreshQueue + maybe retry (return . go again) nextup + where + refresh :: Int -> IO Int + refresh n = do + -- dput XRefresh $ "Refresh time! "++ show n + refreshBucket r n + + go again ( bktnum :-> refresh_time ) = do + now <- getPOSIXTime + case fromEnum (refresh_time - now) of + x | x <= 0 -> do -- Refresh time! + -- Move it to the back of the refresh queue. + atomically $ do + interval <- effectiveRefreshInterval r bktnum + modifyTVar' refreshQueue + $ Int.insert bktnum (now + interval) + -- Now fork the refresh operation. + -- TODO: We should probably propogate the kill signal to this thread. + fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) + _ <- refresh bktnum + return () + return () + picoseconds -> do + -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum + threadDelay ( picoseconds `div` 10^6 ) + again + + +-- | This is a helper to 'refreshBucket' which does some book keeping to decide +-- whether or not a bucket is sufficiently refreshed or not. It will return +-- false when we can terminate a node search. +checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. + -> TVar (BucketList ni) -- ^ The current routing table. + -> TVar (Set.Set ni) -- ^ In-range nodes found so far. + -> TVar Bool -- ^ The result will also be written here. + -> Int -- ^ The bucket number of interest. + -> ni -- ^ A newly found node. + -> STM Bool +checkBucketFull space var resultCounter fin n found_node = do + let fullcount = R.defaultBucketSize + saveit True = writeTVar fin True >> return True + saveit _ = return False + tbl <- readTVar var + let counts = R.shape tbl + nid = kademliaLocation space found_node + -- Update the result set with every found node that is in the + -- bucket of interest. + when (n == R.bucketNumber space nid tbl) + $ modifyTVar' resultCounter (Set.insert found_node) + resultCount <- readTVar resultCounter + saveit $ case drop (n - 1) counts of + (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going + _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going + _ -> False -- okay, good enough, let's quit. + +-- | Called from 'refreshBucket' with the current time when a refresh of the +-- supplied bucket number finishes. +onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) +onFinishedRefresh BucketRefresher { bootstrapCountdown + , bootstrapMode + , refreshQueue + , refreshBuckets } num now = do + bootstrapping <- readTVar bootstrapMode + if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num + else do + tbl <- readTVar refreshBuckets + action <- + if num /= R.bktCount tbl - 1 + then do modifyTVar' bootstrapCountdown (fmap pred) + return $ return () -- dput XRefresh $ "BOOTSTRAP decrement" + else do + -- The last bucket finished. + cnt <- readTVar bootstrapCountdown + case cnt of + Nothing -> do + let fullsize = R.defaultBucketSize + notfull (n,len) | n==num = False + | len>=fullsize = False + | otherwise = True + unfull = case filter notfull $ zip [0..] (R.shape tbl) of + [] -> [(0,0)] -- Schedule at least 1 more refresh. + xs -> xs + forM_ unfull $ \(n,_) -> do + -- Schedule immediate refresh for unfull buckets (other than this one). + modifyTVar' refreshQueue $ Int.insert n (now - 1) + writeTVar bootstrapCountdown $! Just $! length unfull + return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull + Just n -> do writeTVar bootstrapCountdown $! Just $! pred n + return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)" + cnt <- readTVar bootstrapCountdown + if (cnt == Just 0) + then do + -- Boostrap finished! + writeTVar bootstrapMode False + writeTVar bootstrapCountdown Nothing + return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." + else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) + +refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => + BucketRefresher nid ni -> Int -> IO Int +refreshBucket r@BucketRefresher{ refreshSearch = sch + , refreshBuckets = var } + n = do + tbl <- atomically (readTVar var) + let count = bktCount tbl + nid = kademliaLocation (searchSpace sch) (thisNode tbl) + sample <- if n+1 >= count -- Is this the last bucket? + then return nid -- Yes? Search our own id. + else kademliaSample (searchSpace sch) -- No? Generate a random id. + getEntropy + nid + (bucketRange n (n + 1 < count)) + fin <- atomically $ newTVar False + resultCounter <- atomically $ newTVar Set.empty + + dput XRefresh $ "Start refresh " ++ show (n,sample) + + -- Set 15 minute timeout in order to avoid overlapping refreshes. + s <- search sch tbl sample $ if n+1 == R.defaultBucketCount + then const $ return True -- Never short-circuit the last bucket. + else checkBucketFull (searchSpace sch) var resultCounter fin n + _ <- timeout (15*60*1000000) $ do + atomically $ searchIsFinished s >>= check + atomically $ searchCancel s + dput XDHT $ "Finish refresh " ++ show (n,sample) + now <- getPOSIXTime + join $ atomically $ onFinishedRefresh r n now + rcount <- atomically $ do + c <- Set.size <$> readTVar resultCounter + b <- readTVar fin + return $ if b then 1 else c + return rcount + +refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () +refreshLastBucket r@BucketRefresher { refreshBuckets + , refreshQueue } = do + + now <- getPOSIXTime + atomically $ do + cnt <- bktCount <$> readTVar refreshBuckets + -- Schedule immediate refresh. + modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) + +restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => + BucketRefresher nid ni -> STM (IO ()) +restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do + unchanged <- readTVar bootstrapMode + writeTVar bootstrapMode True + writeTVar bootstrapCountdown Nothing + if not unchanged then return $ do + dput XRefresh "BOOTSTRAP entered bootstrap mode" + refreshLastBucket r + else return $ dput XRefresh "BOOTSTRAP already bootstrapping" + +bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => + BucketRefresher nid ni + -> t1 ni -- ^ Nodes to bootstrap from. + -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. + -> IO () +bootstrap r@BucketRefresher { refreshSearch = sch + , refreshBuckets = var + , refreshPing = ping + , bootstrapMode } ns ns0 = do + gotPing <- atomically $ newTVar False + + -- First, ping the given nodes so that they are added to + -- our routing table. + withTaskGroup "bootstrap.resume" 20 $ \g -> do + forM_ ns $ \n -> do + let lbl = show $ kademliaLocation (searchSpace sch) n + forkTask g lbl $ do + b <- ping n + when b $ atomically $ writeTVar gotPing True + + -- We resort to the hardcoded fallback nodes only when we got no + -- responses. This is to lesson the burden on well-known boostrap + -- nodes. + fallback <- atomically (readTVar gotPing) >>= return . when . not + fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do + forM_ ns0 $ \n -> do + forkTask g (show $ kademliaLocation (searchSpace sch) n) + (void $ ping n) + dput XDHT "Finished bootstrap pings." + -- Now search our own Id by entering bootstrap mode from non-bootstrap mode. + join $ atomically $ do + writeTVar bootstrapMode False + restartBootstrap r + -- + -- Hopefully 'forkPollForRefresh' was invoked and can take over + -- maintenance. + + +effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime +effectiveRefreshInterval BucketRefresher{ refreshInterval + , refreshBuckets + , bootstrapMode } num = do + tbl <- readTVar refreshBuckets + bootstrapping <- readTVar bootstrapMode + case bootstrapping of + False -> return refreshInterval + True -> do + -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds. + let fullcount = R.defaultBucketSize + count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl + if count == fullcount + then return refreshInterval + else return 15 -- seconds + + + +-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket +-- changes. This will typically be invoked from 'tblTransition'. +-- +-- From BEP 05: +-- +-- > Each bucket should maintain a "last changed" property to indicate how +-- > "fresh" the contents are. +-- +-- We will use a "time to next refresh" property instead and store it in +-- a priority search queue. +-- +-- In detail using an expository (not actually implemented) type +-- 'BucketTouchEvent'... +-- +-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus +-- >>> bucketEvents = +-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, +-- >>> +-- >>> , Stranger :--> Accepted -- or a node is added to a bucket, +-- >>> +-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced +-- >>> , Applicant :--> Accepted -- with another node, +-- >>> ] +-- +-- the bucket's last changed property should be updated. Buckets that have not +-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." +-- This is done by picking a random ID in the range of the bucket and +-- performing a find_nodes search on it. +-- +-- The only other possible BucketTouchEvents are as follows: +-- +-- >>> not_handled = +-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: +-- >>> -- (Applicant :--> Stranger) +-- >>> -- (Applicant :--> Accepted) +-- >>> , Accepted :--> Applicant -- Never happens +-- >>> ] +-- +-- Because this BucketTouchEvent type is not actually implemented and we only +-- receive notifications of a node's new state, it suffices to reschedule the +-- bucket refresh 'touchBucket' on every transition to a state other than +-- 'Applicant'. +-- +-- XXX: Unfortunately, this means redundantly triggering twice upon every node +-- replacement because we do not currently distinguish between standalone +-- insertion/deletion events and an insertion/deletion pair constituting +-- replacement. +-- +-- It might also be better to pass the timestamp of the transition here and +-- keep the refresh queue in better sync with the routing table by updating it +-- within the STM monad. +-- +-- We embed the result in the STM monad but currently, no STM state changes +-- occur until the returned IO action is invoked. TODO: simplify? +touchBucket :: SensibleNodeId nid ni + => BucketRefresher nid ni + -> RoutingTransition ni -- ^ What happened to the bucket? + -> STM (IO ()) +touchBucket r@BucketRefresher{ refreshSearch + , refreshInterval + , refreshBuckets + , refreshQueue + , refreshLastTouch + , bootstrapMode + , bootstrapCountdown } + RoutingTransition{ transitionedTo + , transitioningNode } + = case transitionedTo of + Applicant -> return $ return () -- Ignore transition to applicant. + _ -> return $ do -- Reschedule for any other transition. + now <- getPOSIXTime + join $ atomically $ do + let space = searchSpace refreshSearch + nid = kademliaLocation space transitioningNode + tbl <- readTVar refreshBuckets + let num = R.bucketNumber space nid tbl + stamp <- readTVar refreshLastTouch + action <- case stamp /= 0 && (now - stamp > 60) of + True -> do + -- It's been one minute since any bucket has been touched, re-enter bootstrap mode. + restartBootstrap r + False -> return $ return () + interval <- effectiveRefreshInterval r num + modifyTVar' refreshQueue $ Int.insert num (now + interval) + writeTVar refreshLastTouch now + return action + +refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni +refreshKademlia r@BucketRefresher { refreshSearch = sch + , refreshPing = ping + , refreshBuckets = bkts + } + = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping) + { tblTransition = \tr -> do + io <- touchBucket r tr + return io + } diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs new file mode 100644 index 00000000..601be5d8 --- /dev/null +++ b/kad/src/Network/Kademlia/CommonAPI.hs @@ -0,0 +1,84 @@ +{-# LANGUAGE ExistentialQuantification #-} +module Network.Kademlia.CommonAPI where + + +import Control.Concurrent +import Control.Concurrent.STM +import Data.Aeson as J (FromJSON, ToJSON) +import Data.Hashable +import qualified Data.Map as Map +import Data.Serialize as S +import qualified Data.Set as Set +import Data.Time.Clock.POSIX +import Data.Typeable + +import Network.Kademlia.Search +import Network.Kademlia.Routing as R +import Crypto.Tox (SecretKey,PublicKey) + +data DHT = forall nid ni. ( Show ni + , Read ni + , ToJSON ni + , FromJSON ni + , Ord ni + , Hashable ni + , Show nid + , Ord nid + , Hashable nid + , Typeable ni + , S.Serialize nid + ) => + DHT + { dhtBuckets :: TVar (BucketList ni) + , dhtSecretKey :: STM (Maybe SecretKey) + , dhtPing :: Map.Map String (DHTPing ni) + , dhtQuery :: Map.Map String (DHTQuery nid ni) + , dhtAnnouncables :: Map.Map String (DHTAnnouncable nid) + , dhtParseId :: String -> Either String nid + , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni)) + , dhtFallbackNodes :: IO [ni] + , dhtBootstrap :: [ni] -> [ni] -> IO () + } + +data DHTQuery nid ni = forall addr r tok. + ( Ord addr + , Typeable r + , Typeable tok + , Typeable ni + ) => DHTQuery + { qsearch :: Search nid addr tok ni r + , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. + , qshowR :: r -> String + , qshowTok :: tok -> Maybe String + } + +data DHTAnnouncable nid = forall dta tok ni r. + ( Show r + , Typeable dta -- information being announced + , Typeable tok -- token + , Typeable r -- search result + , Typeable ni -- node + ) => DHTAnnouncable + { announceParseData :: String -> Either String dta + , announceParseToken :: dta -> String -> Either String tok + , announceParseAddress :: String -> Either String ni + , announceSendData :: Either ( String {- search name -} + , String -> Either String r + , PublicKey {- me -} -> dta -> r -> IO ()) + (dta -> tok -> Maybe ni -> IO (Maybe r)) + , announceInterval :: POSIXTime + , announceTarget :: dta -> nid + } + +data DHTSearch nid ni = forall addr tok r. DHTSearch + { searchThread :: ThreadId + , searchState :: SearchState nid addr tok ni r + , searchShowTok :: tok -> Maybe String + , searchResults :: TVar (Set.Set String) + } + +data DHTPing ni = forall r. DHTPing + { pingQuery :: [String] -> ni -> IO (Maybe r) + , pingShowResult :: r -> String + } + diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs new file mode 100644 index 00000000..32ec169d --- /dev/null +++ b/kad/src/Network/Kademlia/Persistence.hs @@ -0,0 +1,52 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +module Network.Kademlia.Persistence where + +import Network.Kademlia.CommonAPI +import Network.Kademlia.Routing as R + +import Control.Concurrent.STM +import qualified Data.Aeson as J + ;import Data.Aeson as J (FromJSON) +import qualified Data.ByteString.Lazy as L +import qualified Data.HashMap.Strict as HashMap +import Data.List +import qualified Data.Vector as V +import System.IO.Error + +saveNodes :: String -> DHT -> IO () +saveNodes netname DHT{dhtBuckets} = do + bkts <- atomically $ readTVar dhtBuckets + let ns = map fst $ concat $ R.toList bkts + bs = J.encode ns + fname = nodesFileName netname + L.writeFile fname bs + +loadNodes :: FromJSON ni => String -> IO [ni] +loadNodes netname = do + let fname = nodesFileName netname + attempt <- tryIOError $ do + J.decode <$> L.readFile fname + >>= maybe (ioError $ userError "Nothing") return + either (const $ fallbackLoad fname) return attempt + +nodesFileName :: String -> String +nodesFileName netname = netname ++ "-nodes.json" + +fallbackLoad :: FromJSON t => FilePath -> IO [t] +fallbackLoad fname = do + attempt <- tryIOError $ do + J.decode <$> L.readFile fname + >>= maybe (ioError $ userError "Nothing") return + let go r = do + let m = HashMap.lookup "nodes" (r :: J.Object) + ns0 = case m of Just (J.Array v) -> V.toList v + Nothing -> [] + ns1 = zip (map J.fromJSON ns0) ns0 + issuc (J.Error _,_) = False + issuc _ = True + (ss,fs) = partition issuc ns1 + ns = map (\(J.Success n,_) -> n) ss + mapM_ print (map snd fs) >> return ns + either (const $ return []) go attempt + diff --git a/kad/src/Network/Kademlia/Routing.hs b/kad/src/Network/Kademlia/Routing.hs new file mode 100644 index 00000000..c7fdf028 --- /dev/null +++ b/kad/src/Network/Kademlia/Routing.hs @@ -0,0 +1,809 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- (c) Joe Crayne 2017 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Every node maintains a routing table of known good nodes. The +-- nodes in the routing table are used as starting points for +-- queries in the DHT. Nodes from the routing table are returned in +-- response to queries from other nodes. +-- +-- For more info see: +-- +-- +{-# LANGUAGE CPP #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} +module Network.Kademlia.Routing + {- + ( -- * BucketList + BucketList + , Info(..) + + -- * Attributes + , BucketCount + , defaultBucketCount + , BucketSize + , defaultBucketSize + , NodeCount + + -- * Query + , Network.Kademlia.Routing.null + , Network.Kademlia.Routing.full + , thisId + , shape + , Network.Kademlia.Routing.size + , Network.Kademlia.Routing.depth + , compatibleNodeId + + -- * Lookup + , K + , defaultK + , TableKey (..) + , kclosest + + -- * Construction + , Network.Kademlia.Routing.nullTable + , Event(..) + , CheckPing(..) + , Network.Kademlia.Routing.insert + + -- * Conversion + , Network.Kademlia.Routing.TableEntry + , Network.Kademlia.Routing.toList + + -- * Routing + , Timestamp + , getTimestamp + ) -} where + +import Control.Applicative as A +import Control.Arrow +import Control.Monad +import Data.Function +import Data.Functor.Contravariant +import Data.Functor.Identity +import Data.List as L hiding (insert) +import Data.Maybe +import Data.Monoid +import Data.Wrapper.PSQ as PSQ +import Data.Serialize as S hiding (Result, Done) +import qualified Data.Sequence as Seq +import Data.Time +import Data.Time.Clock.POSIX +import Data.Word +import GHC.Generics +import Text.PrettyPrint as PP hiding ((<>)) +import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) +import qualified Data.ByteString as BS +import Data.Bits +import Data.Ord +import Data.Reflection +import Network.Address +import Data.Typeable +import Data.Coerce +import Data.Hashable + + +-- | Last time the node was responding to our queries. +-- +-- Not all nodes that we learn about are equal. Some are \"good\" and +-- some are not. Many nodes using the DHT are able to send queries +-- and receive responses, but are not able to respond to queries +-- from other nodes. It is important that each node's routing table +-- must contain only known good nodes. A good node is a node has +-- responded to one of our queries within the last 15 minutes. A +-- node is also good if it has ever responded to one of our queries +-- and has sent us a query within the last 15 minutes. After 15 +-- minutes of inactivity, a node becomes questionable. Nodes become +-- bad when they fail to respond to multiple queries in a row. Nodes +-- that we know are good are given priority over nodes with unknown +-- status. +-- +type Timestamp = POSIXTime + +getTimestamp :: IO Timestamp +getTimestamp = do + utcTime <- getCurrentTime + return $ utcTimeToPOSIXSeconds utcTime + + + +{----------------------------------------------------------------------- + Bucket +-----------------------------------------------------------------------} +-- +-- When a k-bucket is full and a new node is discovered for that +-- k-bucket, the least recently seen node in the k-bucket is +-- PINGed. If the node is found to be still alive, the new node is +-- place in a secondary list, a replacement cache. The replacement +-- cache is used only if a node in the k-bucket stops responding. In +-- other words: new nodes are used only when older nodes disappear. + +-- | Timestamp - last time this node is pinged. +type NodeEntry ni = Binding ni Timestamp + + +-- | Maximum number of 'NodeInfo's stored in a bucket. Most clients +-- use this value. +defaultBucketSize :: Int +defaultBucketSize = 8 + +data QueueMethods m elem fifo = QueueMethods + { pushBack :: elem -> fifo -> m fifo + , popFront :: fifo -> m (Maybe elem, fifo) + , emptyQueue :: m fifo + } + +{- +fromQ :: Functor m => + ( a -> b ) + -> ( b -> a ) + -> QueueMethods m elem a + -> QueueMethods m elem b +fromQ embed project QueueMethods{..} = + QueueMethods { pushBack = \e -> fmap embed . pushBack e . project + , popFront = fmap (second embed) . popFront . project + , emptyQueue = fmap embed emptyQueue + } +-} + +seqQ :: QueueMethods Identity ni (Seq.Seq ni) +seqQ = QueueMethods + { pushBack = \e fifo -> pure (fifo Seq.|> e) + , popFront = \fifo -> case Seq.viewl fifo of + e Seq.:< fifo' -> pure (Just e, fifo') + Seq.EmptyL -> pure (Nothing, Seq.empty) + , emptyQueue = pure Seq.empty + } + +type BucketQueue ni = Seq.Seq ni + +bucketQ :: QueueMethods Identity ni (BucketQueue ni) +bucketQ = seqQ + + +data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int) + +contramapC :: (b -> a) -> Compare a -> Compare b +contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b)) + (\s x -> hsh s (f x)) + +newtype Ordered' s a = Ordered a + deriving (Show) + +-- | Hack to avoid UndecidableInstances +newtype Shrink a = Shrink a + deriving (Show) + +type Ordered s a = Ordered' s (Shrink a) + +instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where + a == b = (compare a b == EQ) + +instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where + compare a b = cmp (coerce a) (coerce b) + where Compare cmp _ = reflect (Proxy :: Proxy s) + +instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where + hashWithSalt salt x = hash salt (coerce x) + where Compare _ hash = reflect (Proxy :: Proxy s) + +-- | Bucket is also limited in its length — thus it's called k-bucket. +-- When bucket becomes full, we should split it in two lists by +-- current span bit. Span bit is defined by depth in the routing +-- table tree. Size of the bucket should be choosen such that it's +-- very unlikely that all nodes in bucket fail within an hour of +-- each other. +data Bucket s ni = Bucket + { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes + , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs + } deriving (Generic) + +#define CAN_SHOW_BUCKET 0 + +#if CAN_SHOW_BUCKET +deriving instance Show ni => Show (Bucket s ni) +#endif + +bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni +bucketCompare _ = reflect (Proxy :: Proxy s) + +mapBucket :: ( Reifies s (Compare a) + , Reifies t (Compare ni) + ) => (a -> ni) -> Bucket s a -> Bucket t ni +mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns) + (fmap (second f) q) + where f' = coerce . f . coerce + + +#if 0 + +{- +getGenericNode :: ( Serialize (NodeId) + , Serialize ip + , Serialize u + ) => Get (NodeInfo) +getGenericNode = do + nid <- get + naddr <- get + u <- get + return NodeInfo + { nodeId = nid + , nodeAddr = naddr + , nodeAnnotation = u + } + +putGenericNode :: ( Serialize (NodeId) + , Serialize ip + , Serialize u + ) => NodeInfo -> Put +putGenericNode (NodeInfo nid naddr u) = do + put nid + put naddr + put u + +instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where + get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) + put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes +-} + +#endif + +psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p +psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs + +psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)] +psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq + +-- | Update interval, in seconds. +delta :: NominalDiffTime +delta = 15 * 60 + +-- | Should maintain a set of stable long running nodes. +-- +-- Note: pings are triggerd only when a bucket is full. +updateBucketForInbound :: ( Coercible t1 t + , Alternative f + , Reifies s (Compare t1) + ) => NominalDiffTime -> t1 -> Bucket s t1 -> f ([t], Bucket s t1) +updateBucketForInbound curTime info bucket + -- Just update timestamp if a node is already in bucket. + -- + -- Note PingResult events should only occur for nodes we requested a ping for, + -- and those will always already be in the routing queue and will get their + -- timestamp updated here, since 'TryInsert' is called on every inbound packet, + -- including ping results. + | already_have + = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime ) + -- bucket is good, but not full => we can insert a new node + | PSQ.size (bktNodes bucket) < defaultBucketSize + = pure ( [], map_ns $ PSQ.insert (coerce info) curTime ) + -- If there are any questionable nodes in the bucket have not been + -- seen in the last 15 minutes, the least recently seen node is + -- pinged. If any nodes in the bucket are known to have become bad, + -- then one is replaced by the new node in the next insertBucket + -- iteration. + | not (L.null stales) + = pure ( stales + , bucket { -- Update timestamps so that we don't redundantly ping. + bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket + -- Update queue with the pending NodeInfo in case of ping fail. + , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } ) + -- When the bucket is full of good nodes, the new node is simply discarded. + -- We must return 'A.empty' here to ensure that bucket splitting happens + -- inside 'modifyBucket'. + | otherwise = A.empty + where + -- We (take 1) to keep a 1-to-1 correspondence between pending pings and + -- waiting nodes in the bktQ. This way, we don't have to worry about what + -- to do with failed pings for which there is no ready replacements. + stales = -- One stale: + do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket) + guard (t < curTime - delta) + return $ coerce n + -- All stale: + -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket + + already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket) + + map_ns f = bucket { bktNodes = f (bktNodes bucket) } + -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) } + +updateBucketForPingResult :: (Applicative f, Reifies s (Compare a)) => + a -> Bool -> Bucket s a -> f ([(a, Maybe (Timestamp, a))], Bucket s a) +updateBucketForPingResult bad_node got_response bucket + = pure ( map (,Nothing) forgotten + ++ map (second Just) replacements + , Bucket (foldr replace + (bktNodes bucket) + replacements) + popped + ) + where + (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) + + -- Dropped from accepted, replaced by pending. + replacements | got_response = [] -- Timestamp was already updated by TryInsert. + | Just info <- top = do + -- Insert only if there's a removal. + _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket) + return (bad_node, info) + | otherwise = [] + + -- Dropped from the pending queue without replacing. + forgotten | got_response = maybeToList $ fmap snd top + | otherwise = [] + + + replace (bad_node, (tm, info)) = + PSQ.insert (coerce info) tm + . PSQ.delete (coerce bad_node) + + +updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp +updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales + +type BitIx = Word + +partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b) +partitionQ imp test q0 = do + pass0 <- emptyQueue imp + fail0 <- emptyQueue imp + let flipfix a b f = fix f a b + flipfix q0 (pass0,fail0) $ \rec q qs -> do + (mb,q') <- popFront imp q + case mb of + Nothing -> return qs + Just e -> do qs' <- select (pushBack imp e) qs + rec q' qs' + where + select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) + select f = if test e then \(a,b) -> flip (,) b <$> f a + else \(a,b) -> (,) a <$> f b + + + +split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => + forall ni s. ( Reifies s (Compare ni) ) => + (ni -> Word -> Bool) + -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni) +split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs) + where + (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b + (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b + + spanBit :: ni -> Bool + spanBit entry = testNodeIdBit entry i + + +{----------------------------------------------------------------------- +-- BucketList +-----------------------------------------------------------------------} + +defaultBucketCount :: Int +defaultBucketCount = 20 + +defaultMaxBucketCount :: Word +defaultMaxBucketCount = 24 + +data Info ni nid = Info + { myBuckets :: BucketList ni + , myNodeId :: nid + , myAddress :: SockAddr + } + deriving Generic + +deriving instance (Eq ni, Eq nid) => Eq (Info ni nid) +deriving instance (Show ni, Show nid) => Show (Info ni nid) + +-- instance (Eq ip, Serialize ip) => Serialize (Info ip) + +-- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ +-- 160. The routing table is subdivided into 'Bucket's that each cover +-- a portion of the space. An empty table has one bucket with an ID +-- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" +-- is inserted into the table, it is placed within the bucket that has +-- @min <= N < max@. An empty table has only one bucket so any node +-- must fit within it. Each bucket can only hold 'K' nodes, currently +-- eight, before becoming 'Full'. When a bucket is full of known good +-- nodes, no more nodes may be added unless our own 'NodeId' falls +-- within the range of the 'Bucket'. In that case, the bucket is +-- replaced by two new buckets each with half the range of the old +-- bucket and the nodes from the old bucket are distributed among the +-- two new ones. For a new table with only one bucket, the full bucket +-- is always split into two new buckets covering the ranges @0..2 ^ +-- 159@ and @2 ^ 159..2 ^ 160@. +-- +data BucketList ni = forall s. Reifies s (Compare ni) => + BucketList { thisNode :: !ni + -- | Non-empty list of buckets. + , buckets :: [Bucket s ni] + } + +mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b +mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts) + $ \p -> BucketList + { thisNode = f self + , buckets = map (resolve p . mapBucket f) bkts + } + where + resolve :: Proxy s -> Bucket s ni -> Bucket s ni + resolve = const id + +instance (Eq ni) => Eq (BucketList ni) where + (==) = (==) `on` Network.Kademlia.Routing.toList + +#if 0 + +instance Serialize NominalDiffTime where + put = putWord32be . fromIntegral . fromEnum + get = (toEnum . fromIntegral) <$> getWord32be + +#endif + +#if CAN_SHOW_BUCKET +deriving instance (Show ni) => Show (BucketList ni) +#else +instance Show ni => Show (BucketList ni) where + showsPrec d (BucketList self bkts) = + mappend "BucketList " + . showsPrec (d+1) self + . mappend " (fromList " + . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts) + . mappend ") " +#endif + +#if 0 + +-- | Normally, routing table should be saved between invocations of +-- the client software. Note that you don't need to store /this/ +-- 'NodeId' since it is already included in routing table. +instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList) + +#endif + +-- | Shape of the table. +instance Pretty (BucketList ni) where + pPrint t + | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss + | otherwise = brackets $ + PP.int (L.sum ss) <> " nodes, " <> + PP.int bucketCount <> " buckets" + where + bucketCount = L.length ss + ss = shape t + +-- | Empty table with specified /spine/ node id. +-- +-- XXX: The comparison function argument is awkward here. +nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni +nullTable cmp hsh ni n = + reify (Compare cmp hsh) + $ \p -> BucketList + ni + [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)] + where + empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp + empty = const $ PSQ.empty + +#if 0 + +-- | Test if table is empty. In this case DHT should start +-- bootstrapping process until table becomes 'full'. +null :: BucketList -> Bool +null (Tip _ _ b) = PSQ.null $ bktNodes b +null _ = False + +-- | Test if table have maximum number of nodes. No more nodes can be +-- 'insert'ed, except old ones becomes bad. +full :: BucketList -> Bool +full (Tip _ n _) = n == 0 +full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t +full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t + +-- | Get the /spine/ node id. +thisId :: BucketList -> NodeId +thisId (Tip nid _ _) = nid +thisId (Zero table _) = thisId table +thisId (One _ table) = thisId table + +-- | Number of nodes in a bucket or a table. +type NodeCount = Int + +#endif + +-- | Internally, routing table is similar to list of buckets or a +-- /matrix/ of nodes. This function returns the shape of the matrix. +shape :: BucketList ni -> [Int] +shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl + +#if 0 + +-- | Get number of nodes in the table. +size :: BucketList -> NodeCount +size = L.sum . shape + +-- | Get number of buckets in the table. +depth :: BucketList -> BucketCount +depth = L.length . shape + +#endif + +lookupBucket :: forall ni nid x. + ( -- FiniteBits nid + Ord nid + ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x +lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts + where + d = kademliaXor space nid (kademliaLocation space self) + + go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] + go i bs (bucket : buckets) + | kademliaTestBit space d i = bucket : buckets ++ bs + | otherwise = go (succ i) (bucket:bs) buckets + go _ bs [] = bs + +bucketNumber :: forall ni nid. + KademliaSpace nid ni -> nid -> BucketList ni -> Int +bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts + where + d = kademliaXor space nid (kademliaLocation space self) + + go :: Word -> [Bucket s ni] -> Word + go i (bucket : buckets) + | kademliaTestBit space d i = i + | otherwise = go (succ i) buckets + go i [] = i + + +compatibleNodeId :: forall ni nid. + ( Serialize nid, FiniteBits nid) => + (ni -> nid) -> BucketList ni -> IO nid +compatibleNodeId nodeId tbl = genBucketSample prefix br + where + br = bucketRange (L.length (shape tbl) - 1) True + nodeIdSize = finiteBitSize (undefined :: nid) `div` 8 + bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0 + prefix = either error id $ S.decode bs + +tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8] +tablePrefix testbit = map (packByte . take 8 . (++repeat False)) + . chunksOf 8 + . tableBits testbit + where + packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0] + bitmask ix True = bit ix + bitmask _ _ = 0 + +tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool] +tableBits testbit (BucketList self bkts) = + zipWith const (map (testbit self) [0..]) + bkts + +selfNode :: BucketList ni -> ni +selfNode (BucketList self _) = self + +chunksOf :: Int -> [e] -> [[e]] +chunksOf i ls = map (take i) (build (splitter ls)) where + splitter :: [e] -> ([e] -> a -> a) -> a -> a + splitter [] _ n = n + splitter l c n = l `c` splitter (drop i l) c n + +build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a] +build g = g (:) [] + + + +-- | Count of closest nodes in find_node reply. +type K = Int + +-- | Default 'K' is equal to 'defaultBucketSize'. +defaultK :: K +defaultK = 8 + +#if 0 +class TableKey dht k where + toNodeId :: k -> NodeId + +instance TableKey dht (NodeId) where + toNodeId = id + +#endif + +-- | In Kademlia, the distance metric is XOR and the result is +-- interpreted as an unsigned integer. +newtype NodeDistance nodeid = NodeDistance nodeid + deriving (Eq, Ord) + +-- | distance(A,B) = |A xor B| Smaller values are closer. +distance :: Bits nid => nid -> nid -> NodeDistance nid +distance a b = NodeDistance $ xor a b + +-- | Order by closeness: nearest nodes first. +rank :: ( Ord nid + ) => KademliaSpace nid ni -> nid -> [ni] -> [ni] +rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space)) + + +-- | Get a list of /K/ closest nodes using XOR metric. Used in +-- 'find_node' and 'get_peers' queries. +kclosest :: ( -- FiniteBits nid + Ord nid + ) => + KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni] +kclosest space k nid tbl = take k $ rank space nid (L.concat bucket) + ++ rank space nid (L.concat everyone) + where + (bucket,everyone) = + L.splitAt 1 + . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) + $ tbl + + + +{----------------------------------------------------------------------- +-- Routing +-----------------------------------------------------------------------} + +splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => + ( Reifies s (Compare ni) ) => + (ni -> Word -> Bool) + -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ] +splitTip testNodeBit ni i bucket + | testNodeBit ni i = [zeros , ones ] + | otherwise = [ones , zeros ] + where + (ones, zeros) = split testNodeBit i bucket + +-- | Used in each query. +-- +-- TODO: Kademlia non-empty subtrees should should split if they have less than +-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia +-- paper. The rule requiring additional splits is in section 2.4. +modifyBucket + :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => + forall ni nid xs. + KademliaSpace nid ni + -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni) +modifyBucket space nid f (BucketList self bkts) + = second (BucketList self) <$> go (0 :: BitIx) bkts + where + d = kademliaXor space nid (kademliaLocation space self) + + -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni]) + + go !i (bucket : buckets@(_:_)) + | kademliaTestBit space d i = second (: buckets) <$> f bucket + | otherwise = second (bucket :) <$> go (succ i) buckets + + go !i [bucket] = second (: []) <$> f bucket <|> gosplit + where + gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space + . kademliaLocation space ) + self + i + bucket) + | otherwise = Nothing -- Limit the number of buckets. + + +bktCount :: BucketList ni -> Int +bktCount (BucketList _ bkts) = L.length bkts + +-- | Triggering event for atomic table update +data Event ni = TryInsert { foreignNode :: ni } + | PingResult { foreignNode :: ni , ponged :: Bool } + +#if 0 +deriving instance Eq (NodeId) => Eq (Event) +deriving instance ( Show ip + , Show (NodeId) + , Show u + ) => Show (Event) + +#endif + +eventId :: (ni -> nid) -> Event ni -> nid +eventId nodeId (TryInsert ni) = nodeId ni +eventId nodeId (PingResult ni _) = nodeId ni + + +-- | Actions requested by atomic table update +data CheckPing ni = CheckPing [ni] + +#if 0 + +deriving instance Eq (NodeId) => Eq (CheckPing) +deriving instance ( Show ip + , Show (NodeId) + , Show u + ) => Show (CheckPing) + +#endif + + +-- | Call on every inbound packet (including requested ping results). +-- Returns a triple (was_inserted, to_ping, tbl') where +-- +-- [ /was_inserted/ ] True if the node was added to the routing table. +-- +-- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'. +-- This will be empty if /was_inserted/, but a non-inserted node +-- may be added to a replacement queue and will be inserted if +-- one of the items in this list time out. +-- +-- [ /tbl'/ ] The updated routing 'BucketList'. +-- +updateForInbound :: + KademliaSpace nid ni + -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni) +updateForInbound space tm ni tbl@(BucketList _ bkts) = + maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl')) + $ modifyBucket space + (kademliaLocation space ni) + (updateBucketForInbound tm ni) + tbl + +-- | Update the routing table with the results of a ping. +-- +-- Each (a,(tm,b)) in the returned list indicates that the node /a/ was deleted from the +-- routing table and the node /b/, with timestamp /tm/, has taken its place. +updateForPingResult :: + KademliaSpace nid ni + -> ni -- ^ The pinged node. + -> Bool -- ^ True if we got a reply, False if it timed out. + -> BucketList ni -- ^ The routing table. + -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni ) +updateForPingResult space ni got_reply tbl = + fromMaybe ([],tbl) + $ modifyBucket space + (kademliaLocation space ni) + (updateBucketForPingResult ni got_reply) + tbl + + +{----------------------------------------------------------------------- +-- Conversion +-----------------------------------------------------------------------} + +type TableEntry ni = (ni, Timestamp) + +tableEntry :: NodeEntry ni -> TableEntry ni +tableEntry (a :-> b) = (a, b) + +toList :: BucketList ni -> [[TableEntry ni]] +toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts + +data KademliaSpace nid ni = KademliaSpace + { -- | Given a node record (probably including IP address), yields a + -- kademlia xor-metric location. + kademliaLocation :: ni -> nid + -- | Used when comparing locations. This is similar to + -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so + -- that 0 is the most significant bit. + , kademliaTestBit :: nid -> Word -> Bool + -- | The Kademlia xor-metric. + , kademliaXor :: nid -> nid -> nid + + , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid + } + +instance Contravariant (KademliaSpace nid) where + contramap f ks = ks + { kademliaLocation = kademliaLocation ks . f + } + diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs new file mode 100644 index 00000000..1be1afc1 --- /dev/null +++ b/kad/src/Network/Kademlia/Search.hs @@ -0,0 +1,236 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +module Network.Kademlia.Search where + +import Control.Concurrent.Tasks +import Control.Concurrent.STM +import Control.Monad +import Data.Function +import Data.Maybe +import qualified Data.Set as Set + ;import Data.Set (Set) +import Data.Hashable (Hashable(..)) -- for type sigs +import System.IO.Error + +import qualified Data.MinMaxPSQ as MM + ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') +import qualified Data.Wrapper.PSQ as PSQ + ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey) +import Network.Kademlia.Routing as R +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif + +data Search nid addr tok ni r = Search + { searchSpace :: KademliaSpace nid ni + , searchNodeAddress :: ni -> addr + , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok))) + (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ()) + , searchAlpha :: Int -- α = 8 + -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on + -- how fast the queries are. For Tox's much slower onion-routed queries, we + -- need to ensure that closer non-responding queries don't completely push out + -- farther away queries. + -- + -- For BitTorrent, setting them both 8 was not an issue, but that is no longer + -- supported because now the number of remembered informants is now the + -- difference between these two numbers. So, if searchK = 16 and searchAlpha = + -- 4, then the number of remembered query responses is 12. + , searchK :: Int -- K = 16 + } + +data SearchState nid addr tok ni r = SearchState + { -- | The number of pending queries. Incremented before any query is sent + -- and decremented when we get a reply. + searchPendingCount :: TVar Int + -- | Nodes scheduled to be queried (roughly at most K). + , searchQueued :: TVar (MinMaxPSQ ni nid) + -- | The nearest (K - α) nodes that issued a reply. + -- + -- α is the maximum number of simultaneous queries. + , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok)) + -- | This tracks already-queried addresses so we avoid bothering them + -- again. XXX: We could probably keep only the pending queries in this + -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha + -- should limit the number of outstanding queries. + , searchVisited :: TVar (Set addr) + , searchSpec :: Search nid addr tok ni r + } + + +newSearch :: ( Ord addr + , PSQKey nid + , PSQKey ni + ) => + {- + KademliaSpace nid ni + -> (ni -> addr) + -> (ni -> IO ([ni], [r])) -- the query action. + -> (r -> STM Bool) -- receives search results. + -> nid -- target of search + -} + Search nid addr tok ni r + -> nid + -> [ni] -- Initial nodes to query. + -> STM (SearchState nid addr tok ni r) +newSearch s@(Search space nAddr qry _ _) target ns = do + c <- newTVar 0 + q <- newTVar $ MM.fromList + $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) + $ ns + i <- newTVar MM.empty + v <- newTVar Set.empty + return -- (Search space nAddr qry) , r , target + ( SearchState c q i v s ) + +-- | Discard a value from a key-priority-value tuple. This is useful for +-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". +stripValue :: Binding' k p v -> Binding k p +stripValue (Binding ni _ nid) = (ni :-> nid) + +-- | Reset a 'SearchState' object to ready it for a repeated search. +reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => + (nid -> STM [ni]) + -> Search nid addr1 tok1 ni r1 + -> nid + -> SearchState nid addr tok ni r + -> STM (SearchState nid addr tok ni r) +reset nearestNodes qsearch target st = do + searchIsFinished st >>= check -- Wait for a search to finish before resetting. + bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) + <$> nearestNodes target + priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) + writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes + writeTVar (searchInformant st) MM.empty + writeTVar (searchVisited st) Set.empty + writeTVar (searchPendingCount st) 0 + return st + +sendAsyncQuery :: forall addr nid tok ni r. + ( Ord addr + , PSQKey nid + , PSQKey ni + , Show nid + ) => + Search nid addr tok ni r + -> nid + -> (r -> STM Bool) -- ^ return False to terminate the search. + -> SearchState nid addr tok ni r + -> Binding ni nid + -> TaskGroup + -> IO () +sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g = + case searchQuery of + Left blockingQuery -> + forkTask g "searchQuery" $ do + myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) + reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing) + atomically $ do + modifyTVar searchPendingCount pred + maybe (return ()) go reply + Right nonblockingQuery -> do + nonblockingQuery searchTarget ni $ \reply -> + atomically $ do + modifyTVar searchPendingCount pred + maybe (return ()) go reply + where + go (ns,rs,tok) = do + vs <- readTVar searchVisited + -- We only queue a node if it is not yet visited + let insertFoundNode :: Int + -> ni + -> MinMaxPSQ ni nid + -> MinMaxPSQ ni nid + insertFoundNode k n q + | searchNodeAddress n `Set.member` vs + = q + | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget + $ kademliaLocation searchSpace n ) + q + + qsize0 <- MM.size <$> readTVar searchQueued + let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow + -- only when there's fewer than + -- K elements. + modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns + modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d + flip fix rs $ \loop -> \case + r:rs' -> do + wanting <- searchResult r + if wanting then loop rs' + else searchCancel sch + [] -> return () + + +searchIsFinished :: ( PSQKey nid + , PSQKey ni + ) => SearchState nid addr tok ni r -> STM Bool +searchIsFinished SearchState{..} = do + q <- readTVar searchQueued + cnt <- readTVar searchPendingCount + informants <- readTVar searchInformant + return $ cnt == 0 + && ( MM.null q + || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) + && ( PSQ.prio (fromJust $ MM.findMax informants) + <= PSQ.prio (fromJust $ MM.findMin q)))) + +searchCancel :: SearchState nid addr tok ni r -> STM () +searchCancel SearchState{..} = do + writeTVar searchPendingCount 0 + writeTVar searchQueued MM.empty + +search :: + ( Ord r + , Ord addr + , PSQKey nid + , PSQKey ni + , Show nid + ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) +search sch buckets target result = do + let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets + st <- atomically $ newSearch sch target ns + forkIO $ searchLoop sch target result st + return st + +searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) + => Search nid addr tok ni r -- ^ Query and distance methods. + -> nid -- ^ The target we are searching for. + -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. + -> SearchState nid addr tok ni r -- ^ Search-related state. + -> IO () +searchLoop sch@Search{..} target result s@SearchState{..} = do + myThreadId >>= flip labelThread ("search."++show target) + withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do + join $ atomically $ do + cnt <- readTVar $ searchPendingCount + check (cnt <= 8) -- Only 8 pending queries at a time. + informants <- readTVar searchInformant + found <- MM.minView <$> readTVar searchQueued + case found of + Just (ni :-> d, q) + | -- If there's fewer than /k - α/ informants and there's any + -- node we haven't yet got a response from. + (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) + -- Or there's no informants yet at all. + || MM.null informants + -- Or if the closest scheduled node is nearer than the + -- nearest /k/ informants. + || (d < PSQ.prio (fromJust $ MM.findMax informants)) + -> -- Then the search continues, send a query. + do writeTVar searchQueued q + modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) + modifyTVar searchPendingCount succ + return $ do + sendAsyncQuery sch target result s (ni :-> d) g + again + _ -> -- Otherwise, we are finished. + do check (cnt == 0) + return $ return () -- cgit v1.2.3