{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE NamedFieldPuns             #-}
{-# LANGUAGE NondecreasingIndentation   #-}
-- | A background thread that runs keyed, time-ordered work items.
--
-- Callers register an 'AnnounceMethod' under an 'AnnounceKey' with
-- 'schedule'; the announcer thread then starts a search and re-publishes to
-- the search results every 'aInterval'.  'cancel' removes a registration and
-- 'stopAnnouncer' shuts the thread down.
module Announcer
    ( Announcer
    , AnnounceKey
    , packAnnounceKey
    , unpackAnnounceKey
    , AnnounceMethod(..)
    , forkAnnouncer
    , stopAnnouncer
    , schedule
    , cancel
    ) where

import qualified Data.MinMaxPSQ as MM
import Data.Wrapper.PSQ as PSQ
import InterruptibleDelay
import Network.Kademlia.Routing as R
import Network.Kademlia.Search

import Control.Concurrent.Lifted.Instrument
import Control.Concurrent.STM
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char8
import Data.Function
import Data.Hashable
import Data.Maybe
import Data.Ord
import Data.Time.Clock.POSIX

-- | Opaque key identifying one scheduled announcement.
newtype AnnounceKey = AnnounceKey ByteString
    deriving (Hashable,Ord,Eq)

-- | Build an 'AnnounceKey' from a 'String'.  The 'Announcer' argument is
-- currently ignored.
packAnnounceKey :: Announcer -> String -> STM AnnounceKey
packAnnounceKey _ = return . AnnounceKey . Char8.pack

-- | Recover the 'String' that an 'AnnounceKey' was built from.  The first
-- argument is ignored.
--
-- NOTE(review): the ignored first argument is typed 'AnnounceKey', whereas
-- 'packAnnounceKey' takes an 'Announcer'.  This looks unintentional, but the
-- exported signature is left as-is so existing callers keep compiling --
-- confirm intent before changing it.
unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String
unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs

-- | A unit of work stored in the announcer's queue.
data ScheduledItem
    = StopAnnouncer
      -- ^ Terminate the announcer thread.
    | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime
      -- ^ Fields: restart-search check, search action, announce action,
      -- re-announce interval.
    | SearchFinished (IO ()) (IO ()) POSIXTime
      -- ^ Fields: search action, announce action, interval.  Nothing in this
      -- module currently enqueues this item; handling it is a no-op.
    | Announce (STM (IO ())) (IO ()) POSIXTime
      -- ^ Fields: restart-search check, announce action, re-announce interval.
    | DeleteAnnouncement
      -- ^ Replaces whatever was queued under the same key; handling it is a
      -- no-op, so the key simply disappears from the queue.

-- | Handle on an announcer thread created by 'forkAnnouncer'.
data Announcer = Announcer
    { scheduled       :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
      -- ^ Pending items, prioritized by the time at which they are due.
    , announcerActive :: TVar Bool
      -- ^ 'True' from creation until the announcer thread has exited.
    , interrutible    :: InterruptibleDelay
      -- ^ Delay the announcer thread sleeps on; interrupted whenever the
      -- queue is modified from outside.
    }

-- | Bound handed to 'MM.insertTake' when recording nodes that accepted a
-- publish (presumably the maximum number of storing nodes remembered --
-- confirm against "Data.MinMaxPSQ").
announceK :: Int
announceK = 8

-- | Pairs a search with the set of nodes that accepted a publish, tying both
-- to the same node-info type.
data AnnounceState = forall nid addr tok ni r. AnnounceState
    { aState        :: SearchState nid addr tok ni r
    , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
    }

-- | Queue an item under the given key with priority 0, i.e. due before any
-- real timestamp.  An item already queued under that key is replaced
-- (presumably -- this relies on 'PSQ.insert'' overwriting; 'cancel' depends
-- on it).
scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
scheduleImmediately announcer k item =
    modifyTVar' (scheduled announcer) (PSQ.insert' k item 0)

-- | Ask the announcer thread to terminate and block until it has done so.
--
-- The stop request is queued under the reserved key @*stop*@.
stopAnnouncer :: Announcer -> IO ()
stopAnnouncer announcer = do
    -- 'Char8.pack' rather than a bare string literal: this module does not
    -- enable OverloadedStrings.
    atomically $ scheduleImmediately announcer (AnnounceKey (Char8.pack "*stop*")) StopAnnouncer
    interruptDelay (interrutible announcer)
    -- Wait for 'announceThread' to clear the active flag.
    atomically $ readTVar (announcerActive announcer) >>= check . not

-- | Everything needed to search for a target and publish a value of type @r@
-- to the nodes found.
data AnnounceMethod r = forall nid ni sr addr tok a.
    ( Show nid
    , Hashable nid
    , Hashable ni
    , Ord addr
    , Ord nid
    , Ord ni
    ) => AnnounceMethod
    { aSearch   :: Search nid addr tok ni sr
      -- ^ Search used to locate nodes.
    , aPublish  :: r -> tok -> Maybe ni -> IO (Maybe a)
      -- ^ Publish to one node using the token obtained from it.  A 'Just'
      -- result causes the node to be recorded as storing the announcement.
    , aBuckets  :: TVar (R.BucketList ni)
      -- ^ Routing table handed to 'reset' when a search is (re)started.
    , aTarget   :: nid
      -- ^ Identifier being searched for.
    , aInterval :: POSIXTime
      -- ^ Time between periodic re-announcements.
    }

-- | Register an announcement under the given key and wake the announcer
-- thread so that it starts the search right away.
schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
    st <- atomically $ newSearch aSearch aTarget []
    ns <- atomically $ newTVar MM.empty
    let -- Not used at run time; it only forces the search state and the
        -- storing-node set to agree on the node-info type.
        _astate = AnnounceState st ns

        -- Publish to every binding that carries a token and remember the
        -- nodes for which the publish returned 'Just'.
        publishToNodes is =
            forM_ is $ \(Binding ni mtok _) ->
                forM_ mtok $ \tok -> do
                    got <- aPublish r tok (Just ni)
                    now <- getPOSIXTime
                    forM_ got $ \_ ->
                        atomically $ modifyTVar' ns $ MM.insertTake announceK ni (Down now)

        -- Publish to the current search results.
        -- (searchInformant st :: TVar (MinMaxPSQ' ni nid tok)
        announce = do
            is <- atomically $ MM.toList <$> readTVar (searchInformant st)
            publishToNodes is

        -- Action for each search hit (True = keep searching).
        onResult _ = return True

        -- Decide inside STM whether the search has finished; the returned IO
        -- action forks a fresh search if it has, and does nothing otherwise.
        searchAgain = do
            isfin <- searchIsFinished st
            return $ when isfin $ void $ fork search

        -- Thread body: run a search, then publish to its results.
        search = do
            atomically $ reset aBuckets aSearch aTarget st
            searchLoop aSearch aTarget onResult st
            void $ fork $ do
                -- Announce to any nodes we haven't already announced to.
                is <- atomically $ do
                    bs <- readTVar (searchInformant st)
                    nq <- readTVar ns
                    return $ filter (\(Binding ni _ _) -> isNothing $ MM.lookup' ni nq)
                           $ MM.toList bs
                publishToNodes is
            {-
            atomically $ scheduleImmediately announcer k $ SearchFinished {- st -} search announce aInterval
            interruptDelay (interrutible announcer)
            -}
    atomically $ scheduleImmediately announcer k $ NewAnnouncement searchAgain search announce aInterval
    interruptDelay (interrutible announcer)

-- | Remove the announcement registered under the given key.  The method and
-- value arguments are ignored.
cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
cancel announcer k _ _ = do
    atomically $ scheduleImmediately announcer k DeleteAnnouncement
    interruptDelay (interrutible announcer)

-- | Create an 'Announcer' with an empty queue and fork its worker thread.
forkAnnouncer :: IO Announcer
forkAnnouncer = do
    delay <- interruptibleDelay
    announcer <- atomically $ Announcer <$> newTVar PSQ.empty
                                        <*> newTVar True
                                        <*> pure delay
    void $ fork $ announceThread announcer
    return announcer

-- | Worker loop: wait for the earliest queued item, sleep until it is due,
-- perform it, and repeat until a 'StopAnnouncer' item is handled.
announceThread :: Announcer -> IO ()
announceThread announcer = do
    myThreadId >>= flip labelThread "announcer"
    fix $ \loop -> do
        -- Block until the queue is non-empty and peek at its head.
        item <- atomically $
            maybe retry return . findMin =<< readTVar (scheduled announcer)
        now <- getPOSIXTime
        -- Is it time to do something?
        if prio item <= now
            then do -- Yes. Dequeue and handle the event.
                action <- atomically $ dequeueDue now
                -- Are we finished?
                mapM_ (>> loop) -- No? Okay, perform scheduled op and loop.
                      action
            else do -- No. Wait a bit.
                startDelay (interrutible announcer) (microseconds $ prio item - now)
                loop
    -- We're done.  Let 'stopAnnouncer' know that it can stop blocking.
    atomically $ writeTVar (announcerActive announcer) False
  where
    -- Dequeue and perform whatever is at the head of the queue *now*.
    --
    -- The head is re-read inside this transaction instead of reusing the
    -- item peeked at earlier: another thread may have replaced the entry
    -- under the same key in between (e.g. 'cancel'), and deleting by the
    -- stale key would silently drop that replacement while the outdated
    -- item got performed and re-armed.
    dequeueDue now = do
        queue <- readTVar (scheduled announcer)
        case findMin queue of
            Just item | prio item <= now -> do
                writeTVar (scheduled announcer) $! PSQ.delete (key item) queue
                performScheduledItem announcer now item
            -- The queue changed under us; do nothing and let the loop
            -- re-examine it.
            _ -> return $ Just $ return ()

-- | Handle one dequeued item.
--
-- Runs in STM so that re-arming the queue is atomic with the dequeue done by
-- the caller.  Returns 'Nothing' when the announcer thread should terminate,
-- otherwise the IO action to run before looping.
performScheduledItem :: Announcer -> POSIXTime -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))
performScheduledItem announcer now = \case

    (Binding _ StopAnnouncer _) -> return Nothing

    -- announcement started:
    --   arm the first periodic announce and kick off the search thread.
    (Binding k (NewAnnouncement checkFin search announce interval) _) -> do
        modifyTVar' (scheduled announcer)
                    (PSQ.insert' k (Announce checkFin announce interval) (now + interval))
        return $ Just $ void $ fork search

    -- announcement removed:
    --   the caller already deleted the key, so there is nothing left to do.
    (Binding _ DeleteAnnouncement _) -> return $ Just $ return ()

    -- time for periodic announce:
    --   re-arm the timer, restart the search if it has finished (decided by
    --   checkFin), and (re-)announce to the current search results.
    (Binding k (Announce checkFin announce interval) _) -> do
        restartSearch <- checkFin
        modifyTVar' (scheduled announcer)
                    (PSQ.insert' k (Announce checkFin announce interval) (now + interval))
        return $ Just $ do
            restartSearch
            announce

    -- search finished:
    --   currently a no-op; announcing to newly found nodes is done by the
    --   search thread itself.
    (Binding _ (SearchFinished {- st -} _ _ _) _) -> return $ Just $ return ()