{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE NamedFieldPuns             #-}
{-# LANGUAGE NondecreasingIndentation   #-}
-- | A background thread that runs Kademlia searches and periodic announces.
--
-- Work is queued in a priority queue keyed by 'AnnounceKey' and ordered by
-- the 'POSIXTime' at which each entry becomes due.  A single thread (see
-- 'forkAnnouncer') sleeps until the most urgent entry is due and then
-- performs it.
module Announcer
    ( Announcer(scheduled)
    , AnnounceKey
    , packAnnounceKey
    , unpackAnnounceKey
    , AnnounceMethod(..)
    , forkAnnouncer
    , stopAnnouncer
    , schedule
    , cancel
    , itemStatusNum
    ) where

import qualified Data.MinMaxPSQ as MM
import Data.Wrapper.PSQ as PSQ
import InterruptibleDelay
import Network.Kademlia.Routing as R
import Network.Kademlia.Search

import Control.Concurrent.Lifted.Instrument
import Control.Concurrent.STM
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char8
import Data.Function
import Data.Hashable
import Data.Maybe
import Data.Ord
import Data.Time.Clock.POSIX
import qualified GHC.Generics as Generics
import Generic.Data.Internal.Meta as Lyxia

-- | Opaque key under which an announcement is stored in the schedule.
newtype AnnounceKey = AnnounceKey ByteString
    deriving (Hashable,Ord,Eq)

-- | Build an 'AnnounceKey' from a 'String' via 'Char8.pack'.  The 'Announcer'
-- argument is ignored.
packAnnounceKey :: Announcer -> String -> STM AnnounceKey
packAnnounceKey _ = return . AnnounceKey . Char8.pack

-- | Recover the 'String' inside an 'AnnounceKey' via 'Char8.unpack'.  The
-- 'Announcer' argument is ignored.
unpackAnnounceKey :: Announcer -> AnnounceKey -> STM String
unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs

-- | An entry in the announcer's schedule.  How each constructor is handled
-- is defined by 'performScheduledItem'.
data ScheduledItem
    = DeleteAnnouncement
      -- ^ Placeholder scheduled by 'cancel'; performing it does nothing.
    | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime
      -- ^ Fields, in order: the search-finished check, the search action,
      -- the announce action and the announce interval.
    | SearchFinished (IO ()) (IO ()) POSIXTime
      -- ^ Fields, in order: search action, announce action, interval.
      -- Nothing in this module schedules it; performing it does nothing.
    | Announce (STM (IO ())) (IO ()) POSIXTime
      -- ^ Fields, in order: the search-finished check, the announce action
      -- and the announce interval.
    | SearchResult (STM (IO ()))
      -- ^ A transaction yielding the IO action to run for a search result.
    | StopAnnouncer
      -- ^ Tells the announcer thread to leave its loop.
    -- Can't use Data because STM and IO. :(
    -- deriving Data  {- itemStatusNum sch = constrIndex $ toConstr sch -}
    -- Using Generic to accomplish the job.
    deriving Generics.Generic

-- | A number identifying which 'ScheduledItem' constructor was used, obtained
-- through the 'Generics.Generic' instance ('Lyxia.conId').
itemStatusNum :: ScheduledItem -> Int
itemStatusNum sch = Lyxia.conIdToInt $ Lyxia.conId sch

-- | Handle to a running announcer thread.
data Announcer = Announcer
    { scheduled       :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
      -- ^ Pending work, prioritized by the time at which it is due.
    , announcerActive :: TVar Bool
      -- ^ Created as 'True'; set to 'False' by the announcer thread once it
      -- has left its loop.
    , interrutible    :: InterruptibleDelay
      -- ^ The delay the announcer thread sleeps on; interrupted whenever new
      -- work is scheduled.
    }

-- | Count passed to 'MM.insertTake' when recording a node we published to.
-- Presumably this caps how many storing nodes are remembered -- confirm
-- against "Data.MinMaxPSQ".
announceK :: Int
announceK = 8

-- | Pairs a search state with the set of nodes already announced to.
data AnnounceState = forall nid addr tok ni r. AnnounceState
    { aState        :: SearchState nid addr tok ni r
    , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
    }

-- | Insert an item under the given key with priority 0, so that it compares
-- as due against any positive current time.
scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
scheduleImmediately announcer k item
    = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0)

-- | Terminate the 'Announcer' thread.  Don't use the Announcer after this.
--
-- Schedules 'StopAnnouncer', wakes the thread, and then blocks until the
-- thread has cleared 'announcerActive'.
stopAnnouncer :: Announcer -> IO ()
stopAnnouncer announcer = do
    -- The key is packed explicitly so this compiles whether or not
    -- OverloadedStrings is enabled for the module.
    atomically $ scheduleImmediately announcer (AnnounceKey (Char8.pack "*stop*")) StopAnnouncer
    interruptDelay (interrutible announcer)
    atomically $ readTVar (announcerActive announcer) >>= check . not

-- | This type specifies an item that can be announced on appropriate nodes in
-- a Kademlia network.
data AnnounceMethod r = forall nid ni sr addr tok a.
    ( Show nid
    , Hashable nid
    , Hashable ni
    , Ord addr
    , Ord nid
    , Ord ni
    ) => AnnounceMethod
    { aSearch :: Search nid addr tok ni sr
      -- ^ This is the Kademlia search to run repeatedly to find the
      -- nearby nodes.  A new search is started whenever one is not
      -- already in progress at announce time.  Repeated searches are
      -- likely to finish faster than the first since nearby nodes
      -- are not discarded.
    , aPublish :: Either (r -> sr -> IO ()) (r -> tok -> Maybe ni -> IO (Maybe a))
      -- ^ The action to perform when we find nearby nodes.  The
      -- destination node is given as a Maybe so that methods that
      -- treat 'Nothing' as loop-back address can be passed here,
      -- however 'Nothing' will not be passed by the announcer
      -- thread.
      --
      -- There are two cases:
      --
      -- [Left] The action to perform requires a search result.
      -- This was implemented to support Tox's DHTKey and
      -- Friend-Request messages.
      --
      -- [Right] The action requires a "token" from the destination
      -- node.  This is the more typical "announce" semantics for
      -- Kademlia.
    , aBuckets :: TVar (R.BucketList ni)
      -- ^ Set this to the current Kademlia routing table buckets.
    , aTarget :: nid
      -- ^ This is the Kademlia node-id of the item being announced.
    , aInterval :: POSIXTime
      -- ^ Assuming we have nearby nodes from the search, the item
      -- will be announced at this interval.
      --
      -- Current implementation is to make the scheduled
      -- announcements even if the search hasn't finished.  It will
      -- use the closest nodes found so far.
    }

-- | Schedule a recurring Search + Announce sequence.
--
-- Creates a fresh search state and an empty set of announced-to nodes,
-- queues a 'NewAnnouncement' under the given key, and wakes the announcer
-- thread.
schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
    st <- atomically $ newSearch aSearch aTarget []
    ns <- atomically $ newTVar MM.empty
    let -- Not used at runtime; kept so that the types of 'st' and 'ns'
        -- stay tied together exactly as before.
        _astate = AnnounceState st ns

        -- Publish to every node that supplied a token, and remember each
        -- node for which the publish action returned a result.
        publishToNodes is
            | Left _ <- aPublish = return ()
            | Right publish <- aPublish =
                forM_ is $ \(Binding ni mtok _) ->
                    forM_ mtok $ \tok -> do
                        got <- publish r tok (Just ni)
                        now <- getPOSIXTime
                        forM_ got $ \_ ->
                            atomically $ modifyTVar' ns $ MM.insertTake announceK ni (Down now)

        -- Publish to the current search results.
        announce = do
            is <- atomically $ do
                bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
                return $ MM.toList bs
            publishToNodes is

        -- Search-result callback; runs in STM.
        onResult sr
            | Right _ <- aPublish = return True
            | Left sendit <- aPublish = do
                -- NOTE(review): this is scheduled under the same key as the
                -- recurring 'Announce' entry, so it presumably displaces
                -- that entry, and 'SearchResult' does not reschedule it --
                -- confirm this is intended.
                scheduleImmediately announcer k $ SearchResult $ return $ sendit r sr
                    -- If we had a way to get the source of a search result, we might want to
                    -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
                    -- MinMaxPSQ.  For now, I'm just letting the nodes for which we've already sent
                    -- a message be forgotten.
                    --
                    -- forM_ got $ \_ -> do
                    -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
                return True -- True to keep searching.

        -- Yields an action that forks a new search if the current one has
        -- finished, and does nothing otherwise.
        searchAgain = searchIsFinished st >>= \isfin -> return $ when isfin $ void $ fork search

        -- Thread to fork.
        search = do
            atomically $ reset aBuckets aSearch aTarget st
            searchLoop aSearch aTarget onResult st
            void $ fork $ do
                -- Announce to any nodes we haven't already announced to.
                is <- atomically $ do
                    bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
                    nq <- readTVar ns
                    return $ filter (\(Binding ni _ _) -> isNothing $ MM.lookup' ni nq)
                           $ MM.toList bs
                publishToNodes is
    {-
    atomically $ scheduleImmediately announcer k $ SearchFinished {- st -} search announce aInterval
    interruptDelay (interrutible announcer)
    -}
    atomically $ scheduleImmediately announcer k $ NewAnnouncement searchAgain search announce aInterval
    interruptDelay (interrutible announcer)

-- | Cancel the announcement stored under the given key by scheduling a
-- 'DeleteAnnouncement' under it and waking the announcer thread.  The
-- 'AnnounceMethod' and @r@ arguments are ignored.
cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
cancel announcer k _ _ = do
    atomically $ scheduleImmediately announcer k DeleteAnnouncement
    interruptDelay (interrutible announcer)

-- | Construct an 'Announcer' object and fork a thread in which to perform the
-- Kademlia searches and announces.
forkAnnouncer :: IO Announcer
forkAnnouncer = do
    delay <- interruptibleDelay
    announcer <- atomically $ Announcer <$> newTVar PSQ.empty
                                        <*> newTVar True
                                        <*> pure delay
    fork $ announceThread announcer
    return announcer

-- | Body of the announcer thread: repeatedly wait for the most urgent
-- scheduled item to come due and perform it, until a 'StopAnnouncer' item is
-- performed.
announceThread :: Announcer -> IO ()
announceThread announcer = do
    myThreadId >>= flip labelThread "announcer"
    fix $ \loop -> do
        -- Block until the schedule is non-empty and peek at its minimum.
        item <- atomically $
            maybe retry return . findMin =<< readTVar (scheduled announcer)
        now <- getPOSIXTime
        -- Is it time to do something?
        if prio item < now
            then do
                -- Yes.  Dequeue and handle this event.  The minimum is read
                -- again inside the dequeuing transaction: the entry peeked
                -- at above may have been replaced (by 'schedule' or
                -- 'cancel') since then, and deleting by key alone would
                -- discard the replacement unseen.
                action <- atomically $ do
                    queue <- readTVar (scheduled announcer)
                    case findMin queue of
                        Just due | prio due < now -> do
                            writeTVar (scheduled announcer) $! PSQ.delete (key due) queue
                            performScheduledItem announcer now due
                        -- Nothing is due any more; just go around again.
                        _ -> return $ Just $ return ()
                -- Are we finished?  If not, perform the scheduled op and
                -- loop, with the recursive call in tail position.
                maybe (return ()) (>> loop) action
            else do
                -- No.  Wait a bit.
                startDelay (interrutible announcer) (microseconds $ prio item - now)
                loop
    -- We're done.  Let 'stopAnnouncer' know that it can stop blocking.
    atomically $ writeTVar (announcerActive announcer) False

-- | Decide what to do for a dequeued item.
--
-- Returns 'Nothing' when the announcer thread should terminate; otherwise
-- 'Just' the IO action to run before continuing.  Any rescheduling happens
-- inside the same transaction.
performScheduledItem :: Announcer -> POSIXTime -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))
performScheduledItem announcer now = \case

    (Binding _ StopAnnouncer _) -> return Nothing

    -- announcement started:
    -- schedule the first periodic announce and fork the search.
    (Binding k (NewAnnouncement checkFin search announce interval) _) -> do
        modifyTVar' (scheduled announcer)
                    (PSQ.insert' k (Announce checkFin announce interval) (now + interval))
        return $ Just $ void $ fork search

    -- announcement removed:
    -- the caller has already dequeued the entry, so nothing is left to do.
    (Binding _ DeleteAnnouncement _) -> return $ Just $ return ()

    -- time for periodic announce:
    -- run whatever action the search-finished check yields, then
    -- (re-)announce to the current known set of storing-nodes.
    (Binding k (Announce checkFin announce interval) _) -> do
        restartSearch <- checkFin
        modifyTVar' (scheduled announcer)
                    (PSQ.insert' k (Announce checkFin announce interval) (now + interval))
        return $ Just $ do
            restartSearch
            announce

    -- search finished:
    -- if any of the current storing-nodes set have not been
    -- announced to, announce to them.  (Currently a no-op.)
    (Binding _ (SearchFinished {- st -} _search _announce _interval) _) -> return $ Just $ return ()

    (Binding _ (SearchResult action) _) -> Just <$> action