{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE NamedFieldPuns             #-}
{-# LANGUAGE NondecreasingIndentation   #-}
module Announcer.Tox where

-- , AnnounceMethod(..)
-- , schedule

import Announcer
import qualified Data.MinMaxPSQ  as MM
import Data.Wrapper.PSQ          as PSQ
import Network.Kademlia.Search

import Control.Concurrent.Lifted.Instrument
import Control.Concurrent.STM
import qualified Control.Exception as E
import Control.Monad
import Data.Hashable
import Data.Maybe
import Data.Ord
import Data.Time.Clock.POSIX


-- | Size bound handed to 'MM.insertTake' when recording a node we have
-- successfully published to (presumably the maximum number of storing nodes
-- that are remembered -- confirm against "Data.MinMaxPSQ").
announceK :: Int
announceK = 8

-- | Pairs a running search with the queue of nodes that accepted our
-- announcement, keyed by node and prioritized by ('Down') publish time.
data AnnounceState = forall nid addr tok ni r qk. AnnounceState
    { aState        :: SearchState nid addr tok ni r qk
    , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
    }

-- | This type specifies an item that can be announced on appropriate nodes in
-- a Kademlia network.
data AnnounceMethod r = forall nid ni sr addr tok a qk.
                            ( Show nid
                            , Hashable nid
                            , Hashable ni
                            , Ord addr
                            , Ord nid
                            , Ord ni
                            , Ord qk
                            ) => AnnounceMethod
    { aSearch :: Search nid addr tok ni sr qk
      -- ^ This is the Kademlia search to run repeatedly to find the
      -- nearby nodes. A new search is started whenever one is not
      -- already in progress at announce time. Repeated searches are
      -- likely to finish faster than the first since nearby nodes
      -- are not discarded.
    , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
      -- ^ The action to perform when we find nearby nodes. The
      -- destination node is given as a Maybe so that methods that
      -- treat 'Nothing' as loop-back address can be passed here,
      -- however 'Nothing' will not be passed by the announcer
      -- thread.
      --
      -- The action requires a "token" from the destination
      -- node. This is the more typical "announce" semantics for
      -- Kademlia.
    , aNearestNodes :: nid -> STM [ni]
      -- ^ Method to obtain starting nodes for an iterative Kademlia search.
    , aTarget :: nid
      -- ^ This is the Kademlia node-id of the item being announced.
    , aInterval :: POSIXTime
      -- ^ Assuming we have nearby nodes from the search, the item
      -- will be announced at this interval.
      --
      -- Current implementation is to make the scheduled
      -- announcements even if the search hasn't finished. It will
      -- use the closest nodes found so far.
    }

-- | This type specifies a Kademlia search and an action to perform upon the result.
data SearchMethod r = forall nid ni sr addr tok a qk.
                            ( Show nid
                            , Hashable nid
                            , Hashable ni
                            , Ord addr
                            , Ord nid
                            , Ord ni
                            , Ord qk
                            ) => SearchMethod
    { sSearch :: Search nid addr tok ni sr qk
      -- ^ This is the Kademlia search to run repeatedly to find the
      -- nearby nodes. A new search is started whenever one is not
      -- already in progress at announce time. Repeated searches are
      -- likely to finish faster than the first since nearby nodes
      -- are not discarded.
      --
      -- XXX: Currently, "repeatedly" is wrong.
    , sWithResult :: r -> sr -> IO ()
      -- ^ The action to perform upon a search result. This was
      -- implemented to support Tox's DHTKey and Friend-Request
      -- messages.
    , sNearestNodes :: nid -> STM [ni]
      -- ^ Method to obtain starting nodes for an iterative Kademlia search.
    , sTarget :: nid
      -- ^ This is the Kademlia node-id that the search targets.
    , sInterval :: POSIXTime
      -- ^ The time between searches.
      --
      -- XXX: Currently, search results will stop any repetition.
    }


-- | Scheduled-item body for the moment an announcement is started.
--
-- It schedules 'reAnnounce' to fire at @now + interval@ and returns an IO
-- action that forks the @search@ thread. The @announce@ action is not run
-- here; it is only handed on to 'reAnnounce'.
newAnnouncement :: STM (IO a)   -- ^ consulted by 'reAnnounce' on every period
                -> IO ()        -- ^ the search thread body to fork now
                -> IO ()        -- ^ the periodic announce action
                -> POSIXTime    -- ^ time between (re-)announcements
                -> Announcer
                -> AnnounceKey
                -> POSIXTime
                -> STM (IO ())
newAnnouncement checkFin search announce interval = \announcer k now -> do
    scheduleAbs announcer k
                (ScheduledItem $ reAnnounce checkFin announce interval)
                (now + interval)
    return $ void $ fork search

-- | Scheduled-item body for the periodic announce: (re-)announce to the
-- currently known set of storing-nodes.
--
-- Inside the transaction it runs @checkFin@ to obtain an IO action and
-- re-schedules itself at @now + interval@. The returned IO action first runs
-- the action produced by @checkFin@ and then @announce@.
--
-- TODO: If the search is finished, restart the search.
reAnnounce :: STM (IO a)
           -> IO ()
           -> POSIXTime
           -> Announcer
           -> AnnounceKey
           -> POSIXTime
           -> STM (IO ())
reAnnounce checkFin announce interval = \announcer k now -> do
    isfin <- checkFin
    scheduleAbs announcer k
                (ScheduledItem $ reAnnounce checkFin announce interval)
                (now + interval)
    return $ do
        isfin
        announce

-- | Schedule a recurring Search + Announce sequence.
--
-- A fresh search state and an empty storing-node queue are created, then a
-- 'newAnnouncement' item is scheduled immediately under key @k@. At most one
-- search runs at a time; a period that finds the previous search still
-- running re-announces to the current results and cancels that search.
scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do
    st <- atomically $ newSearch aSearch aTarget []
    ns <- atomically $ newTVar MM.empty
    mutex <- newMVar () -- This mutex ensures one search at a time.
    let -- Not referenced below; retained from the original code.
        astate = AnnounceState st ns
        -- Publish to every binding that carries a token and remember the
        -- nodes for which 'aPublish' returned a result.
        publishToNodes is = do
            forM_ is $ \(Binding ni mtok _) -> do
                forM_ mtok $ \tok -> do
                    got <- aPublish r tok (Just ni)
                    now <- getPOSIXTime
                    forM_ got $ \_ -> do
                        -- Strict modify: do not pile up thunks in the TVar
                        -- across repeated announcements.
                        atomically $ modifyTVar' ns $ MM.insertTake announceK ni (Down now)
        announce = do -- publish to current search results
            is <- atomically $ do
                bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
                return $ MM.toList bs
            publishToNodes is
        onResult _ = return True
        searchAgain = do
            return $ void $ do
                t <- fork search
                labelThread t ("scheduleAnnounce.sch." ++ show aTarget)
        search = do -- thread to fork
            got <- tryTakeMVar mutex
            case got of
              -- We hold the mutex: make sure it is handed back even if the
              -- search or the publishing throws, otherwise no later search
              -- could ever start.
              Just () -> flip E.finally (putMVar mutex ()) $ do
                me <- myThreadId
                labelThread me "scheduleAnnounce.reset"
                reset aNearestNodes aSearch aTarget st
                labelThread me "scheduleAnnounce.searchLoop"
                searchLoop aSearch aTarget onResult st
                -- Announce to any nodes we haven't already announced to.
                is <- atomically $ do
                    bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
                    nq <- readTVar ns
                    return $ filter (\(Binding ni _ _) -> isNothing $ MM.lookup' ni nq)
                           $ MM.toList bs
                publishToNodes is
              Nothing -> do
                -- Previous search did not finish. Instead of starting a new search,
                -- we will re-announce only.
                announce
                -- Cancel search so that a new one can start in the next period.
                atomically $ searchCancel st
    atomically $ scheduleImmediately announcer k
               $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)

-- | Schedule a recurring Search + Publish sequence.
--
-- Same scheduling skeleton as 'scheduleAnnounce', but nothing is announced:
-- every search result is passed to 'sWithResult' through 'runAction', and the
-- periodic announce action is a no-op.
scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO ()
scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do
    st <- atomically $ newSearch sSearch sTarget []
    ns <- atomically $ newTVar MM.empty
    mutex <- newMVar () -- This mutex ensures one search at a time.
    let -- Not referenced below; it is the only thing that fixes the type of
        -- 'ns' in this function, so it is retained.
        astate = AnnounceState st ns
        onResult sr = do
            runAction announcer ("search-result:"++unpackAnnounceKey announcer k) $ do
                got <- sWithResult r sr
                -- If we had a way to get the source of a search result, we might want to
                -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
                -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
                -- a message be forgotten.
                --
                -- forM_ got $ \_ -> do
                --     atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
                return ()
            return True -- True to keep searching.
        searchAgain = do
            return $ void $ do
                t <- fork search
                labelThread t ("scheduleSearch.sch." ++ show sTarget)
        search = do -- thread to fork
            got <- tryTakeMVar mutex
            case got of
              -- We hold the mutex: release it even if the search throws, so
              -- that later periods can start a new search.
              Just () -> flip E.finally (putMVar mutex ()) $ do
                me <- myThreadId
                labelThread me "scheduleSearch.reset"
                reset sNearestNodes sSearch sTarget st
                labelThread me "scheduleSearch.searchLoop"
                void $ searchLoop sSearch sTarget onResult st
              Nothing -> do
                -- Cancel search so that a new one can start in the next period.
                atomically $ searchCancel st
    atomically $ scheduleImmediately announcer k
               $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval)