{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE DeriveFunctor, DeriveTraversable #-} -- {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE GADTs #-} module Kademlia where import Data.Maybe import Network.DHT.Routing as R import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import GHC.Conc (labelThread) import Control.Concurrent.Lifted #endif import Data.Bits import Text.PrettyPrint as PP hiding ((<>), ($$)) import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Data.IP import Control.Concurrent.STM import Control.Monad import Data.Monoid import Data.Time.Clock.POSIX (POSIXTime) -- | The status of a given node with respect to a given routint table. data RoutingStatus = Stranger -- ^ The node is unknown to the Kademlia routing table. | Applicant -- ^ The node may be inserted pending a ping timeout. | Accepted -- ^ The node has a slot in one of the Kademlia buckets. deriving (Eq,Ord,Enum,Show,Read) -- | A change occured in the kademlia routing table. data RoutingTransition ni = RoutingTransition { transitioningNode :: ni , transitionedTo :: !RoutingStatus } deriving (Eq,Ord,Show,Read) data InsertionReporter ni = InsertionReporter { -- | Called on every inbound packet. reportArrival :: POSIXTime -> ni -- ^ Origin of packet. -> [ni] -- ^ These will be pinged as a result. -> IO () -- | Called on every ping probe. , reportPingResult :: POSIXTime -> ni -- ^ Who was pinged. -> Bool -- ^ True if they ponged. -> IO () } quietInsertions = InsertionReporter { reportArrival = \_ _ _ -> return () , reportPingResult = \_ _ _ -> return () } contramapIR f ir = InsertionReporter { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis) , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b } -- | All the IO operations neccessary to maintain a Kademlia routing table. data TableStateIO nid ni = TableStateIO { -- | Write the routing table. Typically 'writeTVar'. tblWrite :: R.BucketList ni -> STM () -- | Read the routing table. Typically 'readTVar'. , tblRead :: STM (R.BucketList ni) -- | Issue a ping to a remote node and report 'True' if the node -- responded within an acceptable time and 'False' otherwise. , tblPing :: ni -> IO Bool -- | Convenience method provided to assist in maintaining state -- consistent with the routing table. It will be invoked in the same -- transaction that 'tblRead'\/'tblWrite' occured but only when there was -- an interesting change. The returned IO action will be triggered soon -- afterward. -- -- It is not necessary to do anything interesting here. The following -- trivial implementation is fine: -- -- > tblTransition = const $ return $ return () , tblTransition :: RoutingTransition ni -> STM (IO ()) } vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO nid ni vanillaIO var ping = TableStateIO { tblRead = readTVar var , tblWrite = writeTVar var , tblPing = ping , tblTransition = const $ return $ return () } -- | Everything neccessary to maintain a routing table of /ni/ (node -- information) entries. data Kademlia nid ni = Kademlia (InsertionReporter ni) (KademliaSpace nid ni) (TableStateIO nid ni) -- Helper to 'insertNode'. -- -- Adapt return value from 'updateForPingResult' into a -- more easily groked list of transitions. transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni] transition (x,m) = -- | Just _ <- m = Node transition: Accepted --> Stranger -- | Nothing <- m = Node transition: Applicant --> Stranger RoutingTransition x Stranger : maybeToList (accepted <$> m) -- Helper to 'transition' -- -- Node transition: Applicant --> Accepted accepted :: (t,ni) -> RoutingTransition ni accepted (_,y) = RoutingTransition y Accepted insertNode :: Kademlia nid ni -> ni -> IO () insertNode (Kademlia reporter space io) node = do tm <- utcTimeToPOSIXSeconds <$> getCurrentTime (ps,reaction) <- atomically $ do tbl <- tblRead io let (inserted, ps,t') = R.updateForInbound space tm node tbl tblWrite io t' reaction <- case ps of _ | inserted -> -- Node transition: Stranger --> Accepted tblTransition io $ RoutingTransition node Accepted (_:_) -> -- Node transition: Stranger --> Applicant tblTransition io $ RoutingTransition node Applicant _ -> return $ return () return (ps, reaction) reportArrival reporter tm node ps reaction _ <- fork $ do myThreadId >>= flip labelThread "pingResults" forM_ ps $ \n -> do b <- tblPing io n reportPingResult reporter tm n b join $ atomically $ do tbl <- tblRead io let (replacements, t') = R.updateForPingResult space n b tbl tblWrite io t' ios <- sequence $ concatMap (map (tblTransition io) . transition) replacements return $ sequence_ ios return ()