{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
-- {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PatternSynonyms #-}
module Kademlia where

import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.DHT.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
import Data.IP
import Data.Monoid
import Data.Serialize (Serialize)
import Data.Time.Clock.POSIX (POSIXTime)
import qualified Data.Wrapper.PSQInt as Int ;import Data.Wrapper.PSQInt (pattern (:->))
import Network.Address (bucketRange,genBucketSample)
import Network.BitTorrent.DHT.Search
import System.Timeout
import Text.PrettyPrint as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO
import Tasks

-- | The status of a given node with respect to a given routing table.
data RoutingStatus
    = Stranger  -- ^ The node is unknown to the Kademlia routing table.
    | Applicant -- ^ The node may be inserted pending a ping timeout.
    | Accepted  -- ^ The node has a slot in one of the Kademlia buckets.
    deriving (Eq,Ord,Enum,Show,Read)

-- | A change occurred in the kademlia routing table.
data RoutingTransition ni = RoutingTransition
    { transitioningNode :: ni              -- ^ The node whose status changed.
    , transitionedTo    :: !RoutingStatus  -- ^ The status the node now has.
    }
    deriving (Eq,Ord,Show,Read)

-- | Callbacks through which 'insertNode' reports what it observes.
data InsertionReporter ni = InsertionReporter
    { -- | Called on every inbound packet.
      reportArrival :: POSIXTime
                    -> ni   -- ^ Origin of packet.
                    -> [ni] -- ^ These will be pinged as a result.
                    -> IO ()
      -- | Called on every ping probe.
    , reportPingResult :: POSIXTime
                       -> ni   -- ^ Who was pinged.
                       -> Bool -- ^ True if they ponged.
                       -> IO ()
    }

-- | A reporter whose callbacks ignore their arguments and do nothing.
quietInsertions :: InsertionReporter ni
quietInsertions = InsertionReporter
    { reportArrival = \_ _ _ -> return ()
    , reportPingResult = \_ _ _ -> return ()
    }

-- | Adapt a reporter of /ni/ values so that it accepts /t/ values instead.
-- Every node handed to the resulting reporter (including each element of the
-- to-be-pinged list) is converted with the given function before being passed
-- on to the wrapped reporter.
contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
contramapIR f ir = InsertionReporter
    { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
    , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
    }

-- | All the IO operations necessary to maintain a Kademlia routing table.
data TableStateIO ni = TableStateIO
    { -- | Write the routing table.  Typically 'writeTVar'.
      tblWrite :: R.BucketList ni -> STM ()

      -- | Read the routing table. Typically 'readTVar'.
    , tblRead :: STM (R.BucketList ni)

      -- | Issue a ping to a remote node and report 'True' if the node
      -- responded within an acceptable time and 'False' otherwise.
    , tblPing :: ni -> IO Bool

      -- | Convenience method provided to assist in maintaining state
      -- consistent with the routing table.  It will be invoked in the same
      -- transaction that 'tblRead'\/'tblWrite' occurred but only when there was
      -- an interesting change.  The returned IO action will be triggered soon
      -- afterward.
      --
      -- It is not necessary to do anything interesting here.  The following
      -- trivial implementation is fine:
      --
      -- > tblTransition = const $ return $ return ()
    , tblTransition :: RoutingTransition ni -> STM (IO ())
    }

-- | A 'TableStateIO' that keeps the routing table in the given 'TVar', pings
-- with the given action, and uses the trivial (do-nothing) 'tblTransition'.
vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
vanillaIO var ping = TableStateIO
    { tblRead = readTVar var
    , tblWrite = writeTVar var
    , tblPing = ping
    , tblTransition = const $ return $ return ()
    }

-- | Everything necessary to maintain a routing table of /ni/ (node
-- information) entries.
data Kademlia nid ni = Kademlia (InsertionReporter ni)
                                (KademliaSpace nid ni)
                                (TableStateIO ni)

-- Helper to 'insertNode'.
--
-- Adapt return value from 'updateForPingResult' into a
-- more easily grokked list of transitions.
transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
transition (x,m) =
    -- The first node always becomes a Stranger:
    --   when m is Just _,  Node transition: Accepted  --> Stranger
    --   when m is Nothing, Node transition: Applicant --> Stranger
    -- and, when present, the second node of the pair is reported as Accepted.
    RoutingTransition x Stranger : maybeToList (accepted <$> m)

-- Helper to 'transition'
--
-- Node transition: Applicant --> Accepted
accepted :: (t,ni) -> RoutingTransition ni
accepted (_,y) = RoutingTransition y Accepted


-- | Feed a node that was just heard from into the routing table.
--
-- In a single transaction the table is read, passed through
-- 'R.updateForInbound' and written back; 'tblTransition' is consulted when the
-- node was inserted outright (Stranger --> Accepted) or when the update asked
-- for other nodes to be pinged (Stranger --> Applicant).  Afterwards the
-- arrival is reported, the transition's IO reaction is run, and a thread
-- labelled @pingResults@ is forked which pings each requested node in turn,
-- reports the outcome, applies 'R.updateForPingResult' to the table and runs
-- the reactions for every resulting transition.
insertNode :: Kademlia nid ni -> ni -> IO ()
insertNode (Kademlia reporter space io) node = do

    tm <- getPOSIXTime

    (ps,reaction) <- atomically $ do
        tbl <- tblRead io
        let (inserted, ps,t') = R.updateForInbound space tm node tbl
        tblWrite io t'
        reaction <- case ps of
            _ | inserted -> -- Node transition: Stranger --> Accepted
                            tblTransition io $ RoutingTransition node Accepted
            (_:_)        -> -- Node transition: Stranger --> Applicant
                            tblTransition io $ RoutingTransition node Applicant
            _            -> return $ return ()
        return (ps, reaction)

    reportArrival reporter tm node ps
    reaction

    void $ fork $ do
        myThreadId >>= flip labelThread "pingResults"
        forM_ ps $ \n -> do
            b <- tblPing io n
            -- Stamp the result with the time the ping actually finished
            -- rather than the arrival time of the packet that triggered it;
            -- a ping may take a while to answer or time out.
            pingTime <- getPOSIXTime
            reportPingResult reporter pingTime n b
            join $ atomically $ do
                tbl <- tblRead io
                let (replacements, t') = R.updateForPingResult space n b tbl
                tblWrite io t'
                -- Collect the reactions inside the transaction; run them
                -- (via the enclosing 'join') once it has committed.
                ios <- mapM (tblTransition io) (concatMap transition replacements)
                return $ sequence_ ios


-- TODO: Bootstrap/Refresh
--
-- From BEP 05:
--
--    Each bucket should maintain a "last changed" property to indicate how
--    "fresh" the contents are.
--
--      Note: We will use a "time to next refresh" property instead and store it in
--      a priority search queue.
--
--    When...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>    [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>    , Stranger  :--> Accepted -- or a node is added to a bucket,
-- >>>
-- >>>    , Accepted  :--> Stranger -- or a node in a bucket is replaced
-- >>>    , Applicant :--> Accepted -- with another node,
-- >>>    ]
--
--    the bucket's last changed property should be updated. Buckets
--    that have not been changed in 15 minutes should be "refreshed." This is done
--    by picking a random ID in the range of the bucket and performing a
--    find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>    [ Stranger  :--> Applicant -- A ping is pending, its result is covered:
-- >>>                               --  (Applicant :--> Stranger)
-- >>>                               --  (Applicant :--> Accepted)
-- >>>    , Accepted  :--> Applicant -- Never happens
-- >>>    ]
--
-- XXX: This will be redundantly triggered twice upon every node replacement
-- because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.

-- | Produce the IO reaction that pushes back a bucket's refresh time.
--
-- A transition to 'Applicant' yields an action that does nothing.  For any
-- other transition the action takes the current time, looks up the bucket
-- number of the transitioning node in the table held by the first 'TVar', and
-- (re)inserts that bucket into the queue with priority @now + interval@.
touchBucket :: KademliaSpace nid ni
            -> POSIXTime
            -> TVar (BucketList ni)
            -> TVar (Int.PSQ POSIXTime)
            -> RoutingTransition ni
            -> STM (IO ())
touchBucket space interval bkts psq tr = return reschedule
  where
    reschedule = case transitionedTo tr of
        Applicant -> return ()
        _         -> do
            now <- getPOSIXTime
            atomically $ do
                tbl <- readTVar bkts
                let loc    = kademliaLocation space (transitioningNode tr)
                    bucket = R.bucketNumber space loc tbl
                modifyTVar' psq (Int.insert bucket (now + interval))

-- | > pollForRefresh interval queue refresh
--
-- Fork a refresh loop.  Kill the returned thread to terminate it.
-- The arguments are: a staleness threshold (if a bucket goes this long without
-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
-- schedule for each bucket, and a refresh action to be forked when a bucket
-- exceeds the staleness threshold.
--
-- To "touch" a bucket and prevent it from being refreshed, reschedule its
-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
-- TVar.
forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
forkPollForRefresh interval psq refresh =
    fork $ do
        myThreadId >>= flip labelThread "pollForRefresh"
        fix $ \again ->
            join $ atomically $ do
                nextup <- Int.findMin <$> readTVar psq
                -- An empty schedule makes the transaction retry, so the loop
                -- sleeps until some bucket is scheduled.
                maybe retry (return . go again) nextup
  where
    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        let remaining = refresh_time - now
        if remaining <= 0
            then do
                -- Refresh time!
                -- Move it to the back of the refresh queue.
                atomically $ modifyTVar' psq
                           $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                void $ fork $ do
                    myThreadId >>= flip labelThread ("refresh."++show bktnum)
                    void $ refresh bktnum
            else
                -- Not due yet: sleep, then re-examine the queue (the schedule
                -- may have been touched in the meantime).
                threadDelay (toMicroseconds remaining)
        again

    -- Convert a time span to a 'threadDelay' argument.  'fromEnum' must not
    -- be used for this: on 'POSIXTime' it yields the underlying picosecond
    -- count, not seconds.  The result is clamped to 'maxBound' so a distant
    -- deadline cannot overflow 'Int'; the loop re-checks after every sleep,
    -- so a shortened sleep is harmless.
    toMicroseconds :: POSIXTime -> Int
    toMicroseconds dt =
        fromInteger $ min (toInteger (maxBound :: Int)) (ceiling (dt * 1000000))

-- | Run one search on behalf of bucket number @n@ and report how it went.
--
-- The search target is our own node id when @n@ is the last bucket of the
-- current table, and otherwise a sample generated by 'genBucketSample' for the
-- bucket's range.  Found nodes that fall into bucket @n@ are collected in a
-- set.  The search is given at most 15 minutes before it is cancelled.
--
-- The result is 1 if the per-node check ever answered 'True', and otherwise
-- the number of collected nodes.
refreshBucket :: forall nid tok ni addr.
    ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
    Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
refreshBucket sch var n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count          -- Is this the last bucket?
                then return nid        -- Yes?  Search our own id.
                else genBucketSample nid -- No?  Generate a random id.
                        (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty
    let fullcount = R.defaultBucketSize
        -- Remember that the check answered True at least once.
        saveit True = writeTVar fin True >> return True
        saveit _    = return False
        checkBucketFull :: ni -> STM Bool
        checkBucketFull found_node = do
            current <- readTVar var
            let counts = R.shape current
                loc    = kademliaLocation (searchSpace sch) found_node
            -- Only nodes landing in the bucket being refreshed are counted.
            when (n == R.bucketNumber (searchSpace sch) loc current)
                $ modifyTVar' resultCounter (Set.insert found_node)
            resultCount <- readTVar resultCounter
            -- NOTE(review): the shape is indexed with n - 1 rather than n;
            -- confirm against the ordering produced by 'R.shape'.
            saveit $ case drop (n - 1) counts of
                (cnt:_) | cnt < fullcount -> True
                _ | Set.size resultCount < fullcount -> True
                _ -> False

    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)

    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                    then const $ return True -- Never short-circuit the last bucket.
                                    else checkBucketFull
    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    _ <- timeout (15*60*1000000) $
            atomically $ searchIsFinished s >>= check
    atomically $ searchCancel s
    hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
    atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c

-- | Populate the routing table starting from known nodes.
--
-- The nodes in @ns@ are pinged first (at most 20 tasks at a time, per the
-- task group).  Only if none of them answers are the fallback nodes @ns0@
-- pinged.  Afterwards 'refreshBucket' is run repeatedly, each time on the
-- first under-filled bucket (or on the last bucket while the table still has
-- fewer than 'R.defaultBucketCount' buckets), until no such bucket remains.
-- When a refresh reports zero results, the next round is delayed by a minute.
bootstrap ::
    ( Show nid
    , Serialize nid
    , FiniteBits nid
    , Hashable ni
    , Hashable nid
    , Ord ni
    , Ord addr
    , Ord nid
    , Traversable t1
    , Traversable t
    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
bootstrap sch var ping ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g ->
        forM_ ns $ \n -> do
            let lbl = show $ kademliaLocation (searchSpace sch) n
            forkTask g lbl $ do
                b <- ping n
                when b $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses.  This is to lessen the burden on well-known bootstrap
    -- nodes.
    anyResponded <- atomically (readTVar gotPing)
    unless anyResponded $
        withTaskGroup "bootstrap.ping" 20 $ \g ->
            forM_ ns0 $ \n ->
                forkTask g (show $ kademliaLocation (searchSpace sch) n)
                           (void $ ping n)
    hPutStrLn stderr "Finished bootstrap pings."

    -- Now run searches until all the buckets are full.  On a small network,
    -- this may never quit.
    --
    -- TODO: For small networks, we should give up on filling a nearby bucket
    -- at some point and move on to one farther away.
    flip fix 1 $ \again cnt -> do
        when (cnt==0) $ do
            -- Force a delay in case the search returns too quickly
            hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
            threadDelay (60 * 1000000)
        tbl <- atomically $ readTVar var
        let shp = zip (R.shape tbl) [0 .. ]
            unfull = filter ( (< R.defaultBucketSize) . fst ) shp
        case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
            [] ->
                when (length shp < R.defaultBucketCount) $ do
                    -- Not enough buckets, keep trying.
                    hPutStrLn stderr
                        $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
                    found <- refreshBucket sch var
                                           (R.defaultBucketCount - 1)
                    again found
            (size,num):_ -> do
                -- If we don't yet have enough buckets, we need to search our own id.
                -- We indicate that by setting the bucket number to the target.
                let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1
                         | otherwise                           = num
                hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp)
                found <- refreshBucket sch var num'
                again found