summaryrefslogtreecommitdiff
path: root/Announcer.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-10-31 22:09:49 -0400
committerjoe <joe@jerkface.net>2017-10-31 22:10:11 -0400
commitcb853466ae8a5cffccb0f74da7aa7d2d85f83959 (patch)
tree7abf39b62ec6289d3a04f0f636c0911cc40918eb /Announcer.hs
parent8db2baf3b4fc6b495e3988557a4db39ea9a531f8 (diff)
Bug fix to Announcer.
Diffstat (limited to 'Announcer.hs')
-rw-r--r--Announcer.hs51
1 files changed, 31 insertions, 20 deletions
diff --git a/Announcer.hs b/Announcer.hs
index a9b30940..4f4eba8f 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -16,7 +16,7 @@ module Announcer
16 16
17import Data.Wrapper.PSQ as PSQ 17import Data.Wrapper.PSQ as PSQ
18import Network.Kademlia.Search 18import Network.Kademlia.Search
19import Ticker 19import InterruptibleDelay
20 20
21import Control.Concurrent.Lifted.Instrument 21import Control.Concurrent.Lifted.Instrument
22import Control.Concurrent.STM 22import Control.Concurrent.STM
@@ -42,10 +42,9 @@ data ScheduledItem
42 | StopAnnouncer 42 | StopAnnouncer
43 43
44data Announcer = Announcer 44data Announcer = Announcer
45 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) 45 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
46 , announcerActive :: TVar Bool 46 , announcerActive :: TVar Bool
47 , lastTick :: TVar POSIXTime 47 , interrutible :: InterruptibleDelay
48 , announceTicker :: Ticker
49 } 48 }
50 49
51scheduleImmediately :: Announcer -> ScheduledItem -> STM () 50scheduleImmediately :: Announcer -> ScheduledItem -> STM ()
@@ -62,25 +61,27 @@ data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod
62 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) 61 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
63 } 62 }
64 63
64-- startDelay :: InterruptibleDelay -> Microseconds -> IO Bool
65-- interruptDelay :: InterruptibleDelay -> IO ()
66
65schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 67schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
66schedule _ _ _ _ = do 68schedule announcer _ _ _ = do
67 -- fork the search 69 -- fork the search
68 -- add it to the priority queue of announce methods. 70 -- add it to the priority queue of announce methods.
69 -- update ticker 71 interruptDelay (interrutible announcer)
70 return ()
71 72
72cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 73cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
73cancel _ _ _ _ = return () 74cancel announcer _ _ _ = do
75 -- cancel search/announce
76 interruptDelay (interrutible announcer)
74 77
75 78
76forkAnnouncer :: IO Announcer 79forkAnnouncer :: IO Announcer
77forkAnnouncer = do 80forkAnnouncer = do
78 tickvar <- atomically $ newTVar 0 81 delay <- interruptibleDelay
79 ticker <- forkTicker $ writeTVar tickvar
80 announcer <- atomically $ Announcer <$> newTVar PSQ.empty 82 announcer <- atomically $ Announcer <$> newTVar PSQ.empty
81 <*> newTVar True 83 <*> newTVar True
82 <*> pure tickvar 84 <*> pure delay
83 <*> pure ticker
84 fork $ announceThread announcer 85 fork $ announceThread announcer
85 return announcer 86 return announcer
86 87
@@ -89,14 +90,24 @@ announceThread :: Announcer -> IO ()
89announceThread announcer = do 90announceThread announcer = do
90 myThreadId >>= flip labelThread "announcer" 91 myThreadId >>= flip labelThread "announcer"
91 fix $ \loop -> do 92 fix $ \loop -> do
92 action <- atomically $ do 93 join $ atomically $ do
93 now <- readTVar $ lastTick announcer 94 item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer)
94 (item,q) <- readTVar (scheduled announcer) 95 return $ do
95 >>= maybe retry return . PSQ.minView 96 now <- getPOSIXTime
96 when (prio item > now) retry -- Is it time to do something? 97 -- Is it time to do something?
97 writeTVar (scheduled announcer) q -- Remove the event from the queue. 98 if (prio item > now)
98 performScheduledItem announcer item -- Go for it! 99 then do -- Yes. Dequeue and handle this event.
99 mapM_ (>> loop) action 100 action <- atomically $ do
101 modifyTVar' (scheduled announcer)
102 (PSQ.delete (key item))
103 performScheduledItem announcer item
104 -- Are we finished?
105 mapM_ (>> loop) -- No? Okay, perform scheduled op and loop.
106 action
107 else do -- No. Wait a bit.
108 startDelay (interrutible announcer) (microseconds $ prio item - now)
109 loop
110 -- We're done. Let 'stopAnnouncer' know that it can stop blocking.
100 atomically $ writeTVar (announcerActive announcer) False 111 atomically $ writeTVar (announcerActive announcer) False
101 112
102performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) 113performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))