From 16631d46cd6c64c81cbc7dd3fa33afdeb6ea2366 Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 15 Sep 2017 03:31:29 -0400 Subject: Moved Kademlia to hierarchical location. --- src/Network/Kademlia.hs | 380 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 380 insertions(+) create mode 100644 src/Network/Kademlia.hs (limited to 'src') diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs new file mode 100644 index 00000000..c6c59ae6 --- /dev/null +++ b/src/Network/Kademlia.hs @@ -0,0 +1,380 @@ +{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE DeriveFunctor, DeriveTraversable #-} +-- {-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE PatternSynonyms #-} +module Network.Kademlia where + +import Data.Function +import Data.Maybe +import qualified Data.Set as Set +import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds) +import Network.DHT.Routing as R +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif +import Control.Concurrent.STM +import Control.Monad +import Data.Bits +import Data.Hashable +import Data.IP +import Data.Monoid +import Data.Serialize (Serialize) +import Data.Time.Clock.POSIX (POSIXTime) +import qualified Data.Wrapper.PSQInt as Int + ;import Data.Wrapper.PSQInt (pattern (:->)) +import Network.Address (bucketRange,genBucketSample) +import Network.BitTorrent.DHT.Search +import System.Entropy +import System.Timeout +import Text.PrettyPrint as PP hiding (($$), (<>)) +import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) +import System.IO +import Tasks + +-- | The status of a given node with respect to a given routint table. +data RoutingStatus + = Stranger -- ^ The node is unknown to the Kademlia routing table. + | Applicant -- ^ The node may be inserted pending a ping timeout. + | Accepted -- ^ The node has a slot in one of the Kademlia buckets. + deriving (Eq,Ord,Enum,Show,Read) + +-- | A change occured in the kademlia routing table. +data RoutingTransition ni = RoutingTransition + { transitioningNode :: ni + , transitionedTo :: !RoutingStatus + } + deriving (Eq,Ord,Show,Read) + +data InsertionReporter ni = InsertionReporter + { -- | Called on every inbound packet. + reportArrival :: POSIXTime + -> ni -- ^ Origin of packet. + -> [ni] -- ^ These will be pinged as a result. + -> IO () + -- | Called on every ping probe. + , reportPingResult :: POSIXTime + -> ni -- ^ Who was pinged. + -> Bool -- ^ True if they ponged. + -> IO () + } + +quietInsertions :: InsertionReporter ni +quietInsertions = InsertionReporter + { reportArrival = \_ _ _ -> return () + , reportPingResult = \_ _ _ -> return () + } + +contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t +contramapIR f ir = InsertionReporter + { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis) + , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b + } + +-- | All the IO operations neccessary to maintain a Kademlia routing table. +data TableStateIO ni = TableStateIO + { -- | Write the routing table. Typically 'writeTVar'. + tblWrite :: R.BucketList ni -> STM () + + -- | Read the routing table. Typically 'readTVar'. + , tblRead :: STM (R.BucketList ni) + + -- | Issue a ping to a remote node and report 'True' if the node + -- responded within an acceptable time and 'False' otherwise. + , tblPing :: ni -> IO Bool + + -- | Convenience method provided to assist in maintaining state + -- consistent with the routing table. It will be invoked in the same + -- transaction that 'tblRead'\/'tblWrite' occured but only when there was + -- an interesting change. The returned IO action will be triggered soon + -- afterward. + -- + -- It is not necessary to do anything interesting here. The following + -- trivial implementation is fine: + -- + -- > tblTransition = const $ return $ return () + , tblTransition :: RoutingTransition ni -> STM (IO ()) + } + +vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni +vanillaIO var ping = TableStateIO + { tblRead = readTVar var + , tblWrite = writeTVar var + , tblPing = ping + , tblTransition = const $ return $ return () + } + +-- | Everything neccessary to maintain a routing table of /ni/ (node +-- information) entries. +data Kademlia nid ni = Kademlia (InsertionReporter ni) + (KademliaSpace nid ni) + (TableStateIO ni) + + +-- Helper to 'insertNode'. +-- +-- Adapt return value from 'updateForPingResult' into a +-- more easily groked list of transitions. +transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni] +transition (x,m) = + -- | Just _ <- m = Node transition: Accepted --> Stranger + -- | Nothing <- m = Node transition: Applicant --> Stranger + RoutingTransition x Stranger + : maybeToList (accepted <$> m) + +-- Helper to 'transition' +-- +-- Node transition: Applicant --> Accepted +accepted :: (t,ni) -> RoutingTransition ni +accepted (_,y) = RoutingTransition y Accepted + + +insertNode :: Kademlia nid ni -> ni -> IO () +insertNode (Kademlia reporter space io) node = do + + tm <- utcTimeToPOSIXSeconds <$> getCurrentTime + + (ps,reaction) <- atomically $ do + tbl <- tblRead io + let (inserted, ps,t') = R.updateForInbound space tm node tbl + tblWrite io t' + reaction <- case ps of + _ | inserted -> -- Node transition: Stranger --> Accepted + tblTransition io $ RoutingTransition node Accepted + (_:_) -> -- Node transition: Stranger --> Applicant + tblTransition io $ RoutingTransition node Applicant + _ -> return $ return () + return (ps, reaction) + + reportArrival reporter tm node ps + reaction + + _ <- fork $ do + myThreadId >>= flip labelThread "pingResults" + forM_ ps $ \n -> do + b <- tblPing io n + reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result + join $ atomically $ do + tbl <- tblRead io + let (replacements, t') = R.updateForPingResult space n b tbl + tblWrite io t' + ios <- sequence $ concatMap + (map (tblTransition io) . transition) + replacements + return $ sequence_ ios + + return () + + +-- TODO: Bootstrap/Refresh +-- +-- From BEP 05: +-- +-- Each bucket should maintain a "last changed" property to indicate how +-- "fresh" the contents are. +-- +-- Note: We will use a "time to next refresh" property instead and store it in +-- a priority search queue. +-- +-- When... +-- +-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus +-- >>> bucketEvents = +-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, +-- >>> +-- >>> , Stranger :--> Accepted -- or a node is added to a bucket, +-- >>> +-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced +-- >>> , Applicant :--> Accepted -- with another node, +-- >>> ] +-- +-- the bucket's last changed property should be updated. Buckets +-- that have not been changed in 15 minutes should be "refreshed." This is done +-- by picking a random ID in the range of the bucket and performing a +-- find_nodes search on it. +-- +-- The only other possible BucketTouchEvents are as follows: +-- +-- >>> not_handled = +-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: +-- >>> -- (Applicant :--> Stranger) +-- >>> -- (Applicant :--> Accepted) +-- >>> , Accepted :--> Applicant -- Never happens +-- >>> ] +-- + +-- XXX: This will be redundantly triggered twice upon every node replacement +-- because we do not currently distinguish between standalone +-- insertion/deletion events and an insertion/deletion pair constituting +-- replacement. +-- +-- It might also be better to pass the timestamp of the transition here and +-- keep the refresh queue in better sync with the routing table by updating it +-- within the STM monad. +touchBucket :: KademliaSpace nid ni + -> POSIXTime + -> TVar (BucketList ni) + -> TVar (Int.PSQ POSIXTime) + -> RoutingTransition ni + -> STM (IO ()) +touchBucket space interval bkts psq tr + | (transitionedTo tr == Applicant) + = return $ return () + | otherwise = return $ do + now <- getPOSIXTime + atomically $ do + let nid = kademliaLocation space (transitioningNode tr) + num <- R.bucketNumber space nid <$> readTVar bkts + modifyTVar' psq $ Int.insert num (now + interval) + +-- | > pollForRefresh interval queue refresh +-- +-- Fork a refresh loop. Kill the returned thread to terminate it. The +-- arguments are: a staleness threshold (if a bucket goes this long without +-- being touched, a refresh will be triggered), a TVar with the time-to-refresh +-- schedule for each bucket, and a refresh action to be forked when a bucket +-- excedes the staleness threshold. +-- +-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's +-- refresh time to some time into the future by modifying the 'Int.PSQ' in the +-- TVar. +forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId +forkPollForRefresh interval psq refresh = do + fork $ do + myThreadId >>= flip labelThread "pollForRefresh" + fix $ \again -> do + join $ atomically $ do + nextup <- Int.findMin <$> readTVar psq + maybe retry (return . go again) nextup + where + go again ( bktnum :-> refresh_time ) = do + now <- getPOSIXTime + case fromEnum (refresh_time - now) of + x | x <= 0 -> do -- Refresh time! + -- Move it to the back of the refresh queue. + atomically $ modifyTVar' psq + $ Int.insert bktnum (now + interval) + -- Now fork the refresh operation. + -- TODO: We should probably propogate the kill signal to this thread. + fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) + _ <- refresh bktnum + return () + return () + seconds -> threadDelay ( seconds * 1000000 ) + again + +refreshBucket :: forall nid tok ni addr. + ( Show nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) => + Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int +refreshBucket sch var n = do + tbl <- atomically (readTVar var) + let count = bktCount tbl + nid = kademliaLocation (searchSpace sch) (thisNode tbl) + sample <- if n+1 >= count -- Is this the last bucket? + then return nid -- Yes? Search our own id. + else kademliaSample (searchSpace sch) -- No? Generate a random id. + getEntropy + nid + (bucketRange n (n + 1 < count)) + fin <- atomically $ newTVar False + resultCounter <- atomically $ newTVar Set.empty + let fullcount = R.defaultBucketSize + saveit True = writeTVar fin True >> return True + saveit _ = return False + checkBucketFull :: ni -> STM Bool + checkBucketFull found_node = do + tbl <- readTVar var + let counts = R.shape tbl + when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl) + $ modifyTVar' resultCounter (Set.insert found_node) + resultCount <- readTVar resultCounter + saveit $ case drop (n - 1) counts of + (cnt:_) | cnt < fullcount -> True + _ | Set.size resultCount < fullcount -> True + _ -> False + + hPutStrLn stderr $ "Start refresh " ++ show (n,sample) + + -- Set 15 minute timeout in order to avoid overlapping refreshes. + s <- search sch tbl sample $ if n+1 == R.defaultBucketCount + then const $ return True -- Never short-circuit the last bucket. + else checkBucketFull + _ <- timeout (15*60*1000000) $ do + atomically $ searchIsFinished s >>= check + atomically $ searchCancel s + hPutStrLn stderr $ "Finish refresh " ++ show (n,sample) + rcount <- atomically $ do + c <- Set.size <$> readTVar resultCounter + b <- readTVar fin + return $ if b then 1 else c + return rcount + +bootstrap :: + ( Show nid + , Serialize nid + -- , FiniteBits nid + , Hashable ni + , Hashable nid + , Ord ni + , Ord addr + , Ord nid + , Traversable t1 + , Traversable t + ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () +bootstrap sch var ping ns ns0 = do + gotPing <- atomically $ newTVar False + + -- First, ping the given nodes so that they are added to + -- our routing table. + withTaskGroup "bootstrap.resume" 20 $ \g -> do + forM_ ns $ \n -> do + let lbl = show $ kademliaLocation (searchSpace sch) n + forkTask g lbl $ do + b <- ping n + when b $ atomically $ writeTVar gotPing True + + -- We resort to the hardcoded fallback nodes only when we got no + -- responses. This is to lesson the burden on well-known boostrap + -- nodes. + fallback <- atomically (readTVar gotPing) >>= return . when . not + fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do + forM_ ns0 $ \n -> do + forkTask g (show $ kademliaLocation (searchSpace sch) n) + (void $ ping n) + hPutStrLn stderr "Finished bootstrap pings." + + -- Now run searches until all the buckets are full. On a small network, + -- this may never quit. + -- + -- TODO: For small networks, we should give up on filling a nearby bucket + -- at some point and move on to one farther away. + flip fix 1 $ \again cnt -> do + when (cnt==0) $ do + -- Force a delay in case the search returns too quickly + hPutStrLn stderr $ "Zero results, forcing 1 minute delay" + threadDelay (60 * 1000000) + tbl <- atomically $ readTVar var + let shp = zip (R.shape tbl) [0 .. ] + unfull = filter ( (< R.defaultBucketSize) . fst ) shp + case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of + [] -> do + when (length shp < R.defaultBucketCount) $ do + -- Not enough buckets, keep trying. + hPutStrLn stderr + $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1) + cnt <- refreshBucket sch var + (R.defaultBucketCount - 1) + again cnt + (size,num):_ -> do + -- If we don't yet have enough buckets, we need to search our own id. + -- We indicate that by setting the bucket number to the target. + let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1 + | otherwise = num + hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp) + cnt <- refreshBucket sch var num' + again cnt -- cgit v1.2.3