From 9eef4cbd00586df2fad36f3cab3d04b807b92e2f Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 31 Oct 2017 17:42:15 -0400 Subject: WIP: a command (recurring announcements) (Part 4) --- Announcer.hs | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 108 insertions(+), 10 deletions(-) (limited to 'Announcer.hs') diff --git a/Announcer.hs b/Announcer.hs index bc52ee38..8008267c 100644 --- a/Announcer.hs +++ b/Announcer.hs @@ -1,23 +1,121 @@ -{-# LANGUAGE ExistentialQuantification #-} -module Announcer where +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +module Announcer + ( Announcer + , AnnounceKey + , packAnnounceKey + , unpackAnnounceKey + , AnnounceMethod(..) + , forkAnnouncer + , stopAnnouncer + , schedule + , cancel + ) where +import Data.Wrapper.PSQ as PSQ import Network.Kademlia.Search +import Ticker + +import Control.Concurrent.Lifted.Instrument +import Control.Concurrent.STM +import Control.Monad +import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as Char8 +import Data.Function +import Data.Hashable +import Data.Maybe +import Data.Time.Clock.POSIX + +newtype AnnounceKey = AnnounceKey ByteString + deriving (Hashable,Ord,Eq) + +packAnnounceKey :: Announcer -> String -> STM AnnounceKey +packAnnounceKey _ = return . AnnounceKey . Char8.pack + +unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String +unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs + +data ScheduledItem + = forall r. ScheduledItem (AnnounceMethod r) + | StopAnnouncer data Announcer = Announcer + { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) + , announcerActive :: TVar Bool + , lastTick :: TVar POSIXTime + , announceTicker :: Ticker + } -forkAnnouncer :: IO Announcer -forkAnnouncer = return Announcer +scheduleImmediately :: Announcer -> ScheduledItem -> STM () +scheduleImmediately announcer item + = modifyTVar' (scheduled announcer) (PSQ.insert' (AnnounceKey "") item 0) stopAnnouncer :: Announcer -> IO () -stopAnnouncer _ = return () +stopAnnouncer announcer = do + atomically $ scheduleImmediately announcer StopAnnouncer + atomically $ readTVar (announcerActive announcer) >>= check . not -data AnnounceMethod ni r = forall nid addr r tok a. AnnounceMethod +data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod { aSearch :: Search nid addr tok ni r , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) } -schedule :: Announcer -> AnnounceMethod ni r -> r -> IO () -schedule _ _ _ = return () +schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () +schedule _ _ _ _ = do + -- fork the search + -- add it to the priority queue of announce methods. + -- update ticker + return () + +cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () +cancel _ _ _ _ = return () + + +forkAnnouncer :: IO Announcer +forkAnnouncer = do + tickvar <- atomically $ newTVar 0 + ticker <- forkTicker $ writeTVar tickvar + announcer <- atomically $ Announcer <$> newTVar PSQ.empty + <*> newTVar True + <*> pure tickvar + <*> pure ticker + fork $ announceThread announcer + return announcer + + +announceThread :: Announcer -> IO () +announceThread announcer = do + myThreadId >>= flip labelThread "announcer" + fix $ \loop -> do + action <- atomically $ do + now <- readTVar $ lastTick announcer + (item,q) <- readTVar (scheduled announcer) + >>= maybe retry return . PSQ.minView + when (prio item > now) retry -- Is it time to do something? + writeTVar (scheduled announcer) q -- Remove the event from the queue. + performScheduledItem announcer item -- Go for it! + mapM_ (>> loop) action + atomically $ writeTVar (announcerActive announcer) False + +performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) +performScheduledItem announcer = \case + + (Binding _ StopAnnouncer _) -> return Nothing + + -- announcement added: + + -- wait for time to announce or for search to finish. + -- + -- time for periodic announce: + -- (re-)announce to the current known set of storing-nodes. + -- If the search is finished, restart the search. + -- + -- search finished: + -- if any of the current storing-nodes set have not been + -- announced to, announce to them. + -- + -- + -- announcement removed: + -- -cancel :: Announcer -> AnnounceMethod ni r -> r -> IO () -cancel _ _ _ = return () -- cgit v1.2.3