{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE DeriveFunctor, DeriveTraversable #-} -- {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE PatternSynonyms #-} module Network.Kademlia where import Data.Maybe import Data.Time.Clock.POSIX import Network.Kademlia.Routing as R #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import Control.Concurrent.STM import Control.Monad import Data.Time.Clock.POSIX (POSIXTime) -- | The status of a given node with respect to a given routint table. data RoutingStatus = Stranger -- ^ The node is unknown to the Kademlia routing table. | Applicant -- ^ The node may be inserted pending a ping timeout. | Accepted -- ^ The node has a slot in one of the Kademlia buckets. deriving (Eq,Ord,Enum,Show,Read) -- | A change occured in the kademlia routing table. data RoutingTransition ni = RoutingTransition { transitioningNode :: ni , transitionedTo :: !RoutingStatus } deriving (Eq,Ord,Show,Read) data InsertionReporter ni = InsertionReporter { -- | Called on every inbound packet. Accepts: -- -- * Origin of packet. -- -- * List of nodes to be pinged as a result. reportArrival :: POSIXTime -> ni -> [ni] -> IO () -- | Called on every ping probe. Accepts: -- -- * Who was pinged. -- -- * True Bool value if they ponged. , reportPingResult :: POSIXTime -> ni -> Bool -> IO () } quietInsertions :: InsertionReporter ni quietInsertions = InsertionReporter { reportArrival = \_ _ _ -> return () , reportPingResult = \_ _ _ -> return () } contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t contramapIR f ir = InsertionReporter { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis) , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b } -- | All the IO operations neccessary to maintain a Kademlia routing table. data TableStateIO ni = TableStateIO { -- | Write the routing table. Typically 'writeTVar'. tblWrite :: R.BucketList ni -> STM () -- | Read the routing table. Typically 'readTVar'. , tblRead :: STM (R.BucketList ni) -- | Issue a ping to a remote node and report 'True' if the node -- responded within an acceptable time and 'False' otherwise. , tblPing :: ni -> IO Bool -- | Convenience method provided to assist in maintaining state -- consistent with the routing table. It will be invoked in the same -- transaction that 'tblRead'\/'tblWrite' occured but only when there was -- an interesting change. The returned IO action will be triggered soon -- afterward. -- -- It is not necessary to do anything interesting here. The following -- trivial implementation is fine: -- -- > tblTransition = const $ return $ return () , tblTransition :: RoutingTransition ni -> STM (IO ()) } vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni vanillaIO var ping = TableStateIO { tblRead = readTVar var , tblWrite = writeTVar var , tblPing = ping , tblTransition = const $ return $ return () } -- | Everything neccessary to maintain a routing table of /ni/ (node -- information) entries. data Kademlia nid ni = Kademlia (InsertionReporter ni) (KademliaSpace nid ni) (TableStateIO ni) -- Helper to 'insertNode'. -- -- Adapt return value from 'updateForPingResult' into a -- more easily groked list of transitions. transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni] transition (x,m) = -- Just _ <- m = Node transition: Accepted --> Stranger -- Nothing <- m = Node transition: Applicant --> Stranger RoutingTransition x Stranger : maybeToList (accepted <$> m) -- Helper to 'transition' -- -- Node transition: Applicant --> Accepted accepted :: (t,ni) -> RoutingTransition ni accepted (_,y) = RoutingTransition y Accepted insertNode :: Kademlia nid ni -> ni -> IO () insertNode (Kademlia reporter space io) node = do tm <- getPOSIXTime (ps,reaction) <- atomically $ do tbl <- tblRead io let (inserted, ps,t') = R.updateForInbound space tm node tbl tblWrite io t' reaction <- case ps of _ | inserted -> -- Node transition: Stranger --> Accepted tblTransition io $ RoutingTransition node Accepted (_:_) -> -- Node transition: Stranger --> Applicant tblTransition io $ RoutingTransition node Applicant _ -> return $ return () return (ps, reaction) reportArrival reporter tm node ps reaction _ <- fork $ do myThreadId >>= flip labelThread "pingResults" forM_ ps $ \n -> do b <- tblPing io n reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result join $ atomically $ do tbl <- tblRead io let (replacements, t') = R.updateForPingResult space n b tbl tblWrite io t' ios <- sequence $ concatMap (map (tblTransition io) . transition) replacements return $ sequence_ ios return ()