{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE DeriveFunctor, DeriveTraversable #-} -- {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE PatternSynonyms #-} module Network.Kademlia where import Data.Function import Data.Maybe import qualified Data.Set as Set import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds) import Network.Kademlia.Routing as R #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import Control.Concurrent.STM import Control.Monad import Data.Bits import Data.Hashable import Data.IP import Data.Monoid import Data.Serialize (Serialize) import Data.Time.Clock.POSIX (POSIXTime) import Network.Address (bucketRange,genBucketSample) import Network.Kademlia.Search import System.Entropy import System.Timeout import Text.PrettyPrint as PP hiding (($$), (<>)) import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) import System.IO import Control.Concurrent.Tasks -- | The status of a given node with respect to a given routint table. data RoutingStatus = Stranger -- ^ The node is unknown to the Kademlia routing table. | Applicant -- ^ The node may be inserted pending a ping timeout. | Accepted -- ^ The node has a slot in one of the Kademlia buckets. deriving (Eq,Ord,Enum,Show,Read) -- | A change occured in the kademlia routing table. data RoutingTransition ni = RoutingTransition { transitioningNode :: ni , transitionedTo :: !RoutingStatus } deriving (Eq,Ord,Show,Read) data InsertionReporter ni = InsertionReporter { -- | Called on every inbound packet. Accepts: -- -- * Origin of packet. -- -- * List of nodes to be pinged as a result. reportArrival :: POSIXTime -> ni -> [ni] -> IO () -- | Called on every ping probe. Accepts: -- -- * Who was pinged. -- -- * True Bool value if they ponged. , reportPingResult :: POSIXTime -> ni -> Bool -> IO () } quietInsertions :: InsertionReporter ni quietInsertions = InsertionReporter { reportArrival = \_ _ _ -> return () , reportPingResult = \_ _ _ -> return () } contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t contramapIR f ir = InsertionReporter { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis) , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b } -- | All the IO operations neccessary to maintain a Kademlia routing table. data TableStateIO ni = TableStateIO { -- | Write the routing table. Typically 'writeTVar'. tblWrite :: R.BucketList ni -> STM () -- | Read the routing table. Typically 'readTVar'. , tblRead :: STM (R.BucketList ni) -- | Issue a ping to a remote node and report 'True' if the node -- responded within an acceptable time and 'False' otherwise. , tblPing :: ni -> IO Bool -- | Convenience method provided to assist in maintaining state -- consistent with the routing table. It will be invoked in the same -- transaction that 'tblRead'\/'tblWrite' occured but only when there was -- an interesting change. The returned IO action will be triggered soon -- afterward. -- -- It is not necessary to do anything interesting here. The following -- trivial implementation is fine: -- -- > tblTransition = const $ return $ return () , tblTransition :: RoutingTransition ni -> STM (IO ()) } vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni vanillaIO var ping = TableStateIO { tblRead = readTVar var , tblWrite = writeTVar var , tblPing = ping , tblTransition = const $ return $ return () } -- | Everything neccessary to maintain a routing table of /ni/ (node -- information) entries. data Kademlia nid ni = Kademlia (InsertionReporter ni) (KademliaSpace nid ni) (TableStateIO ni) -- Helper to 'insertNode'. -- -- Adapt return value from 'updateForPingResult' into a -- more easily groked list of transitions. transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni] transition (x,m) = -- Just _ <- m = Node transition: Accepted --> Stranger -- Nothing <- m = Node transition: Applicant --> Stranger RoutingTransition x Stranger : maybeToList (accepted <$> m) -- Helper to 'transition' -- -- Node transition: Applicant --> Accepted accepted :: (t,ni) -> RoutingTransition ni accepted (_,y) = RoutingTransition y Accepted insertNode :: Kademlia nid ni -> ni -> IO () insertNode (Kademlia reporter space io) node = do tm <- utcTimeToPOSIXSeconds <$> getCurrentTime (ps,reaction) <- atomically $ do tbl <- tblRead io let (inserted, ps,t') = R.updateForInbound space tm node tbl tblWrite io t' reaction <- case ps of _ | inserted -> -- Node transition: Stranger --> Accepted tblTransition io $ RoutingTransition node Accepted (_:_) -> -- Node transition: Stranger --> Applicant tblTransition io $ RoutingTransition node Applicant _ -> return $ return () return (ps, reaction) reportArrival reporter tm node ps reaction _ <- fork $ do myThreadId >>= flip labelThread "pingResults" forM_ ps $ \n -> do b <- tblPing io n reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result join $ atomically $ do tbl <- tblRead io let (replacements, t') = R.updateForPingResult space n b tbl tblWrite io t' ios <- sequence $ concatMap (map (tblTransition io) . transition) replacements return $ sequence_ ios return ()