From 49d0ad9f2adef768d972c12117b0e51c3ebe1b5e Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Thu, 5 Dec 2019 10:15:42 -0500 Subject: Simplification and documentation of Announcer. --- dht/Announcer.hs | 63 ++++++++++++++++++++++++++-------------------------- dht/Announcer/Tox.hs | 2 +- dht/ToxManager.hs | 9 +++++--- dht/examples/dhtd.hs | 1 - 4 files changed, 39 insertions(+), 36 deletions(-) (limited to 'dht') diff --git a/dht/Announcer.hs b/dht/Announcer.hs index c6a04cb1..41f86f24 100644 --- a/dht/Announcer.hs +++ b/dht/Announcer.hs @@ -16,7 +16,6 @@ module Announcer , forkAnnouncer , stopAnnouncer , cancel - , itemStatusNum , runAction , unschedule , delayAction @@ -36,15 +35,17 @@ import Control.Monad import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as Char8 import Data.Hashable +import Data.Time.Clock (NominalDiffTime) import Data.Time.Clock.POSIX import qualified GHC.Generics as Generics -- import Generic.Data.Internal.Meta as Lyxia +-- | Actions are scheduled and canceled via this key. newtype AnnounceKey = AnnounceKey ByteString deriving (Hashable,Ord,Eq) -instance Show AnnounceKey where - show (AnnounceKey bs) = "AnnounceKey " ++ show (Char8.unpack bs) +-- instance Show AnnounceKey where +-- show (AnnounceKey bs) = "AnnounceKey " ++ show (Char8.unpack bs) packAnnounceKey :: Announcer -> String -> AnnounceKey packAnnounceKey _ = AnnounceKey . Char8.pack @@ -58,17 +59,9 @@ unpackAnnounceKey _ (AnnounceKey bs) = Char8.unpack bs -- 'NewAnnouncement' which triggers the ordinary recurring scheduling of -- 'Announce'. data ScheduledItem = ScheduledItem (Announcer -> AnnounceKey -> POSIXTime -> STM (IO ())) - -- Can't use Data because STM and IO. :( - -- deriving Data {- itemStatusNum sch = constrIndex $ toConstr sch -} - -- Using Generic works with Lyxia's generic-data package... - -- {- itemStatusNum sch = Lyxia.conIdToInt $ Lyxia.conId sch -} - -- For now, make sure to keep 'itemStatusNum' up to date with 'ScheduledItem'. deriving Generics.Generic -itemStatusNum :: ScheduledItem -> Int -itemStatusNum = const 1 - newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem } emptySchedule :: Schedule @@ -81,32 +74,37 @@ scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled annou data Announcer = Announcer { -- | The queue of scheduled events. The priority is the time at which an - -- event is supposed to occur. Do not write to this TVar ever. + -- event is supposed to occur. scheduled :: TVar Schedule -- | This TVar is False when the Announcer thread has finished. , announcerActive :: TVar Bool - -- | This delay is used to wait until it's time to act on the earliest - -- scheduled item. It will be interrupted whenever a new item is - -- scheduled. + -- | This channel is used to communicate instructions to the timing + -- thread. It can be used to schedule actions or shutdown the thread. , commander :: TChan SchedulerCommand } +-- | Schedule an action to happen immediately. scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () scheduleImmediately announcer k item = writeTChan (commander announcer) $ RunActionSTM k item +-- | Schedule an action to happen at a specific time. scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () scheduleAbs announcer k item absTime = writeTChan (commander announcer) $ ScheduleAction (k, absTime, item) -scheduleRel, delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () +-- | Schedule an action to occur s specified delta into the future from now. +scheduleRel, delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> NominalDiffTime -> STM () scheduleRel announcer k item relTime = writeTChan (commander announcer) $ DelayAction (k, relTime, item) +-- | Schedule an action to occur s specified delta into the future from now. +-- This is an alias for 'scheduleRel'. delayAction = scheduleRel -runAction :: Announcer -> IO () -> STM () -runAction announcer = writeTChan (commander announcer) . RunAction +-- | Fork an action immediately. The String argument is a thread label. +runAction :: Announcer -> String -> IO () -> STM () +runAction announcer lbl = writeTChan (commander announcer) . RunAction lbl -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. stopAnnouncer :: Announcer -> IO () @@ -114,9 +112,11 @@ stopAnnouncer announcer = do atomically $ writeTChan (commander announcer) ShutdownScheduler atomically $ readTVar (announcerActive announcer) >>= check . not +-- | Cancel a scheduled action (STM). unschedule :: Announcer -> AnnounceKey -> STM () unschedule announcer k = writeTChan (commander announcer) $ UnscheduleAction k +-- | Cancel a scheduled action (IO). cancel :: Announcer -> AnnounceKey -> IO () cancel = ((.).(.)) atomically unschedule @@ -127,21 +127,27 @@ forkAnnouncer = do announcer <- atomically $ Announcer <$> newTVar emptySchedule <*> newTVar True <*> newTChan - fork $ announceThread announcer + tid <- forkIO $ listener announcer + labelThread tid "announcer" return announcer +-- Wait for an item to occur on the given channel, or for the given TVar to be +-- True. readTChanTimeout :: TVar Bool -> TChan a -> STM (Maybe a) -readTChanTimeout delay pktChannel = do - Just <$> readTChan pktChannel <|> pure Nothing <* (readTVar >=> check) delay +readTChanTimeout delay pktChannel = + orElse (Just <$> readTChan pktChannel) + (pure Nothing <* (readTVar >=> check) delay) data SchedulerCommand = ShutdownScheduler - | ScheduleAction KPS -- run an action at an absolute time (todo: use UTCTime) - | DelayAction KPS -- run an action at a time relative to the present (todo: use NominalDiffTime) - | RunAction (IO ()) + | ScheduleAction KPS -- run an action at an absolute time + | DelayAction KPS -- run an action at a time relative to the present + | RunAction String (IO ()) | RunActionSTM AnnounceKey ScheduledItem | UnscheduleAction AnnounceKey +-- This is the main scheduling thread that polls the commander TChan and forks +-- scheduled actions. listener :: Announcer -> IO () listener announcer = relisten -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule). @@ -158,7 +164,7 @@ listener announcer = relisten atomically (readTChan chan) >>= handleCommand Just ((k, p, ScheduledItem f), queue') -> do now <- getPOSIXTime - note $ "queue full - listening with timeout - " ++ show (p, now) + note $ "queue non-empty - listening with timeout - " ++ show (p, now) delay <- registerDelay (toMicroseconds (p - now)) join $ atomically $ do readTChanTimeout delay chan >>= \case @@ -174,7 +180,7 @@ listener announcer = relisten case cmd of ScheduleAction (k, p, s) -> atomically $ modifyScheduled $ PSQ.insert' k s p UnscheduleAction k -> atomically $ modifyScheduled $ PSQ.delete k - RunAction io -> void $ fork io + RunAction lbl io -> void $ forkLabeled lbl io RunActionSTM k (ScheduledItem f) -> do now <- getPOSIXTime void . fork . join . atomically $ modifyScheduled (PSQ.delete k) >> f announcer k now @@ -182,8 +188,3 @@ listener announcer = relisten now <- getPOSIXTime atomically $ modifyScheduled $ PSQ.insert' k s (now + p) -announceThread :: Announcer -> IO () -announceThread announcer = do - myThreadId >>= flip labelThread "announcer" - listener announcer - diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs index f8343f8d..4b775049 100644 --- a/dht/Announcer/Tox.hs +++ b/dht/Announcer/Tox.hs @@ -183,7 +183,7 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge ns <- atomically $ newTVar MM.empty let astate = AnnounceState st ns onResult sr = do - runAction announcer $ do + runAction announcer "with-search-result" $ do got <- sWithResult r sr -- If we had a way to get the source of a search result, we might want to -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' diff --git a/dht/ToxManager.hs b/dht/ToxManager.hs index 408b12d2..39377733 100644 --- a/dht/ToxManager.hs +++ b/dht/ToxManager.hs @@ -327,13 +327,16 @@ gotDhtPubkey theirDhtKey tx theirKey = do , rumoredAddress = assume akey } + showak :: AnnounceKey -> String + showak k = unpackAnnounceKey (txAnnouncer tx) k + assume :: AnnounceKey -> POSIXTime -> SockAddr -> NodeInfo -> STM () assume akey time addr ni = - tput XNodeinfoSearch $ show ("rumor", akey, time, addr, ni) + tput XNodeinfoSearch $ show ("rumor", showak akey, time, addr, ni) observe :: AnnounceKey -> POSIXTime -> NodeInfo -> STM () observe akey time ni@(nodeAddr -> addr) = do - tput XNodeinfoSearch $ show ("observation", akey, time, addr) + tput XNodeinfoSearch $ show ("observation", showak akey, time, addr) setContactAddr time theirKey ni (txAccount tx) gotAddr :: NodeInfo -> ToxToXMPP -> PublicKey -> IO () @@ -408,7 +411,7 @@ gotAddr' ni@(nodeAddr -> addr) tx theirKey theirDhtKey = atomically blee getCookie ni isActive getC ann akey now = getCookieAgain where getCookieAgain = do - tput XNodeinfoSearch $ show ("getCookieAgain", akey) + tput XNodeinfoSearch $ show ("getCookieAgain", unpackAnnounceKey ann akey) mbContact <- getC case mbContact of Nothing -> return $ return () diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index e24bb5df..42fcc67b 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs @@ -872,7 +872,6 @@ clientSession s@Session{..} sock cnum h = do let kstr = unpackAnnounceKey announcer k return [ if ptm==0 then "now" else show (ptm - now) - , show (itemStatusNum item) , kstr ] hPutClient h $ showColumns rs -- cgit v1.2.3