diff options
Diffstat (limited to 'Announcer.hs')
-rw-r--r-- | Announcer.hs | 157 |
1 files changed, 7 insertions, 150 deletions
diff --git a/Announcer.hs b/Announcer.hs index 7fd72e2d..f0d65656 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -11,12 +11,15 @@ module Announcer | |||
11 | , AnnounceKey | 11 | , AnnounceKey |
12 | , packAnnounceKey | 12 | , packAnnounceKey |
13 | , unpackAnnounceKey | 13 | , unpackAnnounceKey |
14 | , AnnounceMethod(..) | ||
15 | , forkAnnouncer | 14 | , forkAnnouncer |
16 | , stopAnnouncer | 15 | , stopAnnouncer |
17 | , schedule | ||
18 | , cancel | 16 | , cancel |
19 | , itemStatusNum | 17 | , itemStatusNum |
18 | |||
19 | -- lower level, Announcer.Tox needs these. | ||
20 | , scheduleImmediately | ||
21 | , ScheduledItem(..) | ||
22 | , interrutible | ||
20 | ) where | 23 | ) where |
21 | 24 | ||
22 | import qualified Data.MinMaxPSQ as MM | 25 | import qualified Data.MinMaxPSQ as MM |
@@ -86,14 +89,6 @@ data Announcer = Announcer | |||
86 | , interrutible :: InterruptibleDelay | 89 | , interrutible :: InterruptibleDelay |
87 | } | 90 | } |
88 | 91 | ||
89 | announceK :: Int | ||
90 | announceK = 8 | ||
91 | |||
92 | data AnnounceState = forall nid addr tok ni r. AnnounceState | ||
93 | { aState :: SearchState nid addr tok ni r | ||
94 | , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) | ||
95 | } | ||
96 | |||
97 | -- | Schedules an event to occur long ago at the epoch (which effectively makes | 92 | -- | Schedules an event to occur long ago at the epoch (which effectively makes |
98 | -- the event happen as soon as possible). Note that the caller will usually | 93 | -- the event happen as soon as possible). Note that the caller will usually |
99 | -- also want to interrupt the 'interrutible' delay so that it finds this item | 94 | -- also want to interrupt the 'interrutible' delay so that it finds this item |
@@ -109,113 +104,8 @@ stopAnnouncer announcer = do | |||
109 | interruptDelay (interrutible announcer) | 104 | interruptDelay (interrutible announcer) |
110 | atomically $ readTVar (announcerActive announcer) >>= check . not | 105 | atomically $ readTVar (announcerActive announcer) >>= check . not |
111 | 106 | ||
112 | -- | This type specifies an item that can be announced on appropriate nodes in | 107 | cancel :: Announcer -> AnnounceKey -> IO () |
113 | -- a Kademlia network. | 108 | cancel announcer k = do |
114 | data AnnounceMethod r = forall nid ni sr addr tok a. | ||
115 | ( Show nid | ||
116 | , Hashable nid | ||
117 | , Hashable ni | ||
118 | , Ord addr | ||
119 | , Ord nid | ||
120 | , Ord ni | ||
121 | ) => AnnounceMethod | ||
122 | { aSearch :: Search nid addr tok ni sr | ||
123 | -- ^ This is the Kademlia search to run repeatedly to find the | ||
124 | -- nearby nodes. A new search is started whenever one is not | ||
125 | -- already in progress at announce time. Repeated searches are | ||
126 | -- likely to finish faster than the first since nearby nodes | ||
127 | -- are not discarded. | ||
128 | , aPublish :: Either (r -> sr -> IO ()) | ||
129 | (r -> tok -> Maybe ni -> IO (Maybe a)) | ||
130 | -- ^ The action to perform when we find nearby nodes. The | ||
131 | -- destination node is given as a Maybe so that methods that | ||
132 | -- treat 'Nothing' as loop-back address can be passed here, | ||
133 | -- however 'Nothing' will not be passed by the announcer | ||
134 | -- thread. | ||
135 | -- | ||
136 | -- There are two cases: | ||
137 | -- | ||
138 | -- [Left] The action to perform requires a search result. | ||
139 | -- This was implemented to support Tox's DHTKey and | ||
140 | -- Friend-Request messages. | ||
141 | -- | ||
142 | -- [Right] The action requires a "token" from the destination | ||
143 | -- node. This is the more typical "announce" semantics for | ||
144 | -- Kademlia. | ||
145 | , aBuckets :: TVar (R.BucketList ni) | ||
146 | -- ^ Set this to the current Kademlia routing table buckets. | ||
147 | -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4? | ||
148 | , aTarget :: nid | ||
149 | -- ^ This is the Kademlia node-id of the item being announced. | ||
150 | , aInterval :: POSIXTime | ||
151 | -- ^ Assuming we have nearby nodes from the search, the item | ||
152 | -- will be announced at this interval. | ||
153 | -- | ||
154 | -- Current implementation is to make the scheduled | ||
155 | -- announcements even if the search hasn't finished. It will | ||
156 | -- use the closest nodes found so far. | ||
157 | } | ||
158 | |||
159 | -- | Schedule a recurring Search + Announce sequence. | ||
160 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
161 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do | ||
162 | st <- atomically $ newSearch aSearch aTarget [] | ||
163 | ns <- atomically $ newTVar MM.empty | ||
164 | let astate = AnnounceState st ns | ||
165 | publishToNodes is | ||
166 | | Left _ <- aPublish = return () | ||
167 | | Right publish <- aPublish = do | ||
168 | forM_ is $ \(Binding ni mtok _) -> do | ||
169 | forM_ mtok $ \tok -> do | ||
170 | got <- publish r tok (Just ni) | ||
171 | now <- getPOSIXTime | ||
172 | forM_ got $ \_ -> do | ||
173 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
174 | announce = do -- publish to current search results | ||
175 | is <- atomically $ do | ||
176 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
177 | return $ MM.toList bs | ||
178 | publishToNodes is | ||
179 | onResult sr | ||
180 | | Right _ <- aPublish = return True | ||
181 | | Left sendit <- aPublish = do | ||
182 | scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do | ||
183 | got <- sendit r sr | ||
184 | -- If we had a way to get the source of a search result, we might want to | ||
185 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | ||
186 | -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent | ||
187 | -- a message be forgotten. | ||
188 | -- | ||
189 | -- forM_ got $ \_ -> do | ||
190 | -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
191 | return () | ||
192 | return True -- True to keep searching. | ||
193 | searchAgain = do | ||
194 | -- Canceling a pending search here seems to make announcements more reliable. | ||
195 | searchCancel st | ||
196 | isfin <- searchIsFinished st -- Always True, since we canceled. | ||
197 | return $ when isfin $ void $ fork search | ||
198 | search = do -- thread to fork | ||
199 | atomically $ reset aBuckets aSearch aTarget st | ||
200 | searchLoop aSearch aTarget onResult st | ||
201 | fork $ do -- Announce to any nodes we haven't already announced to. | ||
202 | is <- atomically $ do | ||
203 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
204 | nq <- readTVar ns | ||
205 | return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) | ||
206 | $ MM.toList bs | ||
207 | publishToNodes is | ||
208 | return () | ||
209 | {- | ||
210 | atomically $ scheduleImmediately announcer k | ||
211 | $ SearchFinished {- st -} search announce aInterval | ||
212 | interruptDelay (interrutible announcer) | ||
213 | -} | ||
214 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) | ||
215 | interruptDelay (interrutible announcer) | ||
216 | |||
217 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
218 | cancel announcer k _ _ = do | ||
219 | atomically $ scheduleImmediately announcer k $ DeleteAnnouncement | 109 | atomically $ scheduleImmediately announcer k $ DeleteAnnouncement |
220 | interruptDelay (interrutible announcer) | 110 | interruptDelay (interrutible announcer) |
221 | 111 | ||
@@ -273,37 +163,4 @@ performScheduledItem announcer now = \case | |||
273 | 163 | ||
274 | (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now | 164 | (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now |
275 | 165 | ||
276 | -- announcement started: | ||
277 | newAnnouncement :: STM (IO a) | ||
278 | -> IO () | ||
279 | -> IO () | ||
280 | -> POSIXTime | ||
281 | -> Announcer | ||
282 | -> AnnounceKey | ||
283 | -> POSIXTime | ||
284 | -> STM (IO ()) | ||
285 | newAnnouncement checkFin search announce interval = \announcer k now -> do | ||
286 | modifyTVar (scheduled announcer) | ||
287 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
288 | return $ void $ fork search | ||
289 | |||
290 | -- time for periodic announce: | ||
291 | -- (re-)announce to the current known set of storing-nodes. | ||
292 | -- TODO: If the search is finished, restart the search. | ||
293 | reAnnounce :: STM (IO a) | ||
294 | -> IO () | ||
295 | -> POSIXTime | ||
296 | -> Announcer | ||
297 | -> AnnounceKey | ||
298 | -> POSIXTime | ||
299 | -> STM (IO ()) | ||
300 | reAnnounce checkFin announce interval = \announcer k now -> do | ||
301 | isfin <- checkFin | ||
302 | modifyTVar (scheduled announcer) | ||
303 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
304 | return $ do | ||
305 | isfin | ||
306 | hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now | ||
307 | announce | ||
308 | |||
309 | 166 | ||