From 9eef4cbd00586df2fad36f3cab3d04b807b92e2f Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 31 Oct 2017 17:42:15 -0400 Subject: WIP: a command (recurring announcements) (Part 4) --- Announcer.hs | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++----- Ticker.hs | 36 +++++++++++++++++ examples/dhtd.hs | 15 ++++--- 3 files changed, 154 insertions(+), 15 deletions(-) create mode 100644 Ticker.hs diff --git a/Announcer.hs b/Announcer.hs index bc52ee38..8008267c 100644 --- a/Announcer.hs +++ b/Announcer.hs @@ -1,23 +1,121 @@ -{-# LANGUAGE ExistentialQuantification #-} -module Announcer where +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +module Announcer + ( Announcer + , AnnounceKey + , packAnnounceKey + , unpackAnnounceKey + , AnnounceMethod(..) + , forkAnnouncer + , stopAnnouncer + , schedule + , cancel + ) where +import Data.Wrapper.PSQ as PSQ import Network.Kademlia.Search +import Ticker + +import Control.Concurrent.Lifted.Instrument +import Control.Concurrent.STM +import Control.Monad +import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as Char8 +import Data.Function +import Data.Hashable +import Data.Maybe +import Data.Time.Clock.POSIX + +newtype AnnounceKey = AnnounceKey ByteString + deriving (Hashable,Ord,Eq) + +packAnnounceKey :: Announcer -> String -> STM AnnounceKey +packAnnounceKey _ = return . AnnounceKey . Char8.pack + +unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String +unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs + +data ScheduledItem + = forall r. ScheduledItem (AnnounceMethod r) + | StopAnnouncer data Announcer = Announcer + { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) + , announcerActive :: TVar Bool + , lastTick :: TVar POSIXTime + , announceTicker :: Ticker + } -forkAnnouncer :: IO Announcer -forkAnnouncer = return Announcer +scheduleImmediately :: Announcer -> ScheduledItem -> STM () +scheduleImmediately announcer item + = modifyTVar' (scheduled announcer) (PSQ.insert' (AnnounceKey "") item 0) stopAnnouncer :: Announcer -> IO () -stopAnnouncer _ = return () +stopAnnouncer announcer = do + atomically $ scheduleImmediately announcer StopAnnouncer + atomically $ readTVar (announcerActive announcer) >>= check . not -data AnnounceMethod ni r = forall nid addr r tok a. AnnounceMethod +data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod { aSearch :: Search nid addr tok ni r , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) } -schedule :: Announcer -> AnnounceMethod ni r -> r -> IO () -schedule _ _ _ = return () +schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () +schedule _ _ _ _ = do + -- fork the search + -- add it to the priority queue of announce methods. + -- update ticker + return () + +cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () +cancel _ _ _ _ = return () + + +forkAnnouncer :: IO Announcer +forkAnnouncer = do + tickvar <- atomically $ newTVar 0 + ticker <- forkTicker $ writeTVar tickvar + announcer <- atomically $ Announcer <$> newTVar PSQ.empty + <*> newTVar True + <*> pure tickvar + <*> pure ticker + fork $ announceThread announcer + return announcer + + +announceThread :: Announcer -> IO () +announceThread announcer = do + myThreadId >>= flip labelThread "announcer" + fix $ \loop -> do + action <- atomically $ do + now <- readTVar $ lastTick announcer + (item,q) <- readTVar (scheduled announcer) + >>= maybe retry return . PSQ.minView + when (prio item > now) retry -- Is it time to do something? + writeTVar (scheduled announcer) q -- Remove the event from the queue. + performScheduledItem announcer item -- Go for it! + mapM_ (>> loop) action + atomically $ writeTVar (announcerActive announcer) False + +performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) +performScheduledItem announcer = \case + + (Binding _ StopAnnouncer _) -> return Nothing + + -- announcement added: + + -- wait for time to announce or for search to finish. + -- + -- time for periodic announce: + -- (re-)announce to the current known set of storing-nodes. + -- If the search is finished, restart the search. + -- + -- search finished: + -- if any of the current storing-nodes set have not been + -- announced to, announce to them. + -- + -- + -- announcement removed: + -- -cancel :: Announcer -> AnnounceMethod ni r -> r -> IO () -cancel _ _ _ = return () diff --git a/Ticker.hs b/Ticker.hs new file mode 100644 index 00000000..ba423def --- /dev/null +++ b/Ticker.hs @@ -0,0 +1,36 @@ +module Ticker where + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Data.Function +import Data.Time.Clock.POSIX +import Data.Typeable + +data ScheduleTick = ScheduleTick Int deriving (Show, Typeable) +instance Exception ScheduleTick + +newtype Ticker = Ticker ThreadId + +-- | Fork a thread that will invoke an STM transaction whenever a tick event +-- occurs. The first tick happens immediately, later ticks must be scheduled +-- with 'scheduleTick'. +forkTicker :: (POSIXTime -> STM ()) -> IO Ticker +forkTicker tick = do + getPOSIXTime >>= atomically . tick + tid <- forkIO $ fix $ \loop -> do + getPOSIXTime >>= atomically . tick + catch loop $ \(ScheduleTick interval) -> do + threadDelay interval + when (interval >= 0) loop + return $ Ticker tid + +-- | Schedule the next tick. If you supply a negative number, the ticker +-- thread will terminate. Otherwise, the next tick will be scheduled for the +-- given number of microseconds from now. +-- +-- Note: If a tick was scheduled to happen sooner, that tick will be canceled +-- in favor of this one. Only one tick is scheduled at a time. +scheduleTick :: Ticker -> Int -> IO () +scheduleTick (Ticker tid) usecs = throwTo tid (ScheduleTick usecs) diff --git a/examples/dhtd.hs b/examples/dhtd.hs index fa4ce95b..931e1ba0 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -311,6 +311,8 @@ forkSearch method nid DHTQuery{qsearch,qshowTok,qshowR} dhtSearches dhtBuckets , searchResults = results } modifyTVar' dhtSearches $ Map.insert (method,nid) new + -- Finally, we write the search loop action into a tvar that will be executed in a new + -- thread. writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st reportSearchResults :: (Show t, Ord t1, Ord t, Hashable t) => @@ -552,10 +554,10 @@ clientSession s@Session{..} sock cnum h = do (dtastr,ys) = break isSpace $ dropWhile isSpace xs a = Map.lookup method dhtAnnouncables q = Map.lookup method dhtQuery - doit :: Char -> proxy ni -> Announcer -> AnnounceMethod ni r -> r -> IO () + doit :: Char -> proxy ni -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () doit '+' _ = schedule doit '-' _ = cancel - doit _ _ = \_ _ _ -> hPutClient h "Starting(+) or canceling(-)?" + doit _ _ = \_ _ _ _ -> hPutClient h "Starting(+) or canceling(-)?" matchingResult :: ( Typeable sr , Typeable stok @@ -573,9 +575,12 @@ clientSession s@Session{..} sock cnum h = do DHTQuery { qsearch } <- q (Refl,Refl,nr@Refl) <- matchingResult qsearch announceSendData dta <- either (const Nothing) Just $ announceParseData dtastr - return $ doit op nr announcer - (AnnounceMethod qsearch announceSendData) - dta + return $ do + akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) + doit op nr announcer + akey + (AnnounceMethod qsearch announceSendData) + dta fromMaybe (hPutClient h "error.") mameth ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts -- cgit v1.2.3