From b9a2c3dd44f5dd59157676fb386584a148d854cb Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 15 Jun 2017 20:08:08 -0400 Subject: Refactored insertNode. --- src/Network/DHT.hs | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 src/Network/DHT.hs (limited to 'src/Network/DHT.hs') diff --git a/src/Network/DHT.hs b/src/Network/DHT.hs new file mode 100644 index 00000000..0dab29cd --- /dev/null +++ b/src/Network/DHT.hs @@ -0,0 +1,125 @@ +{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} +module Network.DHT + ( -- makeTableKeeper + -- , TableKeeper(..) + module Network.DHT -- for now + , module Network.DHT.Types + ) where + +import Data.Bits +import Data.Maybe +import Data.Monoid +import Network.Address +import Network.DHT.Types +import Network.DatagramServer.Types +import Network.DHT.Routing +import Control.Concurrent.STM +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import GHC.Conc (labelThread) +import Control.Concurrent.Lifted +#endif +import Text.PrettyPrint as PP hiding ((<>), ($$)) +import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) + +import Control.Monad +import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) + +data TableKeeper msg ip u = TableKeeper + { routingInfo :: TVar (Maybe (Info msg ip u)) + , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO () + , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO () + } + +makeTableKeeper :: forall msg ip u. + ( Address ip + , Show u + , Show (NodeId msg) + , Ord (NodeId msg) + , FiniteBits (NodeId msg) + ) => TableParameters msg ip u -> IO (TableKeeper msg ip u) +makeTableKeeper param@TableParameters{..} = do + error "TODO makeTableKeeper" -- kick off table-updating thread + ri <- atomically (newTVar Nothing) + let tk = TableKeeper{ routingInfo = ri + , grokNode = insertNode param tk + , grokAddress = error "todo" + } + return tk + +atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg) + ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u]) +atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do + minfo <- readTVar (routingInfo state) + case minfo of + Just inf -> do + (ps,t') <- insert tm arrival $ myBuckets inf + writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' } + return $ do + case witnessed_ip of + Just (ReflectedIP ip) + | ip /= myAddress inf + -> logMessage 'I' $ unwords + $ [ "Possible NAT?" + , show (toSockAddr $ nodeAddr $ foreignNode arrival) + , "reports my address:" + , show ip ] + -- TODO: Let routing table vote on my IP/NodeId. + _ -> return () + return ps + Nothing -> + let dropped = return $ do + -- Ignore non-witnessing nodes until somebody tells + -- us our ip address. + logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival)) + return [] + in fromMaybe dropped $ do + ReflectedIP ip <- witnessed_ip + let nil = nullTable (adjustID ip arrival) maxBuckets + return $ do + (ps,t') <- insert tm arrival nil + let new_info = Info t' (adjustID ip arrival) ip + writeTVar (routingInfo state) $ Just new_info + return $ do + logMessage 'I' $ unwords + [ "External IP address:" + , show ip + , "(reported by" + , show (toSockAddr $ nodeAddr $ foreignNode arrival) + ++ ")" + ] + return ps + +-- | This operation do not block but acquire exclusive access to +-- routing table. +insertNode :: forall msg ip u. + ( Address ip + , Show u + , Show (NodeId msg) + , Ord (NodeId msg) + , FiniteBits (NodeId msg) + ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO () +insertNode param@TableParameters{..} state info witnessed_ip0 = do + tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime + let showTable = do + t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state) + let logMsg = "Routing table: " <> pPrint t + logMessage 'D' (render logMsg) + let arrival = TryInsert info + logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ ) + ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0 + showTable + _ <- fork $ do + myThreadId >>= flip labelThread "DHT.insertNode.pingResults" + forM_ ps $ \(CheckPing ns)-> do + forM_ ns $ \n -> do + (b,mip) <- pingProbe (nodeAddr n) + let alive = PingResult n b + logMessage 'D' $ "PingResult "++show (nodeId n,b) + _ <- join $ atomically $ atomicInsert param state tm alive mip + showTable + return () + return () + -- cgit v1.2.3