path: root/dht/Announcer
diff options
authorJames Crayne <>2019-09-28 13:43:29 -0400
committerJoe Crayne <>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/Announcer
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/Announcer')
1 files changed, 206 insertions, 0 deletions
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs
new file mode 100644
index 00000000..f8343f8d
--- /dev/null
+++ b/dht/Announcer/Tox.hs
@@ -0,0 +1,206 @@
1{-# LANGUAGE DeriveDataTypeable #-}
2{-# LANGUAGE DeriveGeneric #-}
3{-# LANGUAGE ExistentialQuantification #-}
4{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE NamedFieldPuns #-}
8{-# LANGUAGE NondecreasingIndentation #-}
9module Announcer.Tox where
10 -- , AnnounceMethod(..)
11 -- , schedule
13import Announcer
14import qualified Data.MinMaxPSQ as MM
15import Data.Wrapper.PSQ as PSQ
16import Network.Kademlia.Search
18import Control.Concurrent.Lifted.Instrument
19import Control.Concurrent.STM
20import Control.Monad
21import Data.Hashable
22import Data.Maybe
23import Data.Ord
24import Data.Time.Clock.POSIX
27announceK :: Int
28announceK = 8
30data AnnounceState = forall nid addr tok ni r. AnnounceState
31 { aState :: SearchState nid addr tok ni r
32 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
33 }
35-- | This type specifies an item that can be announced on appropriate nodes in
36-- a Kademlia network.
37data AnnounceMethod r = forall nid ni sr addr tok a.
38 ( Show nid
39 , Hashable nid
40 , Hashable ni
41 , Ord addr
42 , Ord nid
43 , Ord ni
44 ) => AnnounceMethod
45 { aSearch :: Search nid addr tok ni sr
46 -- ^ This is the Kademlia search to run repeatedly to find the
47 -- nearby nodes. A new search is started whenever one is not
48 -- already in progress at announce time. Repeated searches are
49 -- likely to finish faster than the first since nearby nodes
50 -- are not discarded.
51 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
52 -- ^ The action to perform when we find nearby nodes. The
53 -- destination node is given as a Maybe so that methods that
54 -- treat 'Nothing' as loop-back address can be passed here,
55 -- however 'Nothing' will not be passed by the announcer
56 -- thread.
57 --
58 -- The action requires a "token" from the destination
59 -- node. This is the more typical "announce" semantics for
60 -- Kademlia.
61 , aNearestNodes :: nid -> STM [ni]
62 -- ^ Method to obtain starting nodes from an iterative Kademlia search.
63 , aTarget :: nid
64 -- ^ This is the Kademlia node-id of the item being announced.
65 , aInterval :: POSIXTime
66 -- ^ Assuming we have nearby nodes from the search, the item
67 -- will be announced at this interval.
68 --
69 -- Current implementation is to make the scheduled
70 -- announcements even if the search hasn't finished. It will
71 -- use the closest nodes found so far.
72 }
74-- | This type specifies a Kademlia search and an action to perform upon the result.
75data SearchMethod r = forall nid ni sr addr tok a.
76 ( Show nid
77 , Hashable nid
78 , Hashable ni
79 , Ord addr
80 , Ord nid
81 , Ord ni
82 ) => SearchMethod
83 { sSearch :: Search nid addr tok ni sr
84 -- ^ This is the Kademlia search to run repeatedly to find the
85 -- nearby nodes. A new search is started whenever one is not
86 -- already in progress at announce time. Repeated searches are
87 -- likely to finish faster than the first since nearby nodes
88 -- are not discarded.
89 --
90 -- XXX: Currently, "repeatedly" is wrong.
91 , sWithResult :: r -> sr -> IO ()
92 -- ^
93 -- The action to perform upon a search result. This was
94 -- implemented to support Tox's DHTKey and Friend-Request
95 -- messages.
96 , sNearestNodes :: nid -> STM [ni]
97 -- ^ Method to obtain starting nodes from an iterative Kademlia search.
98 , sTarget :: nid
99 -- ^ This is the Kademlia node-id of the item being announced.
100 , sInterval :: POSIXTime
101 -- ^ The time between searches.
102 --
103 -- XXX: Currently, search results will stop any repetition.
104 }
107-- announcement started:
108newAnnouncement :: STM (IO a)
109 -> IO ()
110 -> IO ()
111 -> POSIXTime
112 -> Announcer
113 -> AnnounceKey
114 -> POSIXTime
115 -> STM (IO ())
116newAnnouncement checkFin search announce interval = \announcer k now -> do
117 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)
118 return $ void $ fork search
120-- time for periodic announce:
121-- (re-)announce to the current known set of storing-nodes.
122-- TODO: If the search is finished, restart the search.
123reAnnounce :: STM (IO a)
124 -> IO ()
125 -> POSIXTime
126 -> Announcer
127 -> AnnounceKey
128 -> POSIXTime
129 -> STM (IO ())
130reAnnounce checkFin announce interval = \announcer k now -> do
131 isfin <- checkFin
132 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)
133 return $ do
134 isfin
135 announce
137-- | Schedule a recurring Search + Announce sequence.
138scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
139scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do
140 st <- atomically $ newSearch aSearch aTarget []
141 ns <- atomically $ newTVar MM.empty
142 let astate = AnnounceState st ns
143 publishToNodes is = do
144 forM_ is $ \(Binding ni mtok _) -> do
145 forM_ mtok $ \tok -> do
146 got <- aPublish r tok (Just ni)
147 now <- getPOSIXTime
148 forM_ got $ \_ -> do
149 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
150 announce = do -- publish to current search results
151 is <- atomically $ do
152 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
153 return $ MM.toList bs
154 publishToNodes is
155 onResult sr = return True
156 searchAgain = do
157 -- Canceling a pending search here seems to make announcements more reliable.
158 searchCancel st
159 isfin <- searchIsFinished st -- Always True, since we canceled.
160 return $ when isfin $ void $ fork search
161 search = do -- thread to fork
162 atomically $ reset aNearestNodes aSearch aTarget st
163 searchLoop aSearch aTarget onResult st
164 fork $ do -- Announce to any nodes we haven't already announced to.
165 is <- atomically $ do
166 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
167 nq <- readTVar ns
168 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
169 $ MM.toList bs
170 publishToNodes is
171 return ()
172 {-
173 atomically $ scheduleImmediately announcer k
174 $ SearchFinished {- st -} search announce aInterval
175 interruptDelay (interrutible announcer)
176 -}
177 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
179-- | Schedule a recurring Search + Publish sequence.
180scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO ()
181scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do
182 st <- atomically $ newSearch sSearch sTarget []
183 ns <- atomically $ newTVar MM.empty
184 let astate = AnnounceState st ns
185 onResult sr = do
186 runAction announcer $ do
187 got <- sWithResult r sr
188 -- If we had a way to get the source of a search result, we might want to
189 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
190 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
191 -- a message be forgotten.
192 --
193 -- forM_ got $ \_ -> do
194 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
195 return ()
196 return True -- True to keep searching.
197 searchAgain = do
198 -- Canceling a pending search here seems to make announcements more reliable.
199 searchCancel st
200 isfin <- searchIsFinished st -- Always True, since we canceled.
201 return $ when isfin $ void $ fork search
202 search = do -- thread to fork
203 atomically $ reset sNearestNodes sSearch sTarget st
204 searchLoop sSearch sTarget onResult st
205 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval)