{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NondecreasingIndentation #-} module Announcer ( Announcer , AnnounceKey , packAnnounceKey , unpackAnnounceKey , AnnounceMethod(..) , forkAnnouncer , stopAnnouncer , schedule , cancel ) where import Data.Wrapper.PSQ as PSQ import Network.Kademlia.Search import Ticker import Control.Concurrent.Lifted.Instrument import Control.Concurrent.STM import Control.Monad import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as Char8 import Data.Function import Data.Hashable import Data.Maybe import Data.Time.Clock.POSIX newtype AnnounceKey = AnnounceKey ByteString deriving (Hashable,Ord,Eq) packAnnounceKey :: Announcer -> String -> STM AnnounceKey packAnnounceKey _ = return . AnnounceKey . Char8.pack unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs data ScheduledItem = forall r. ScheduledItem (AnnounceMethod r) | StopAnnouncer data Announcer = Announcer { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) , announcerActive :: TVar Bool , lastTick :: TVar POSIXTime , announceTicker :: Ticker } scheduleImmediately :: Announcer -> ScheduledItem -> STM () scheduleImmediately announcer item = modifyTVar' (scheduled announcer) (PSQ.insert' (AnnounceKey "") item 0) stopAnnouncer :: Announcer -> IO () stopAnnouncer announcer = do atomically $ scheduleImmediately announcer StopAnnouncer atomically $ readTVar (announcerActive announcer) >>= check . not data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod { aSearch :: Search nid addr tok ni r , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) } schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () schedule _ _ _ _ = do -- fork the search -- add it to the priority queue of announce methods. -- update ticker return () cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () cancel _ _ _ _ = return () forkAnnouncer :: IO Announcer forkAnnouncer = do tickvar <- atomically $ newTVar 0 ticker <- forkTicker $ writeTVar tickvar announcer <- atomically $ Announcer <$> newTVar PSQ.empty <*> newTVar True <*> pure tickvar <*> pure ticker fork $ announceThread announcer return announcer announceThread :: Announcer -> IO () announceThread announcer = do myThreadId >>= flip labelThread "announcer" fix $ \loop -> do action <- atomically $ do now <- readTVar $ lastTick announcer (item,q) <- readTVar (scheduled announcer) >>= maybe retry return . PSQ.minView when (prio item > now) retry -- Is it time to do something? writeTVar (scheduled announcer) q -- Remove the event from the queue. performScheduledItem announcer item -- Go for it! mapM_ (>> loop) action atomically $ writeTVar (announcerActive announcer) False performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) performScheduledItem announcer = \case (Binding _ StopAnnouncer _) -> return Nothing -- announcement added: -- wait for time to announce or for search to finish. -- -- time for periodic announce: -- (re-)announce to the current known set of storing-nodes. -- If the search is finished, restart the search. -- -- search finished: -- if any of the current storing-nodes set have not been -- announced to, announce to them. -- -- -- announcement removed: --