diff options
author | joe <joe@jerkface.net> | 2017-10-31 22:09:49 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-10-31 22:10:11 -0400 |
commit | cb853466ae8a5cffccb0f74da7aa7d2d85f83959 (patch) | |
tree | 7abf39b62ec6289d3a04f0f636c0911cc40918eb /Announcer.hs | |
parent | 8db2baf3b4fc6b495e3988557a4db39ea9a531f8 (diff) |
Bug fix to Announcer.
Diffstat (limited to 'Announcer.hs')
-rw-r--r-- | Announcer.hs | 51 |
1 files changed, 31 insertions, 20 deletions
diff --git a/Announcer.hs b/Announcer.hs index a9b30940..4f4eba8f 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -16,7 +16,7 @@ module Announcer | |||
16 | 16 | ||
17 | import Data.Wrapper.PSQ as PSQ | 17 | import Data.Wrapper.PSQ as PSQ |
18 | import Network.Kademlia.Search | 18 | import Network.Kademlia.Search |
19 | import Ticker | 19 | import InterruptibleDelay |
20 | 20 | ||
21 | import Control.Concurrent.Lifted.Instrument | 21 | import Control.Concurrent.Lifted.Instrument |
22 | import Control.Concurrent.STM | 22 | import Control.Concurrent.STM |
@@ -42,10 +42,9 @@ data ScheduledItem | |||
42 | | StopAnnouncer | 42 | | StopAnnouncer |
43 | 43 | ||
44 | data Announcer = Announcer | 44 | data Announcer = Announcer |
45 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) | 45 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) |
46 | , announcerActive :: TVar Bool | 46 | , announcerActive :: TVar Bool |
47 | , lastTick :: TVar POSIXTime | 47 | , interrutible :: InterruptibleDelay |
48 | , announceTicker :: Ticker | ||
49 | } | 48 | } |
50 | 49 | ||
51 | scheduleImmediately :: Announcer -> ScheduledItem -> STM () | 50 | scheduleImmediately :: Announcer -> ScheduledItem -> STM () |
@@ -62,25 +61,27 @@ data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod | |||
62 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) | 61 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) |
63 | } | 62 | } |
64 | 63 | ||
64 | -- startDelay :: InterruptibleDelay -> Microseconds -> IO Bool | ||
65 | -- interruptDelay :: InterruptibleDelay -> IO () | ||
66 | |||
65 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 67 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
66 | schedule _ _ _ _ = do | 68 | schedule announcer _ _ _ = do |
67 | -- fork the search | 69 | -- fork the search |
68 | -- add it to the priority queue of announce methods. | 70 | -- add it to the priority queue of announce methods. |
69 | -- update ticker | 71 | interruptDelay (interrutible announcer) |
70 | return () | ||
71 | 72 | ||
72 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 73 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
73 | cancel _ _ _ _ = return () | 74 | cancel announcer _ _ _ = do |
75 | -- cancel search/announce | ||
76 | interruptDelay (interrutible announcer) | ||
74 | 77 | ||
75 | 78 | ||
76 | forkAnnouncer :: IO Announcer | 79 | forkAnnouncer :: IO Announcer |
77 | forkAnnouncer = do | 80 | forkAnnouncer = do |
78 | tickvar <- atomically $ newTVar 0 | 81 | delay <- interruptibleDelay |
79 | ticker <- forkTicker $ writeTVar tickvar | ||
80 | announcer <- atomically $ Announcer <$> newTVar PSQ.empty | 82 | announcer <- atomically $ Announcer <$> newTVar PSQ.empty |
81 | <*> newTVar True | 83 | <*> newTVar True |
82 | <*> pure tickvar | 84 | <*> pure delay |
83 | <*> pure ticker | ||
84 | fork $ announceThread announcer | 85 | fork $ announceThread announcer |
85 | return announcer | 86 | return announcer |
86 | 87 | ||
@@ -89,14 +90,24 @@ announceThread :: Announcer -> IO () | |||
89 | announceThread announcer = do | 90 | announceThread announcer = do |
90 | myThreadId >>= flip labelThread "announcer" | 91 | myThreadId >>= flip labelThread "announcer" |
91 | fix $ \loop -> do | 92 | fix $ \loop -> do |
92 | action <- atomically $ do | 93 | join $ atomically $ do |
93 | now <- readTVar $ lastTick announcer | 94 | item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer) |
94 | (item,q) <- readTVar (scheduled announcer) | 95 | return $ do |
95 | >>= maybe retry return . PSQ.minView | 96 | now <- getPOSIXTime |
96 | when (prio item > now) retry -- Is it time to do something? | 97 | -- Is it time to do something? |
97 | writeTVar (scheduled announcer) q -- Remove the event from the queue. | 98 | if (prio item > now) |
98 | performScheduledItem announcer item -- Go for it! | 99 | then do -- Yes. Dequeue and handle this event. |
99 | mapM_ (>> loop) action | 100 | action <- atomically $ do |
101 | modifyTVar' (scheduled announcer) | ||
102 | (PSQ.delete (key item)) | ||
103 | performScheduledItem announcer item | ||
104 | -- Are we finished? | ||
105 | mapM_ (>> loop) -- No? Okay, perform scheduled op and loop. | ||
106 | action | ||
107 | else do -- No. Wait a bit. | ||
108 | startDelay (interrutible announcer) (microseconds $ prio item - now) | ||
109 | loop | ||
110 | -- We're done. Let 'stopAnnouncer' know that it can stop blocking. | ||
100 | atomically $ writeTVar (announcerActive announcer) False | 111 | atomically $ writeTVar (announcerActive announcer) False |
101 | 112 | ||
102 | performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) | 113 | performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) |