diff options
author | joe <joe@jerkface.net> | 2017-11-01 16:33:15 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-01 16:33:15 -0400 |
commit | 32b47d5fcac748b9fa4d93e45ee4fc52d523c601 (patch) | |
tree | 1ce9ca9803eee6d4701d59aaca4059303bdb6843 | |
parent | e80fe4a1b0cae4de60509b560e7845f59bf91b9e (diff) |
Announce command now includes a recurring search.
-rw-r--r-- | Announcer.hs | 62 |
1 files changed, 44 insertions, 18 deletions
diff --git a/Announcer.hs b/Announcer.hs index 2f0eca10..668e00c2 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -29,6 +29,7 @@ import qualified Data.ByteString.Char8 as Char8 | |||
29 | import Data.Function | 29 | import Data.Function |
30 | import Data.Hashable | 30 | import Data.Hashable |
31 | import Data.Maybe | 31 | import Data.Maybe |
32 | import Data.Ord | ||
32 | import Data.Time.Clock.POSIX | 33 | import Data.Time.Clock.POSIX |
33 | 34 | ||
34 | newtype AnnounceKey = AnnounceKey ByteString | 35 | newtype AnnounceKey = AnnounceKey ByteString |
@@ -41,11 +42,10 @@ unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String | |||
41 | unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs | 42 | unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs |
42 | 43 | ||
43 | data ScheduledItem | 44 | data ScheduledItem |
44 | = forall r. ScheduledItem (AnnounceMethod r) | 45 | = StopAnnouncer |
45 | | StopAnnouncer | 46 | | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime |
46 | | NewAnnouncement (IO ()) (IO ()) POSIXTime | ||
47 | | SearchFinished (IO ()) (IO ()) POSIXTime | 47 | | SearchFinished (IO ()) (IO ()) POSIXTime |
48 | | Announce (IO ()) POSIXTime | 48 | | Announce (STM (IO ())) (IO ()) POSIXTime |
49 | | DeleteAnnouncement | 49 | | DeleteAnnouncement |
50 | 50 | ||
51 | data Announcer = Announcer | 51 | data Announcer = Announcer |
@@ -54,6 +54,14 @@ data Announcer = Announcer | |||
54 | , interrutible :: InterruptibleDelay | 54 | , interrutible :: InterruptibleDelay |
55 | } | 55 | } |
56 | 56 | ||
57 | announceK :: Int | ||
58 | announceK = 8 | ||
59 | |||
60 | data AnnounceState = forall nid addr tok ni r. AnnounceState | ||
61 | { aState :: SearchState nid addr tok ni r | ||
62 | , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) | ||
63 | } | ||
64 | |||
57 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () | 65 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () |
58 | scheduleImmediately announcer k item | 66 | scheduleImmediately announcer k item |
59 | = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) | 67 | = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) |
@@ -82,22 +90,39 @@ data AnnounceMethod r = forall nid ni sr addr tok a. | |||
82 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 90 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
83 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do | 91 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do |
84 | st <- atomically $ newSearch aSearch aTarget [] | 92 | st <- atomically $ newSearch aSearch aTarget [] |
85 | let announce = do -- publish to current search results | 93 | ns <- atomically $ newTVar MM.empty |
94 | let astate = AnnounceState st ns | ||
95 | publishToNodes is = do | ||
96 | forM_ is $ \(Binding ni mtok _) -> do | ||
97 | forM_ mtok $ \tok -> do | ||
98 | got <- aPublish r tok (Just ni) | ||
99 | now <- getPOSIXTime | ||
100 | forM_ got $ \_ -> do | ||
101 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
102 | announce = do -- publish to current search results | ||
86 | is <- atomically $ do | 103 | is <- atomically $ do |
87 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | 104 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) |
88 | return $ MM.toList bs | 105 | return $ MM.toList bs |
89 | forM_ is $ \(Binding ni mtok _) -> do | 106 | publishToNodes is |
90 | forM_ mtok $ \tok -> do | ||
91 | aPublish r tok (Just ni) | ||
92 | return () | ||
93 | onResult _ = return True -- action for each search-hit (True = keep searching) | 107 | onResult _ = return True -- action for each search-hit (True = keep searching) |
108 | searchAgain = searchIsFinished st >>= \isfin -> return $ when isfin $ void $ fork search | ||
94 | search = do -- thread to fork | 109 | search = do -- thread to fork |
95 | atomically $ reset aBuckets aSearch aTarget st | 110 | atomically $ reset aBuckets aSearch aTarget st |
96 | searchLoop aSearch aTarget onResult st | 111 | searchLoop aSearch aTarget onResult st |
112 | fork $ do -- Announce to any nodes we haven't already announced to. | ||
113 | is <- atomically $ do | ||
114 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
115 | nq <- readTVar ns | ||
116 | return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) | ||
117 | $ MM.toList bs | ||
118 | publishToNodes is | ||
119 | return () | ||
120 | {- | ||
97 | atomically $ scheduleImmediately announcer k | 121 | atomically $ scheduleImmediately announcer k |
98 | $ SearchFinished {- st -} search announce aInterval | 122 | $ SearchFinished {- st -} search announce aInterval |
99 | interruptDelay (interrutible announcer) | 123 | interruptDelay (interrutible announcer) |
100 | atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce aInterval | 124 | -} |
125 | atomically $ scheduleImmediately announcer k $ NewAnnouncement searchAgain search announce aInterval | ||
101 | interruptDelay (interrutible announcer) | 126 | interruptDelay (interrutible announcer) |
102 | 127 | ||
103 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 128 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
@@ -145,12 +170,10 @@ performScheduledItem announcer now = \case | |||
145 | (Binding _ StopAnnouncer _) -> return Nothing | 170 | (Binding _ StopAnnouncer _) -> return Nothing |
146 | 171 | ||
147 | -- announcement started: | 172 | -- announcement started: |
148 | (Binding k (NewAnnouncement search announce interval) _) -> do | 173 | (Binding k (NewAnnouncement checkFin search announce interval) _) -> do |
149 | modifyTVar (scheduled announcer) | 174 | modifyTVar (scheduled announcer) |
150 | (PSQ.insert' k (Announce announce interval) (now + interval)) | 175 | (PSQ.insert' k (Announce checkFin announce interval) (now + interval)) |
151 | return $ Just $ do | 176 | return $ Just $ void $ fork search |
152 | fork search | ||
153 | return () | ||
154 | 177 | ||
155 | -- announcement removed: | 178 | -- announcement removed: |
156 | (Binding k DeleteAnnouncement _) -> return $ Just $ return () | 179 | (Binding k DeleteAnnouncement _) -> return $ Just $ return () |
@@ -158,10 +181,13 @@ performScheduledItem announcer now = \case | |||
158 | -- time for periodic announce: | 181 | -- time for periodic announce: |
159 | -- (re-)announce to the current known set of storing-nodes. | 182 | -- (re-)announce to the current known set of storing-nodes. |
160 | -- TODO: If the search is finished, restart the search. | 183 | -- TODO: If the search is finished, restart the search. |
161 | (Binding k (Announce announce interval) _) -> do | 184 | (Binding k (Announce checkFin announce interval) _) -> do |
185 | isfin <- checkFin | ||
162 | modifyTVar (scheduled announcer) | 186 | modifyTVar (scheduled announcer) |
163 | (PSQ.insert' k (Announce announce interval) (now + interval)) | 187 | (PSQ.insert' k (Announce checkFin announce interval) (now + interval)) |
164 | return $ Just announce | 188 | return $ Just $ do |
189 | isfin | ||
190 | announce | ||
165 | 191 | ||
166 | -- search finished: | 192 | -- search finished: |
167 | -- if any of the current storing-nodes set have not been | 193 | -- if any of the current storing-nodes set have not been |