{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE NamedFieldPuns             #-}
{-# LANGUAGE NondecreasingIndentation   #-}
-- | A single background thread that executes keyed, time-ordered jobs taken
-- from a priority search queue.  Jobs are added with 'schedule', removed with
-- 'cancel', and the thread is shut down with 'stopAnnouncer'.
module Announcer
    ( Announcer
    , AnnounceKey
    , packAnnounceKey
    , unpackAnnounceKey
    , AnnounceMethod(..)
    , forkAnnouncer
    , stopAnnouncer
    , schedule
    , cancel
    ) where

import Data.Wrapper.PSQ as PSQ
import Network.Kademlia.Search
import InterruptibleDelay
import Control.Concurrent.Lifted.Instrument

import Control.Concurrent.STM
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char8
import Data.Function
import Data.Hashable
import Data.Maybe
import Data.Time.Clock.POSIX

-- | Opaque key identifying one entry in the announcer's queue.
newtype AnnounceKey = AnnounceKey ByteString
    deriving (Hashable,Ord,Eq)

-- | Build an 'AnnounceKey' from a 'String'.  The 'Announcer' argument is
-- currently ignored.
packAnnounceKey :: Announcer -> String -> STM AnnounceKey
packAnnounceKey _ = return . AnnounceKey . Char8.pack

-- | Recover the 'String' stored in an 'AnnounceKey'.
--
-- The first argument is ignored.  Its type is left polymorphic so that an
-- 'Announcer' can be passed (mirroring 'packAnnounceKey') while callers
-- written against the earlier @AnnounceKey -> AnnounceKey -> STM String@
-- signature keep compiling.
unpackAnnounceKey :: ctx -> AnnounceKey -> STM String
unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs

-- | The payload stored against each key in the queue.
data ScheduledItem
    = forall r. ScheduledItem (AnnounceMethod r)
      -- ^ Not constructed anywhere in this module; treated as a no-op by
      -- 'performScheduledItem'.
    | StopAnnouncer
      -- ^ Makes the announcer thread leave its loop.
    | NewAnnouncement (IO ()) (IO ()) POSIXTime
      -- ^ Search action to fork, announce action, and announce interval.
    | Announce (IO ()) POSIXTime
      -- ^ Announce action and the interval at which it is re-queued.
    | DeleteAnnouncement
      -- ^ Placeholder that overwrites a key's entry; dequeued without effect.
    | SearchFinished
      -- ^ Currently dequeued without effect.

-- | Handle on the announcer thread's shared state.
data Announcer = Announcer
    { scheduled       :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
      -- ^ Pending items, prioritised by the time they become due.
    , announcerActive :: TVar Bool
      -- ^ 'True' until the announcer thread has left its loop.
    , interrutible    :: InterruptibleDelay
      -- ^ Delay the thread waits on; interrupted whenever the queue changes.
    }

-- | Queue an item under the given key with priority 0, i.e. due at once.
scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
scheduleImmediately announcer k item
    = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0)

-- | Ask the announcer thread to stop and block until it has done so.
stopAnnouncer :: Announcer -> IO ()
stopAnnouncer announcer = do
    -- The key is built with 'Char8.pack' because the module does not enable
    -- OverloadedStrings, so a bare literal would not type-check as ByteString.
    atomically $ scheduleImmediately announcer (AnnounceKey (Char8.pack "*stop*")) StopAnnouncer
    interruptDelay (interrutible announcer)
    -- Wait for 'announceThread' to clear the active flag.
    atomically $ readTVar (announcerActive announcer) >>= check . not

-- | A search paired with a publish action over a result type @r@.
--
-- Only @r@ is visible in the type; the remaining variables are existential.
-- (@r@ must not be listed in the @forall@: doing so would shadow the type
-- parameter and disconnect it from the fields.)
data AnnounceMethod r = forall nid ni addr tok a.
  AnnounceMethod
    { aSearch  :: Search nid addr tok ni r
      -- ^ Search description (see "Network.Kademlia.Search").
    , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
      -- ^ Publish action.  NOTE(review): presumably invoked once per storing
      -- node found by the search; not yet called anywhere in this module.
    }

-- | Queue a new announcement under the given key and wake the announcer.
--
-- NOTE: unfinished.  The @_todo@ typed holes below still have to be filled
-- in, and the 'AnnounceMethod' fields and the @r@ argument are not used yet.
schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
schedule announcer k AnnounceMethod{aSearch,aPublish} r = do
    let announce = _todo :: IO ()
        -- publish to current search results

        onResult _ = return True
        search   = _todo :: IO ()  -- thread to fork
            -- ns <- R.kclosest (searchSpace qsearch) searchK nid <$> readTVar dhtBuckets
            -- st <- newSearch qsearch nid ns
            -- searchLoop :: Search nid addr tok ni r -> nid -> (r -> STM Bool) -> SearchState nid addr tok ni r -> IO ()
            -- searchLoop aSearch nid onResult st
        interval = _todo :: POSIXTime -- publish interval
    atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce interval
    interruptDelay (interrutible announcer)

-- | Cancel the announcement stored under the given key by queueing a
-- 'DeleteAnnouncement' in its place and waking the announcer.  The
-- 'AnnounceMethod' and @r@ arguments are ignored.
cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
cancel announcer k _ _ = do
    atomically $ scheduleImmediately announcer k $ DeleteAnnouncement
    interruptDelay (interrutible announcer)

-- | Create an 'Announcer' with an empty queue and fork its worker thread.
forkAnnouncer :: IO Announcer
forkAnnouncer = do
    delay <- interruptibleDelay
    announcer <- atomically $ Announcer <$> newTVar PSQ.empty
                                        <*> newTVar True
                                        <*> pure delay
    fork $ announceThread announcer
    return announcer

-- | Body of the announcer thread: repeatedly take the earliest queue entry,
-- wait until it is due, run it, and finally clear 'announcerActive' once a
-- 'StopAnnouncer' item has been handled.
announceThread :: Announcer -> IO ()
announceThread announcer = do
    myThreadId >>= flip labelThread "announcer"
    fix $ \loop -> join $ atomically $ do
        -- Block (STM retry) while the queue is empty.
        item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer)
        return $ do
            now <- getPOSIXTime
            -- Is it time to do something?
            if prio item <= now
                then do -- Yes.  Dequeue and handle this event.
                        -- NOTE(review): the entry was read in an earlier
                        -- transaction; if another thread re-inserts under the
                        -- same key in between, this delete presumably removes
                        -- the newer entry.  Confirm whether that matters.
                        action <- atomically $ do
                            modifyTVar' (scheduled announcer)
                                        (PSQ.delete (key item))
                            performScheduledItem announcer now item
                        -- Are we finished?  'Nothing' ends the loop;
                        -- otherwise perform the scheduled op and loop.
                        mapM_ (>> loop) action
                else do -- No.  Wait a bit (the wait is interrupted when the
                        -- queue changes).
                        startDelay (interrutible announcer) (microseconds $ prio item - now)
                        loop
    -- We're done.  Let 'stopAnnouncer' know that it can stop blocking.
    atomically $ writeTVar (announcerActive announcer) False

