{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-} module Network.DHT ( -- makeTableKeeper -- , TableKeeper(..) module Network.DHT -- for now , module Network.DHT.Types ) where import Data.Bits import Data.Maybe import Data.Monoid import Network.Address import Network.DHT.Types import Network.DatagramServer.Types import Network.DHT.Routing import Control.Concurrent.STM #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import GHC.Conc (labelThread) import Control.Concurrent.Lifted #endif import Text.PrettyPrint as PP hiding ((<>), ($$)) import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) import Control.Monad import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) data TableKeeper msg ip u = TableKeeper { routingInfo :: TVar (Maybe (Info msg ip u)) , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO () , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO () } makeTableKeeper :: forall msg ip u. ( Address ip , Show u , Show (NodeId msg) , Ord (NodeId msg) , FiniteBits (NodeId msg) ) => TableParameters msg ip u -> IO (TableKeeper msg ip u) makeTableKeeper param@TableParameters{..} = do error "TODO makeTableKeeper" -- kick off table-updating thread ri <- atomically (newTVar Nothing) let tk = TableKeeper{ routingInfo = ri , grokNode = insertNode param tk , grokAddress = error "todo" } return tk atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg) ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u]) atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do minfo <- readTVar (routingInfo state) case minfo of Just inf -> do (ps,t') <- insert tm arrival $ myBuckets inf writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' } return $ do case witnessed_ip of Just (ReflectedIP ip) | ip /= myAddress inf -> logMessage 'I' $ unwords $ [ "Possible NAT?" , show (toSockAddr $ nodeAddr $ foreignNode arrival) , "reports my address:" , show ip ] -- TODO: Let routing table vote on my IP/NodeId. _ -> return () return ps Nothing -> let dropped = return $ do -- Ignore non-witnessing nodes until somebody tells -- us our ip address. logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival)) return [] in fromMaybe dropped $ do ReflectedIP ip <- witnessed_ip let nil = nullTable (adjustID ip arrival) maxBuckets return $ do (ps,t') <- insert tm arrival nil let new_info = Info t' (adjustID ip arrival) ip writeTVar (routingInfo state) $ Just new_info return $ do logMessage 'I' $ unwords [ "External IP address:" , show ip , "(reported by" , show (toSockAddr $ nodeAddr $ foreignNode arrival) ++ ")" ] return ps -- | This operation do not block but acquire exclusive access to -- routing table. insertNode :: forall msg ip u. ( Address ip , Show u , Show (NodeId msg) , Ord (NodeId msg) , FiniteBits (NodeId msg) ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO () insertNode param@TableParameters{..} state info witnessed_ip0 = do tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime let showTable = do t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state) let logMsg = "Routing table: " <> pPrint t logMessage 'D' (render logMsg) let arrival = TryInsert info logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ ) ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0 showTable _ <- fork $ do myThreadId >>= flip labelThread "DHT.insertNode.pingResults" forM_ ps $ \(CheckPing ns)-> do forM_ ns $ \n -> do (b,mip) <- pingProbe (nodeAddr n) let alive = PingResult n b logMessage 'D' $ "PingResult "++show (nodeId n,b) _ <- join $ atomically $ atomicInsert param state tm alive mip showTable return () return ()