summaryrefslogtreecommitdiff
path: root/Announcer.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Announcer.hs')
-rw-r--r--Announcer.hs157
1 files changed, 7 insertions, 150 deletions
diff --git a/Announcer.hs b/Announcer.hs
index 7fd72e2d..f0d65656 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -11,12 +11,15 @@ module Announcer
11 , AnnounceKey 11 , AnnounceKey
12 , packAnnounceKey 12 , packAnnounceKey
13 , unpackAnnounceKey 13 , unpackAnnounceKey
14 , AnnounceMethod(..)
15 , forkAnnouncer 14 , forkAnnouncer
16 , stopAnnouncer 15 , stopAnnouncer
17 , schedule
18 , cancel 16 , cancel
19 , itemStatusNum 17 , itemStatusNum
18
19 -- lower level, Announcer.Tox needs these.
20 , scheduleImmediately
21 , ScheduledItem(..)
22 , interrutible
20 ) where 23 ) where
21 24
22import qualified Data.MinMaxPSQ as MM 25import qualified Data.MinMaxPSQ as MM
@@ -86,14 +89,6 @@ data Announcer = Announcer
86 , interrutible :: InterruptibleDelay 89 , interrutible :: InterruptibleDelay
87 } 90 }
88 91
89announceK :: Int
90announceK = 8
91
92data AnnounceState = forall nid addr tok ni r. AnnounceState
93 { aState :: SearchState nid addr tok ni r
94 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
95 }
96
97-- | Schedules an event to occur long ago at the epoch (which effectively makes 92-- | Schedules an event to occur long ago at the epoch (which effectively makes
98-- the event happen as soon as possible). Note that the caller will usually 93-- the event happen as soon as possible). Note that the caller will usually
99-- also want to interrupt the 'interrutible' delay so that it finds this item 94-- also want to interrupt the 'interrutible' delay so that it finds this item
@@ -109,113 +104,8 @@ stopAnnouncer announcer = do
109 interruptDelay (interrutible announcer) 104 interruptDelay (interrutible announcer)
110 atomically $ readTVar (announcerActive announcer) >>= check . not 105 atomically $ readTVar (announcerActive announcer) >>= check . not
111 106
112-- | This type specifies an item that can be announced on appropriate nodes in 107cancel :: Announcer -> AnnounceKey -> IO ()
113-- a Kademlia network. 108cancel announcer k = do
114data AnnounceMethod r = forall nid ni sr addr tok a.
115 ( Show nid
116 , Hashable nid
117 , Hashable ni
118 , Ord addr
119 , Ord nid
120 , Ord ni
121 ) => AnnounceMethod
122 { aSearch :: Search nid addr tok ni sr
123 -- ^ This is the Kademlia search to run repeatedly to find the
124 -- nearby nodes. A new search is started whenever one is not
125 -- already in progress at announce time. Repeated searches are
126 -- likely to finish faster than the first since nearby nodes
127 -- are not discarded.
128 , aPublish :: Either (r -> sr -> IO ())
129 (r -> tok -> Maybe ni -> IO (Maybe a))
130 -- ^ The action to perform when we find nearby nodes. The
131 -- destination node is given as a Maybe so that methods that
132 -- treat 'Nothing' as loop-back address can be passed here,
133 -- however 'Nothing' will not be passed by the announcer
134 -- thread.
135 --
136 -- There are two cases:
137 --
138 -- [Left] The action to perform requires a search result.
139 -- This was implemented to support Tox's DHTKey and
140 -- Friend-Request messages.
141 --
142 -- [Right] The action requires a "token" from the destination
143 -- node. This is the more typical "announce" semantics for
144 -- Kademlia.
145 , aBuckets :: TVar (R.BucketList ni)
146 -- ^ Set this to the current Kademlia routing table buckets.
147 -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4?
148 , aTarget :: nid
149 -- ^ This is the Kademlia node-id of the item being announced.
150 , aInterval :: POSIXTime
151 -- ^ Assuming we have nearby nodes from the search, the item
152 -- will be announced at this interval.
153 --
154 -- Current implementation is to make the scheduled
155 -- announcements even if the search hasn't finished. It will
156 -- use the closest nodes found so far.
157 }
158
159-- | Schedule a recurring Search + Announce sequence.
160schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
161schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
162 st <- atomically $ newSearch aSearch aTarget []
163 ns <- atomically $ newTVar MM.empty
164 let astate = AnnounceState st ns
165 publishToNodes is
166 | Left _ <- aPublish = return ()
167 | Right publish <- aPublish = do
168 forM_ is $ \(Binding ni mtok _) -> do
169 forM_ mtok $ \tok -> do
170 got <- publish r tok (Just ni)
171 now <- getPOSIXTime
172 forM_ got $ \_ -> do
173 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
174 announce = do -- publish to current search results
175 is <- atomically $ do
176 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
177 return $ MM.toList bs
178 publishToNodes is
179 onResult sr
180 | Right _ <- aPublish = return True
181 | Left sendit <- aPublish = do
182 scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do
183 got <- sendit r sr
184 -- If we had a way to get the source of a search result, we might want to
185 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
186 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
187 -- a message be forgotten.
188 --
189 -- forM_ got $ \_ -> do
190 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
191 return ()
192 return True -- True to keep searching.
193 searchAgain = do
194 -- Canceling a pending search here seems to make announcements more reliable.
195 searchCancel st
196 isfin <- searchIsFinished st -- Always True, since we canceled.
197 return $ when isfin $ void $ fork search
198 search = do -- thread to fork
199 atomically $ reset aBuckets aSearch aTarget st
200 searchLoop aSearch aTarget onResult st
201 fork $ do -- Announce to any nodes we haven't already announced to.
202 is <- atomically $ do
203 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
204 nq <- readTVar ns
205 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
206 $ MM.toList bs
207 publishToNodes is
208 return ()
209 {-
210 atomically $ scheduleImmediately announcer k
211 $ SearchFinished {- st -} search announce aInterval
212 interruptDelay (interrutible announcer)
213 -}
214 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
215 interruptDelay (interrutible announcer)
216
217cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
218cancel announcer k _ _ = do
219 atomically $ scheduleImmediately announcer k $ DeleteAnnouncement 109 atomically $ scheduleImmediately announcer k $ DeleteAnnouncement
220 interruptDelay (interrutible announcer) 110 interruptDelay (interrutible announcer)
221 111
@@ -273,37 +163,4 @@ performScheduledItem announcer now = \case
273 163
274 (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now 164 (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now
275 165
276-- announcement started:
277newAnnouncement :: STM (IO a)
278 -> IO ()
279 -> IO ()
280 -> POSIXTime
281 -> Announcer
282 -> AnnounceKey
283 -> POSIXTime
284 -> STM (IO ())
285newAnnouncement checkFin search announce interval = \announcer k now -> do
286 modifyTVar (scheduled announcer)
287 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
288 return $ void $ fork search
289
290-- time for periodic announce:
291-- (re-)announce to the current known set of storing-nodes.
292-- TODO: If the search is finished, restart the search.
293reAnnounce :: STM (IO a)
294 -> IO ()
295 -> POSIXTime
296 -> Announcer
297 -> AnnounceKey
298 -> POSIXTime
299 -> STM (IO ())
300reAnnounce checkFin announce interval = \announcer k now -> do
301 isfin <- checkFin
302 modifyTVar (scheduled announcer)
303 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
304 return $ do
305 isfin
306 hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now
307 announce
308
309 166