-- | Handle one dequeued entry inside STM.
--
-- Returns 'Nothing' when the announcer thread should terminate, otherwise
-- 'Just' the IO action to run before the next loop iteration.
performScheduledItem :: Announcer -> POSIXTime -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))
performScheduledItem announcer now = \case

    (Binding _ StopAnnouncer _) -> return Nothing

    -- announcement started:
    --   queue the first periodic announce, then fork the search thread.
    (Binding k (NewAnnouncement search announce interval) _) -> do
        modifyTVar' (scheduled announcer)
                    (PSQ.insert' k (Announce announce interval) (now + interval))
        return $ Just $ do
            interruptDelay (interrutible announcer)
            void $ fork search

    -- announcement removed:
    (Binding _ DeleteAnnouncement _) -> return $ Just $ return ()

    -- time for periodic announce:
    --   (re-)announce to the current known set of storing-nodes.
    --   TODO: If the search is finished, restart the search.
    (Binding k (Announce announce interval) _) -> do
        modifyTVar' (scheduled announcer)
                    (PSQ.insert' k (Announce announce interval) (now + interval))
        return $ Just announce

    -- search finished:
    --   if any of the current storing-nodes set have not been
    --   announced to, announce to them.
    --   (Not implemented yet; currently a no-op.)
    (Binding _ SearchFinished _) -> return $ Just $ return ()

    -- Never queued by this module; handled as a no-op so the match is total.
    (Binding _ (ScheduledItem _) _) -> return $ Just $ return ()