From eda92679a6d7d27ebb5757ba7056fc452384faac Mon Sep 17 00:00:00 2001 From: joe Date: Sun, 17 Jun 2018 17:13:53 -0400 Subject: Factored out Tox-specific scheduling to Announcer.Tox. --- Announcer.hs | 157 +++---------------------------------------------- Announcer/Tox.hs | 176 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ ToxManager.hs | 3 + dht-client.cabal | 1 + examples/dhtd.hs | 3 +- 5 files changed, 189 insertions(+), 151 deletions(-) create mode 100644 Announcer/Tox.hs diff --git a/Announcer.hs b/Announcer.hs index 7fd72e2d..f0d65656 100644 --- a/Announcer.hs +++ b/Announcer.hs @@ -11,12 +11,15 @@ module Announcer , AnnounceKey , packAnnounceKey , unpackAnnounceKey - , AnnounceMethod(..) , forkAnnouncer , stopAnnouncer - , schedule , cancel , itemStatusNum + + -- lower level, Announcer.Tox needs these. + , scheduleImmediately + , ScheduledItem(..) + , interrutible ) where import qualified Data.MinMaxPSQ as MM @@ -86,14 +89,6 @@ data Announcer = Announcer , interrutible :: InterruptibleDelay } -announceK :: Int -announceK = 8 - -data AnnounceState = forall nid addr tok ni r. AnnounceState - { aState :: SearchState nid addr tok ni r - , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) - } - -- | Schedules an event to occur long ago at the epoch (which effectively makes -- the event happen as soon as possible). Note that the caller will usually -- also want to interrupt the 'interrutible' delay so that it finds this item @@ -109,113 +104,8 @@ stopAnnouncer announcer = do interruptDelay (interrutible announcer) atomically $ readTVar (announcerActive announcer) >>= check . not --- | This type specifies an item that can be announced on appropriate nodes in --- a Kademlia network. -data AnnounceMethod r = forall nid ni sr addr tok a. - ( Show nid - , Hashable nid - , Hashable ni - , Ord addr - , Ord nid - , Ord ni - ) => AnnounceMethod - { aSearch :: Search nid addr tok ni sr - -- ^ This is the Kademlia search to run repeatedly to find the - -- nearby nodes. A new search is started whenever one is not - -- already in progress at announce time. Repeated searches are - -- likely to finish faster than the first since nearby nodes - -- are not discarded. - , aPublish :: Either (r -> sr -> IO ()) - (r -> tok -> Maybe ni -> IO (Maybe a)) - -- ^ The action to perform when we find nearby nodes. The - -- destination node is given as a Maybe so that methods that - -- treat 'Nothing' as loop-back address can be passed here, - -- however 'Nothing' will not be passed by the announcer - -- thread. - -- - -- There are two cases: - -- - -- [Left] The action to perform requires a search result. - -- This was implemented to support Tox's DHTKey and - -- Friend-Request messages. - -- - -- [Right] The action requires a "token" from the destination - -- node. This is the more typical "announce" semantics for - -- Kademlia. - , aBuckets :: TVar (R.BucketList ni) - -- ^ Set this to the current Kademlia routing table buckets. - -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4? - , aTarget :: nid - -- ^ This is the Kademlia node-id of the item being announced. - , aInterval :: POSIXTime - -- ^ Assuming we have nearby nodes from the search, the item - -- will be announced at this interval. - -- - -- Current implementation is to make the scheduled - -- announcements even if the search hasn't finished. It will - -- use the closest nodes found so far. - } - --- | Schedule a recurring Search + Announce sequence. -schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () -schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do - st <- atomically $ newSearch aSearch aTarget [] - ns <- atomically $ newTVar MM.empty - let astate = AnnounceState st ns - publishToNodes is - | Left _ <- aPublish = return () - | Right publish <- aPublish = do - forM_ is $ \(Binding ni mtok _) -> do - forM_ mtok $ \tok -> do - got <- publish r tok (Just ni) - now <- getPOSIXTime - forM_ got $ \_ -> do - atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) - announce = do -- publish to current search results - is <- atomically $ do - bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) - return $ MM.toList bs - publishToNodes is - onResult sr - | Right _ <- aPublish = return True - | Left sendit <- aPublish = do - scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do - got <- sendit r sr - -- If we had a way to get the source of a search result, we might want to - -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' - -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent - -- a message be forgotten. - -- - -- forM_ got $ \_ -> do - -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) - return () - return True -- True to keep searching. - searchAgain = do - -- Canceling a pending search here seems to make announcements more reliable. - searchCancel st - isfin <- searchIsFinished st -- Always True, since we canceled. - return $ when isfin $ void $ fork search - search = do -- thread to fork - atomically $ reset aBuckets aSearch aTarget st - searchLoop aSearch aTarget onResult st - fork $ do -- Announce to any nodes we haven't already announced to. - is <- atomically $ do - bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) - nq <- readTVar ns - return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) - $ MM.toList bs - publishToNodes is - return () - {- - atomically $ scheduleImmediately announcer k - $ SearchFinished {- st -} search announce aInterval - interruptDelay (interrutible announcer) - -} - atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) - interruptDelay (interrutible announcer) - -cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () -cancel announcer k _ _ = do +cancel :: Announcer -> AnnounceKey -> IO () +cancel announcer k = do atomically $ scheduleImmediately announcer k $ DeleteAnnouncement interruptDelay (interrutible announcer) @@ -273,37 +163,4 @@ performScheduledItem announcer now = \case (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now --- announcement started: -newAnnouncement :: STM (IO a) - -> IO () - -> IO () - -> POSIXTime - -> Announcer - -> AnnounceKey - -> POSIXTime - -> STM (IO ()) -newAnnouncement checkFin search announce interval = \announcer k now -> do - modifyTVar (scheduled announcer) - (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) - return $ void $ fork search - --- time for periodic announce: --- (re-)announce to the current known set of storing-nodes. --- TODO: If the search is finished, restart the search. -reAnnounce :: STM (IO a) - -> IO () - -> POSIXTime - -> Announcer - -> AnnounceKey - -> POSIXTime - -> STM (IO ()) -reAnnounce checkFin announce interval = \announcer k now -> do - isfin <- checkFin - modifyTVar (scheduled announcer) - (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) - return $ do - isfin - hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now - announce - diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs new file mode 100644 index 00000000..eab974bc --- /dev/null +++ b/Announcer/Tox.hs @@ -0,0 +1,176 @@ +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NondecreasingIndentation #-} +module Announcer.Tox where + -- , AnnounceMethod(..) + -- , schedule + +import Announcer +import qualified Data.MinMaxPSQ as MM +import Data.Wrapper.PSQ as PSQ +import InterruptibleDelay +import Network.Kademlia.Routing as R +import Network.Kademlia.Search + +import Control.Concurrent.Lifted.Instrument +import Control.Concurrent.STM +import Control.Monad +import Data.Hashable +import Data.Maybe +import Data.Ord +import Data.Time.Clock.POSIX +import System.IO + + +announceK :: Int +announceK = 8 + +data AnnounceState = forall nid addr tok ni r. AnnounceState + { aState :: SearchState nid addr tok ni r + , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) + } + +-- | This type specifies an item that can be announced on appropriate nodes in +-- a Kademlia network. +data AnnounceMethod r = forall nid ni sr addr tok a. + ( Show nid + , Hashable nid + , Hashable ni + , Ord addr + , Ord nid + , Ord ni + ) => AnnounceMethod + { aSearch :: Search nid addr tok ni sr + -- ^ This is the Kademlia search to run repeatedly to find the + -- nearby nodes. A new search is started whenever one is not + -- already in progress at announce time. Repeated searches are + -- likely to finish faster than the first since nearby nodes + -- are not discarded. + , aPublish :: Either (r -> sr -> IO ()) + (r -> tok -> Maybe ni -> IO (Maybe a)) + -- ^ The action to perform when we find nearby nodes. The + -- destination node is given as a Maybe so that methods that + -- treat 'Nothing' as loop-back address can be passed here, + -- however 'Nothing' will not be passed by the announcer + -- thread. + -- + -- There are two cases: + -- + -- [Left] The action to perform requires a search result. + -- This was implemented to support Tox's DHTKey and + -- Friend-Request messages. + -- + -- [Right] The action requires a "token" from the destination + -- node. This is the more typical "announce" semantics for + -- Kademlia. + , aBuckets :: TVar (R.BucketList ni) + -- ^ Set this to the current Kademlia routing table buckets. + -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4? + , aTarget :: nid + -- ^ This is the Kademlia node-id of the item being announced. + , aInterval :: POSIXTime + -- ^ Assuming we have nearby nodes from the search, the item + -- will be announced at this interval. + -- + -- Current implementation is to make the scheduled + -- announcements even if the search hasn't finished. It will + -- use the closest nodes found so far. + } + + +-- announcement started: +newAnnouncement :: STM (IO a) + -> IO () + -> IO () + -> POSIXTime + -> Announcer + -> AnnounceKey + -> POSIXTime + -> STM (IO ()) +newAnnouncement checkFin search announce interval = \announcer k now -> do + modifyTVar (scheduled announcer) + (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) + return $ void $ fork search + +-- time for periodic announce: +-- (re-)announce to the current known set of storing-nodes. +-- TODO: If the search is finished, restart the search. +reAnnounce :: STM (IO a) + -> IO () + -> POSIXTime + -> Announcer + -> AnnounceKey + -> POSIXTime + -> STM (IO ()) +reAnnounce checkFin announce interval = \announcer k now -> do + isfin <- checkFin + modifyTVar (scheduled announcer) + (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) + return $ do + isfin + hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now + announce + +-- | Schedule a recurring Search + Announce sequence. +schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () +schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do + st <- atomically $ newSearch aSearch aTarget [] + ns <- atomically $ newTVar MM.empty + let astate = AnnounceState st ns + publishToNodes is + | Left _ <- aPublish = return () + | Right publish <- aPublish = do + forM_ is $ \(Binding ni mtok _) -> do + forM_ mtok $ \tok -> do + got <- publish r tok (Just ni) + now <- getPOSIXTime + forM_ got $ \_ -> do + atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) + announce = do -- publish to current search results + is <- atomically $ do + bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) + return $ MM.toList bs + publishToNodes is + onResult sr + | Right _ <- aPublish = return True + | Left sendit <- aPublish = do + scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do + got <- sendit r sr + -- If we had a way to get the source of a search result, we might want to + -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' + -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent + -- a message be forgotten. + -- + -- forM_ got $ \_ -> do + -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) + return () + return True -- True to keep searching. + searchAgain = do + -- Canceling a pending search here seems to make announcements more reliable. + searchCancel st + isfin <- searchIsFinished st -- Always True, since we canceled. + return $ when isfin $ void $ fork search + search = do -- thread to fork + atomically $ reset aBuckets aSearch aTarget st + searchLoop aSearch aTarget onResult st + fork $ do -- Announce to any nodes we haven't already announced to. + is <- atomically $ do + bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) + nq <- readTVar ns + return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) + $ MM.toList bs + publishToNodes is + return () + {- + atomically $ scheduleImmediately announcer k + $ SearchFinished {- st -} search announce aInterval + interruptDelay (interrutible announcer) + -} + atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) + interruptDelay (interrutible announcer) + diff --git a/ToxManager.hs b/ToxManager.hs index 81def17f..9b730f55 100644 --- a/ToxManager.hs +++ b/ToxManager.hs @@ -4,6 +4,7 @@ module ToxManager where import Announcer +import Announcer.Tox import Connection -- import Control.Concurrent import Control.Concurrent.STM @@ -124,12 +125,14 @@ toxman announcer toxbkts tox presence = ToxManager forM_ kbkts $ \(akey,bkts) -> do cancel announcer akey + {- (AnnounceMethod (toxQSearch tox) (Right $ toxAnnounceSendData tox) bkts pubid toxAnnounceInterval) pub + -} , setToxConnectionPolicy = \me them p -> do let m = do meid <- readMaybe $ T.unpack $ T.take 43 me diff --git a/dht-client.cabal b/dht-client.cabal index 9dc5ceb9..3169cabd 100644 --- a/dht-client.cabal +++ b/dht-client.cabal @@ -109,6 +109,7 @@ library Text.XXD Network.Tox.ContactInfo Announcer + Announcer.Tox InterruptibleDelay ByteStringOperators ClientState diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 28e9f261..ac78d552 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -67,6 +67,7 @@ import System.Posix.Signals import Announcer +import Announcer.Tox import ToxManager import Crypto.Tox -- (zeros32,SecretKey,PublicKey, generateSecretKey, toPublic, encodeSecret, decodeSecret, userKeys) import Network.UPNP as UPNP @@ -1009,7 +1010,7 @@ clientSession s@Session{..} sock cnum h = do dhtQuery doit :: Char -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () doit '+' = schedule - doit '-' = cancel + doit '-' = \a k _ _ -> cancel a k doit _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?" matchingResult :: ( Typeable stok -- cgit v1.2.3