{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
-- {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE GADTs #-}
module Kademlia where

import Data.Maybe
import Network.DHT.Routing as R
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import GHC.Conc (labelThread)
import Control.Concurrent.Lifted
#endif
import Text.PrettyPrint as PP hiding ((<>), ($$))
import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
import Data.IP
import Control.Concurrent.STM
import Control.Monad
import Data.Monoid
import Data.Time.Clock.POSIX (POSIXTime)

{-
insertNode1 :: forall raw dht u ip.
                ( Address ip
                , Default u
                , Show u
                , Ord (NodeId dht)
                , FiniteBits (NodeId dht)
                , Show (NodeId dht)
                , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
                , DHT.Kademlia dht
                , Ord (TransactionID dht)
                , WireFormat raw dht
                , Serialize (TransactionID dht)
                , SerializableTo raw (Response dht (Ping dht))
                , SerializableTo raw (Query dht (Ping dht))
                , Ord (NodeId dht)
                , Show (NodeId dht)
                , Show (QueryMethod dht)
                ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
-}
{-
insertNode1 = do
    params = DHT.TableParameters
        { maxBuckets = R.defaultBucketCount :: Int
        , fallbackID = myid
        , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
        , logMessage = (\ _ _ -> return ()) {- TODO -} :: Char -> String -> IO ()
        , pingProbe = error "probe" :: ni -> NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP)
        }
    let state = DHT.TableKeeper
            { routingInfo = tbl
            , grokNode = DHT.insertNode params state
            , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
            }
    return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
-}

{-
insertNode :: forall msg ip u.
              ( Address ip
              , Show u
              , Show (NodeId msg)
              , Ord (NodeId msg)
              , FiniteBits (NodeId msg)
              ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
-}

{-
    let showTable = do
            t <- atomically $ fmap myBuckets <$> readTVar routingInfo
            let logMsg = "Routing table: " <> pPrint t
            logMessage 'D' (render logMsg)
        reportPingResult tm n b = showTable
        reportArrival tm info ps = showTable
-}

-- | A change occurred in the kademlia routing table.
data RoutingTableChanged ni = RoutingTableChanged
    { nodeReplaced  :: !(Maybe ni) -- ^ Deleted entry.
    , nodeInserted  :: ni          -- ^ New routing table entry.
    , nodeTimestamp :: !POSIXTime  -- ^ Last-seen time for the new node.
    }
    deriving (Eq,Ord,Show,Functor,Foldable,Traversable)

-- | Callbacks used by 'insertNode' to report what it is doing.
data InsertionReporter ni = InsertionReporter
    { -- | Called on every inbound packet.
      reportArrival :: POSIXTime
                       -> ni   -- ^ Origin of packet.
                       -> [ni] -- ^ These will be pinged as a result.
                       -> IO ()
      -- | Called on every ping probe.
    , reportPingResult :: POSIXTime
                          -> ni   -- ^ Who was pinged.
                          -> Bool -- ^ True if they ponged.
                          -> IO ()
    }

-- | An 'InsertionReporter' whose callbacks ignore their arguments and do
-- nothing.
quietInsertions :: InsertionReporter ni
quietInsertions = InsertionReporter
    { reportArrival    = \_ _ _ -> return ()
    , reportPingResult = \_ _ _ -> return ()
    }

-- | Adapt a reporter of /b/ nodes into a reporter of /a/ nodes by applying
-- the given function to every node before it is handed to the underlying
-- reporter.  Timestamps and ping results are passed through unchanged.
contramapIR :: (a -> b) -> InsertionReporter b -> InsertionReporter a
contramapIR f ir = InsertionReporter
    { reportArrival    = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
    , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
    }

-- | All the IO operations necessary to maintain a Kademlia routing table.
data TableStateIO nid ni = TableStateIO
    { -- | Write the routing table.  Typically 'writeTVar'.
      tblWrite :: R.Table ni -> STM ()

      -- | Read the routing table. Typically 'readTVar'.
    , tblRead :: STM (R.Table ni)

      -- | Issue a ping to a remote node and report 'True' if the node
      -- responded within an acceptable time and 'False' otherwise.
    , tblPing :: ni -> IO Bool

      -- | Convenience method provided to assist in maintaining state
      -- consistent with the routing table.  It will be invoked in the same
      -- transaction that 'tblRead'\/'tblWrite' occurred but only when there was
      -- an interesting change.  The returned IO action will be triggered soon
      -- afterward.
      --
      -- It is not necessary to do anything interesting here.  The following
      -- trivial implementation is fine:
      --
      -- > tblChanged = const $ return $ return ()
    , tblChanged :: RoutingTableChanged ni -> STM (IO ())
    }

-- | A 'TableStateIO' that keeps the routing table in the given 'TVar', uses
-- the supplied action for pings, and takes no action on table changes.
vanillaIO :: TVar (Table ni) -> (ni -> IO Bool) -> TableStateIO nid ni
vanillaIO var ping = TableStateIO
    { tblRead    = readTVar var
    , tblWrite   = writeTVar var
    , tblPing    = ping
    , tblChanged = const $ return $ return ()
    }

-- | Everything necessary to maintain a routing table of /ni/ (node
-- information) entries.
data Kademlia nid ni = Kademlia (InsertionReporter ni)
                                (KademliaSpace nid ni)
                                (TableStateIO nid ni)

{-
kademlia :: FiniteBits nid => TVar (Table nid nid) -> (nid -> IO Bool) -> Kademlia nid nid
kademlia var ping = Kademlia quietInsertions
                             (KademliaSpace id testIdBit)
                             (vanillaIO var ping)
-}

-- | Record the arrival of a packet from the given node.
--
-- The current time is sampled once and then, in a single STM transaction,
-- the table is read, passed through 'R.updateForInbound', and written back.
-- If that update reports an insertion, 'tblChanged' is invoked in the same
-- transaction with a 'RoutingTableChanged' that has no replaced node, and
-- the IO action it returns is run after 'reportArrival'.
--
-- Any nodes that 'R.updateForInbound' asks to have pinged are probed
-- sequentially on a forked thread labelled @"pingResults"@.  After each
-- probe, 'reportPingResult' is called, the table is updated with
-- 'R.updateForPingResult' in its own transaction, and 'tblChanged' is
-- invoked for every replacement that update produced.
insertNode :: forall ni nid. Ord ni => Kademlia nid ni -> ni -> IO ()
insertNode (Kademlia reporter space io) node = do
    tm <- utcTimeToPOSIXSeconds <$> getCurrentTime

    (ps, reaction) <- atomically $ do
        tbl <- tblRead io
        let (inserted, toPing, t') = R.updateForInbound space tm node tbl
        tblWrite io t'
        reaction <- if inserted
                        then tblChanged io $ RoutingTableChanged Nothing node tm
                        else return $ return ()
        return (toPing, reaction)

    reportArrival reporter tm node ps
    reaction

    -- Only spawn the ping thread when there is something to ping; with an
    -- empty list the thread would label itself and exit immediately.
    unless (null ps) $ void $ fork $ do
        myThreadId >>= flip labelThread "pingResults"
        forM_ ps $ \n -> do
            b <- tblPing io n
            -- Note: tm is the arrival timestamp sampled at the top of this
            -- function, not the time at which the ping completed.
            reportPingResult reporter tm n b
            -- The transaction yields the combined change notifications,
            -- which 'join' then runs outside of STM.
            join $ atomically $ do
                tbl <- tblRead io
                let (replacements, t') = R.updateForPingResult space n b tbl
                tblWrite io t'
                sequence_ <$> mapM (\(x,(t,y)) -> tblChanged io $ RoutingTableChanged (Just x) y t)
                                   replacements