{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE NondecreasingIndentation   #-}
-- | A background thread that services a time-ordered priority queue of
-- scheduled items.  Items are keyed by 'AnnounceKey' and prioritised by the
-- 'POSIXTime' at which they become due.
module Announcer
    ( Announcer
    , AnnounceKey
    , packAnnounceKey
    , unpackAnnounceKey
    , AnnounceMethod(..)
    , forkAnnouncer
    , stopAnnouncer
    , schedule
    , cancel
    ) where

import Data.Wrapper.PSQ as PSQ
import Network.Kademlia.Search
import InterruptibleDelay
import Control.Concurrent.Lifted.Instrument

import Control.Concurrent.STM
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char8
import Data.Function
import Data.Hashable
import Data.Maybe
import Data.Time.Clock.POSIX

-- | Opaque key identifying an entry in the announcer's schedule.
newtype AnnounceKey = AnnounceKey ByteString
    deriving (Hashable,Ord,Eq)

-- | Build an 'AnnounceKey' from a 'String'.  The 'Announcer' argument is
-- currently ignored.
packAnnounceKey :: Announcer -> String -> STM AnnounceKey
packAnnounceKey _ = return . AnnounceKey . Char8.pack

-- | Recover the 'String' an 'AnnounceKey' was built from.  The 'Announcer'
-- argument is currently ignored; it mirrors 'packAnnounceKey'.
unpackAnnounceKey :: Announcer -> AnnounceKey -> STM String
unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs

-- | An entry in the schedule: either an announce method to run or the
-- request to shut the announcer thread down.
data ScheduledItem
    = forall r. ScheduledItem (AnnounceMethod r)
    | StopAnnouncer

-- | Handle on a running announcer thread.
data Announcer = Announcer
    { scheduled       :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
      -- ^ Pending items, prioritised by the time they become due.
    , announcerActive :: TVar Bool
      -- ^ 'True' until the announcer thread has finished.
    , interrutible    :: InterruptibleDelay
      -- ^ Delay the announcer thread sleeps on while waiting for the next
      -- item to become due.
    }

-- | Queue an item under the empty key with priority 0, i.e. ahead of any
-- item whose due time is a positive 'POSIXTime'.
scheduleImmediately :: Announcer -> ScheduledItem -> STM ()
scheduleImmediately announcer item
    = modifyTVar' (scheduled announcer)
                  (PSQ.insert' (AnnounceKey Char8.empty) item 0)

-- | Ask the announcer thread to stop and block until it has done so.
stopAnnouncer :: Announcer -> IO ()
stopAnnouncer announcer = do
    atomically $ scheduleImmediately announcer StopAnnouncer
    -- The announcer thread may be sleeping until a later item becomes due;
    -- interrupt that delay (as 'schedule' and 'cancel' do) so the stop
    -- request is noticed promptly.
    interruptDelay (interrutible announcer)
    atomically $ readTVar (announcerActive announcer) >>= check . not

-- | A search paired with the action used to publish to its results.
--
-- NOTE(review): the @r@ bound by the @forall@ shadows the type parameter
-- @r@, so the parameter is not tied to the fields -- confirm whether that
-- is intended.
data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod
    { aSearch  :: Search nid addr tok ni r
    , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
    }

-- startDelay     :: InterruptibleDelay -> Microseconds -> IO Bool
-- interruptDelay :: InterruptibleDelay -> IO ()

-- | Schedule an announcement.  Not yet implemented: at present this only
-- interrupts the announcer thread's delay.
schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
schedule announcer _ _ _ = do
    -- fork the search
    -- add it to the priority queue of announce methods.
    interruptDelay (interrutible announcer)

-- | Cancel an announcement.  Not yet implemented: at present this only
-- interrupts the announcer thread's delay.
cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
cancel announcer _ _ _ = do
    -- cancel search/announce
    interruptDelay (interrutible announcer)

-- | Create an 'Announcer' with an empty schedule and fork its worker thread.
forkAnnouncer :: IO Announcer
forkAnnouncer = do
    delay <- interruptibleDelay
    announcer <- atomically $ Announcer <$> newTVar PSQ.empty
                                        <*> newTVar True
                                        <*> pure delay
    void $ fork $ announceThread announcer
    return announcer

-- | Body of the announcer thread: repeatedly take the earliest scheduled
-- item, wait until it is due, and perform it.  Terminates when
-- 'performScheduledItem' yields 'Nothing'.
announceThread :: Announcer -> IO ()
announceThread announcer = do
    myThreadId >>= flip labelThread "announcer"
    fix $ \loop -> join $ atomically $ do
        -- Block (via 'retry') while the schedule is empty.
        item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer)
        return $ do
            now <- getPOSIXTime
            -- Is it time to do something?
            if prio item <= now
                then do
                    -- Yes.  Dequeue and handle this event.
                    action <- atomically $ do
                        modifyTVar' (scheduled announcer)
                                    (PSQ.delete (key item))
                        performScheduledItem announcer item
                    -- Are we finished?  'Nothing' ends the loop; otherwise
                    -- perform the scheduled op and loop.
                    mapM_ (>> loop) action
                else do
                    -- No.  Wait a bit, then look at the queue again (the
                    -- delay may have been interrupted by a queue change).
                    void $ startDelay (interrutible announcer)
                                      (microseconds $ prio item - now)
                    loop
    -- We're done.  Let 'stopAnnouncer' know that it can stop blocking.
    -- This statement is deliberately outside the loop body so that it runs
    -- exactly once, after the loop has ended.
    atomically $ writeTVar (announcerActive announcer) False

-- | Decide what to do for a dequeued item.  'Nothing' tells the announcer
-- thread to stop; @'Just' io@ is an action to run before looping.
--
-- NOTE(review): only 'StopAnnouncer' is handled in the code visible here;
-- the remaining cases are sketched in the comments below.
performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))
performScheduledItem announcer = \case

    (Binding _ StopAnnouncer _) -> return Nothing

    -- announcement added:
    --   wait for time to announce or for search to finish.
    --
    -- time for periodic announce:
    --   (re-)announce to the current known set of storing-nodes.
    --   If the search is finished, restart the search.
    --
    -- search finished:
    --   if any of the current storing-nodes set have not been
    --   announced to, announce to them.
    --
    --
    -- announcement removed:
    --