From a45b000e07a806e171f1e4701abd3e025382ecf3 Mon Sep 17 00:00:00 2001 From: joe Date: Mon, 18 Jun 2018 13:32:02 -0400 Subject: Factored separate code-paths for Kademlia announce versus action on search result. --- Announcer/Tox.hs | 102 ++++++++++++++++++++++++++++++++++++++----------------- ToxManager.hs | 4 +-- ToxToXMPP.hs | 6 ++-- examples/dhtd.hs | 20 ++++++----- 4 files changed, 88 insertions(+), 44 deletions(-) diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs index c9164d22..f79f8ee7 100644 --- a/Announcer/Tox.hs +++ b/Announcer/Tox.hs @@ -50,23 +50,16 @@ data AnnounceMethod r = forall nid ni sr addr tok a. -- already in progress at announce time. Repeated searches are -- likely to finish faster than the first since nearby nodes -- are not discarded. - , aPublish :: Either (r -> sr -> IO ()) - (r -> tok -> Maybe ni -> IO (Maybe a)) + , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) -- ^ The action to perform when we find nearby nodes. The -- destination node is given as a Maybe so that methods that -- treat 'Nothing' as loop-back address can be passed here, -- however 'Nothing' will not be passed by the announcer -- thread. -- - -- There are two cases: - -- - -- [Left] The action to perform requires a search result. - -- This was implemented to support Tox's DHTKey and - -- Friend-Request messages. - -- - -- [Right] The action requires a "token" from the destination - -- node. This is the more typical "announce" semantics for - -- Kademlia. + -- The action requires a "token" from the destination + -- node. This is the more typical "announce" semantics for + -- Kademlia. , aNearestNodes :: nid -> STM [ni] -- ^ Method to obtain starting nodes from an iterative Kademlia search. , aTarget :: nid @@ -80,6 +73,38 @@ data AnnounceMethod r = forall nid ni sr addr tok a. -- use the closest nodes found so far. } +-- | This type specifies a Kademlia search and an action to perform upon the result. +data SearchMethod r = forall nid ni sr addr tok a. + ( Show nid + , Hashable nid + , Hashable ni + , Ord addr + , Ord nid + , Ord ni + ) => SearchMethod + { sSearch :: Search nid addr tok ni sr + -- ^ This is the Kademlia search to run repeatedly to find the + -- nearby nodes. A new search is started whenever one is not + -- already in progress at announce time. Repeated searches are + -- likely to finish faster than the first since nearby nodes + -- are not discarded. + -- + -- XXX: Currently, "repeatedly" is wrong. + , sWithResult :: r -> sr -> IO () + -- ^ + -- The action to perform upon a search result. This was + -- implemented to support Tox's DHTKey and Friend-Request + -- messages. + , sNearestNodes :: nid -> STM [ni] + -- ^ Method to obtain starting nodes from an iterative Kademlia search. + , sTarget :: nid + -- ^ This is the Kademlia node-id of the item being announced. + , sInterval :: POSIXTime + -- ^ The time between searches. + -- + -- XXX: Currently, search results will stop any repetition. + } + -- announcement started: newAnnouncement :: STM (IO a) @@ -115,17 +140,15 @@ reAnnounce checkFin announce interval = \announcer k now -> do announce -- | Schedule a recurring Search + Announce sequence. -schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () -schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do +scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () +scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do st <- atomically $ newSearch aSearch aTarget [] ns <- atomically $ newTVar MM.empty let astate = AnnounceState st ns - publishToNodes is - | Left _ <- aPublish = return () - | Right publish <- aPublish = do + publishToNodes is = do forM_ is $ \(Binding ni mtok _) -> do forM_ mtok $ \tok -> do - got <- publish r tok (Just ni) + got <- aPublish r tok (Just ni) now <- getPOSIXTime forM_ got $ \_ -> do atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) @@ -134,20 +157,7 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInte bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) return $ MM.toList bs publishToNodes is - onResult sr - | Right _ <- aPublish = return True - | Left sendit <- aPublish = do - scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do - got <- sendit r sr - -- If we had a way to get the source of a search result, we might want to - -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' - -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent - -- a message be forgotten. - -- - -- forM_ got $ \_ -> do - -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) - return () - return True -- True to keep searching. + onResult sr = return True searchAgain = do -- Canceling a pending search here seems to make announcements more reliable. searchCancel st @@ -172,3 +182,33 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInte atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) interruptDelay (interrutible announcer) +-- | Schedule a recurring Search + Publish sequence. +scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO () +scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do + st <- atomically $ newSearch sSearch sTarget [] + ns <- atomically $ newTVar MM.empty + let astate = AnnounceState st ns + onResult sr = do + -- XXX: Using /k/ here as the announce key is causing the search not to repeat. + scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do + got <- sWithResult r sr + -- If we had a way to get the source of a search result, we might want to + -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' + -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent + -- a message be forgotten. + -- + -- forM_ got $ \_ -> do + -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) + return () + return True -- True to keep searching. + searchAgain = do + -- Canceling a pending search here seems to make announcements more reliable. + searchCancel st + isfin <- searchIsFinished st -- Always True, since we canceled. + return $ when isfin $ void $ fork search + search = do -- thread to fork + atomically $ reset sNearestNodes sSearch sTarget st + searchLoop sSearch sTarget onResult st + atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) + interruptDelay (interrutible announcer) + diff --git a/ToxManager.hs b/ToxManager.hs index af1911d4..1e9c618d 100644 --- a/ToxManager.hs +++ b/ToxManager.hs @@ -81,10 +81,10 @@ toxman announcer toxbkts tox presence = ToxManager -- Schedule recurring announce. -- akey <- atomically $ packAnnounceKey announcer $ "toxid:" ++ show pubid - schedule announcer + scheduleAnnounce announcer akey (AnnounceMethod (toxQSearch tox) - (Right $ toxAnnounceSendData tox) + (toxAnnounceSendData tox) nearNodes pubid toxAnnounceInterval) diff --git a/ToxToXMPP.hs b/ToxToXMPP.hs index ac24ce6d..ad5cb0dd 100644 --- a/ToxToXMPP.hs +++ b/ToxToXMPP.hs @@ -165,10 +165,10 @@ forkAccountWatcher acc tox st announcer = forkIO $ do -- likelihood of failure as the chances of packet loss -- happening to all (up to to 8) packets sent is low. -- - schedule announcer + scheduleSearch announcer akey - (AnnounceMethod (toxQSearch tox) - (Left $ \theirkey rendezvous -> do + (SearchMethod (toxQSearch tox) + (\theirkey rendezvous -> do dkey <- Tox.getContactInfo tox sendMessage (Tox.toxToRoute tox) diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 28bfc9b4..7cf3393c 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -1009,10 +1009,14 @@ clientSession s@Session{..} sock cnum h = do (const method) announceSendData) dhtQuery - doit :: Char -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () - doit '+' = schedule - doit '-' = \a k _ _ -> cancel a k - doit _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?" + doitR :: Char -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () + doitR '+' = scheduleAnnounce + doitR '-' = \a k _ _ -> cancel a k + doitR _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?" + doitL :: Char -> Announcer -> AnnounceKey -> SearchMethod r -> r -> IO () + doitL '+' = scheduleSearch + doitL '-' = \a k _ _ -> cancel a k + doitL _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?" matchingResult :: ( Typeable stok , Typeable ptok @@ -1046,9 +1050,9 @@ clientSession s@Session{..} sock cnum h = do dta <- either (const Nothing) Just $ announceParseData dtastr return $ do akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) - doit op announcer + doitR op announcer akey - (AnnounceMethod qsearch (Right asend) + (AnnounceMethod qsearch asend (\nid -> R.kclosest (searchSpace qsearch) searchK nid @@ -1072,9 +1076,9 @@ clientSession s@Session{..} sock cnum h = do pub <- selectedKey return $ do akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) - doit op announcer + doitL op announcer akey - (AnnounceMethod qsearch (Left $ asend pub) + (SearchMethod qsearch (asend pub) (\nid -> R.kclosest (searchSpace qsearch) searchK nid -- cgit v1.2.3