summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-01 16:33:15 -0400
committerjoe <joe@jerkface.net>2017-11-01 16:33:15 -0400
commit32b47d5fcac748b9fa4d93e45ee4fc52d523c601 (patch)
tree1ce9ca9803eee6d4701d59aaca4059303bdb6843
parente80fe4a1b0cae4de60509b560e7845f59bf91b9e (diff)
Announce command now includes a recurring search.
-rw-r--r--Announcer.hs62
1 files changed, 44 insertions, 18 deletions
diff --git a/Announcer.hs b/Announcer.hs
index 2f0eca10..668e00c2 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -29,6 +29,7 @@ import qualified Data.ByteString.Char8 as Char8
29import Data.Function 29import Data.Function
30import Data.Hashable 30import Data.Hashable
31import Data.Maybe 31import Data.Maybe
32import Data.Ord
32import Data.Time.Clock.POSIX 33import Data.Time.Clock.POSIX
33 34
34newtype AnnounceKey = AnnounceKey ByteString 35newtype AnnounceKey = AnnounceKey ByteString
@@ -41,11 +42,10 @@ unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String
41unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs 42unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs
42 43
43data ScheduledItem 44data ScheduledItem
44 = forall r. ScheduledItem (AnnounceMethod r) 45 = StopAnnouncer
45 | StopAnnouncer 46 | NewAnnouncement (STM (IO ())) (IO ()) (IO ()) POSIXTime
46 | NewAnnouncement (IO ()) (IO ()) POSIXTime
47 | SearchFinished (IO ()) (IO ()) POSIXTime 47 | SearchFinished (IO ()) (IO ()) POSIXTime
48 | Announce (IO ()) POSIXTime 48 | Announce (STM (IO ())) (IO ()) POSIXTime
49 | DeleteAnnouncement 49 | DeleteAnnouncement
50 50
51data Announcer = Announcer 51data Announcer = Announcer
@@ -54,6 +54,14 @@ data Announcer = Announcer
54 , interrutible :: InterruptibleDelay 54 , interrutible :: InterruptibleDelay
55 } 55 }
56 56
57announceK :: Int
58announceK = 8
59
60data AnnounceState = forall nid addr tok ni r. AnnounceState
61 { aState :: SearchState nid addr tok ni r
62 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
63 }
64
57scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () 65scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
58scheduleImmediately announcer k item 66scheduleImmediately announcer k item
59 = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) 67 = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0)
@@ -82,22 +90,39 @@ data AnnounceMethod r = forall nid ni sr addr tok a.
82schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 90schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
83schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do 91schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
84 st <- atomically $ newSearch aSearch aTarget [] 92 st <- atomically $ newSearch aSearch aTarget []
85 let announce = do -- publish to current search results 93 ns <- atomically $ newTVar MM.empty
94 let astate = AnnounceState st ns
95 publishToNodes is = do
96 forM_ is $ \(Binding ni mtok _) -> do
97 forM_ mtok $ \tok -> do
98 got <- aPublish r tok (Just ni)
99 now <- getPOSIXTime
100 forM_ got $ \_ -> do
101 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
102 announce = do -- publish to current search results
86 is <- atomically $ do 103 is <- atomically $ do
87 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) 104 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
88 return $ MM.toList bs 105 return $ MM.toList bs
89 forM_ is $ \(Binding ni mtok _) -> do 106 publishToNodes is
90 forM_ mtok $ \tok -> do
91 aPublish r tok (Just ni)
92 return ()
93 onResult _ = return True -- action for each search-hit (True = keep searching) 107 onResult _ = return True -- action for each search-hit (True = keep searching)
108 searchAgain = searchIsFinished st >>= \isfin -> return $ when isfin $ void $ fork search
94 search = do -- thread to fork 109 search = do -- thread to fork
95 atomically $ reset aBuckets aSearch aTarget st 110 atomically $ reset aBuckets aSearch aTarget st
96 searchLoop aSearch aTarget onResult st 111 searchLoop aSearch aTarget onResult st
112 fork $ do -- Announce to any nodes we haven't already announced to.
113 is <- atomically $ do
114 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
115 nq <- readTVar ns
116 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
117 $ MM.toList bs
118 publishToNodes is
119 return ()
120 {-
97 atomically $ scheduleImmediately announcer k 121 atomically $ scheduleImmediately announcer k
98 $ SearchFinished {- st -} search announce aInterval 122 $ SearchFinished {- st -} search announce aInterval
99 interruptDelay (interrutible announcer) 123 interruptDelay (interrutible announcer)
100 atomically $ scheduleImmediately announcer k $ NewAnnouncement search announce aInterval 124 -}
125 atomically $ scheduleImmediately announcer k $ NewAnnouncement searchAgain search announce aInterval
101 interruptDelay (interrutible announcer) 126 interruptDelay (interrutible announcer)
102 127
103cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 128cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
@@ -145,12 +170,10 @@ performScheduledItem announcer now = \case
145 (Binding _ StopAnnouncer _) -> return Nothing 170 (Binding _ StopAnnouncer _) -> return Nothing
146 171
147 -- announcement started: 172 -- announcement started:
148 (Binding k (NewAnnouncement search announce interval) _) -> do 173 (Binding k (NewAnnouncement checkFin search announce interval) _) -> do
149 modifyTVar (scheduled announcer) 174 modifyTVar (scheduled announcer)
150 (PSQ.insert' k (Announce announce interval) (now + interval)) 175 (PSQ.insert' k (Announce checkFin announce interval) (now + interval))
151 return $ Just $ do 176 return $ Just $ void $ fork search
152 fork search
153 return ()
154 177
155 -- announcement removed: 178 -- announcement removed:
156 (Binding k DeleteAnnouncement _) -> return $ Just $ return () 179 (Binding k DeleteAnnouncement _) -> return $ Just $ return ()
@@ -158,10 +181,13 @@ performScheduledItem announcer now = \case
158 -- time for periodic announce: 181 -- time for periodic announce:
159 -- (re-)announce to the current known set of storing-nodes. 182 -- (re-)announce to the current known set of storing-nodes.
160 -- TODO: If the search is finished, restart the search. 183 -- TODO: If the search is finished, restart the search.
161 (Binding k (Announce announce interval) _) -> do 184 (Binding k (Announce checkFin announce interval) _) -> do
185 isfin <- checkFin
162 modifyTVar (scheduled announcer) 186 modifyTVar (scheduled announcer)
163 (PSQ.insert' k (Announce announce interval) (now + interval)) 187 (PSQ.insert' k (Announce checkFin announce interval) (now + interval))
164 return $ Just announce 188 return $ Just $ do
189 isfin
190 announce
165 191
166 -- search finished: 192 -- search finished:
167 -- if any of the current storing-nodes set have not been 193 -- if any of the current storing-nodes set have not been