diff options
-rw-r--r-- | Announcer.hs | 51 | ||||
-rw-r--r-- | InterruptibleDelay.hs | 41 | ||||
-rw-r--r-- | Ticker.hs | 36 | ||||
-rw-r--r-- | dht-client.cabal | 2 |
4 files changed, 73 insertions, 57 deletions
diff --git a/Announcer.hs b/Announcer.hs index a9b30940..4f4eba8f 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -16,7 +16,7 @@ module Announcer | |||
16 | 16 | ||
17 | import Data.Wrapper.PSQ as PSQ | 17 | import Data.Wrapper.PSQ as PSQ |
18 | import Network.Kademlia.Search | 18 | import Network.Kademlia.Search |
19 | import Ticker | 19 | import InterruptibleDelay |
20 | 20 | ||
21 | import Control.Concurrent.Lifted.Instrument | 21 | import Control.Concurrent.Lifted.Instrument |
22 | import Control.Concurrent.STM | 22 | import Control.Concurrent.STM |
@@ -42,10 +42,9 @@ data ScheduledItem | |||
42 | | StopAnnouncer | 42 | | StopAnnouncer |
43 | 43 | ||
44 | data Announcer = Announcer | 44 | data Announcer = Announcer |
45 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) | 45 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) |
46 | , announcerActive :: TVar Bool | 46 | , announcerActive :: TVar Bool |
47 | , lastTick :: TVar POSIXTime | 47 | , interrutible :: InterruptibleDelay |
48 | , announceTicker :: Ticker | ||
49 | } | 48 | } |
50 | 49 | ||
51 | scheduleImmediately :: Announcer -> ScheduledItem -> STM () | 50 | scheduleImmediately :: Announcer -> ScheduledItem -> STM () |
@@ -62,25 +61,27 @@ data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod | |||
62 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) | 61 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) |
63 | } | 62 | } |
64 | 63 | ||
64 | -- startDelay :: InterruptibleDelay -> Microseconds -> IO Bool | ||
65 | -- interruptDelay :: InterruptibleDelay -> IO () | ||
66 | |||
65 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 67 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
66 | schedule _ _ _ _ = do | 68 | schedule announcer _ _ _ = do |
67 | -- fork the search | 69 | -- fork the search |
68 | -- add it to the priority queue of announce methods. | 70 | -- add it to the priority queue of announce methods. |
69 | -- update ticker | 71 | interruptDelay (interrutible announcer) |
70 | return () | ||
71 | 72 | ||
72 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 73 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
73 | cancel _ _ _ _ = return () | 74 | cancel announcer _ _ _ = do |
75 | -- cancel search/announce | ||
76 | interruptDelay (interrutible announcer) | ||
74 | 77 | ||
75 | 78 | ||
76 | forkAnnouncer :: IO Announcer | 79 | forkAnnouncer :: IO Announcer |
77 | forkAnnouncer = do | 80 | forkAnnouncer = do |
78 | tickvar <- atomically $ newTVar 0 | 81 | delay <- interruptibleDelay |
79 | ticker <- forkTicker $ writeTVar tickvar | ||
80 | announcer <- atomically $ Announcer <$> newTVar PSQ.empty | 82 | announcer <- atomically $ Announcer <$> newTVar PSQ.empty |
81 | <*> newTVar True | 83 | <*> newTVar True |
82 | <*> pure tickvar | 84 | <*> pure delay |
83 | <*> pure ticker | ||
84 | fork $ announceThread announcer | 85 | fork $ announceThread announcer |
85 | return announcer | 86 | return announcer |
86 | 87 | ||
@@ -89,14 +90,24 @@ announceThread :: Announcer -> IO () | |||
89 | announceThread announcer = do | 90 | announceThread announcer = do |
90 | myThreadId >>= flip labelThread "announcer" | 91 | myThreadId >>= flip labelThread "announcer" |
91 | fix $ \loop -> do | 92 | fix $ \loop -> do |
92 | action <- atomically $ do | 93 | join $ atomically $ do |
93 | now <- readTVar $ lastTick announcer | 94 | item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer) |
94 | (item,q) <- readTVar (scheduled announcer) | 95 | return $ do |
95 | >>= maybe retry return . PSQ.minView | 96 | now <- getPOSIXTime |
96 | when (prio item > now) retry -- Is it time to do something? | 97 | -- Is it time to do something? |
97 | writeTVar (scheduled announcer) q -- Remove the event from the queue. | 98 | if (prio item > now) |
98 | performScheduledItem announcer item -- Go for it! | 99 | then do -- Yes. Dequeue and handle this event. |
99 | mapM_ (>> loop) action | 100 | action <- atomically $ do |
101 | modifyTVar' (scheduled announcer) | ||
102 | (PSQ.delete (key item)) | ||
103 | performScheduledItem announcer item | ||
104 | -- Are we finished? | ||
105 | mapM_ (>> loop) -- No? Okay, perform scheduled op and loop. | ||
106 | action | ||
107 | else do -- No. Wait a bit. | ||
108 | startDelay (interrutible announcer) (microseconds $ prio item - now) | ||
109 | loop | ||
110 | -- We're done. Let 'stopAnnouncer' know that it can stop blocking. | ||
100 | atomically $ writeTVar (announcerActive announcer) False | 111 | atomically $ writeTVar (announcerActive announcer) False |
101 | 112 | ||
102 | performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) | 113 | performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) |
diff --git a/InterruptibleDelay.hs b/InterruptibleDelay.hs new file mode 100644 index 00000000..d59ec8ef --- /dev/null +++ b/InterruptibleDelay.hs | |||
@@ -0,0 +1,41 @@ | |||
1 | module InterruptibleDelay where | ||
2 | |||
3 | import Control.Concurrent | ||
4 | import Control.Monad | ||
5 | import Control.Exception ({-evaluate,-}handle,ErrorCall(..)) | ||
6 | import Data.Time.Clock (NominalDiffTime) | ||
7 | |||
8 | type Microseconds = Int | ||
9 | |||
10 | microseconds :: NominalDiffTime -> Microseconds | ||
11 | microseconds d = round $ 1000000 * d | ||
12 | |||
13 | data InterruptibleDelay = InterruptibleDelay | ||
14 | { delayThread :: MVar ThreadId | ||
15 | } | ||
16 | |||
17 | interruptibleDelay :: IO InterruptibleDelay | ||
18 | interruptibleDelay = do | ||
19 | fmap InterruptibleDelay newEmptyMVar | ||
20 | |||
21 | startDelay :: InterruptibleDelay -> Microseconds -> IO Bool | ||
22 | startDelay d interval = do | ||
23 | thread <- myThreadId | ||
24 | handle (\(ErrorCall _)-> do | ||
25 | debugNoise $ "delay interrupted" | ||
26 | return False) $ do | ||
27 | putMVar (delayThread d) thread | ||
28 | threadDelay interval | ||
29 | void $ takeMVar (delayThread d) | ||
30 | return True | ||
31 | |||
32 | where debugNoise str = return () | ||
33 | |||
34 | |||
35 | interruptDelay :: InterruptibleDelay -> IO () | ||
36 | interruptDelay d = do | ||
37 | mthread <- do | ||
38 | tryTakeMVar (delayThread d) | ||
39 | flip (maybe $ return ()) mthread $ \thread -> do | ||
40 | throwTo thread (ErrorCall "Interrupted delay") | ||
41 | |||
diff --git a/Ticker.hs b/Ticker.hs deleted file mode 100644 index ba423def..00000000 --- a/Ticker.hs +++ /dev/null | |||
@@ -1,36 +0,0 @@ | |||
1 | module Ticker where | ||
2 | |||
3 | import Control.Concurrent | ||
4 | import Control.Concurrent.STM | ||
5 | import Control.Exception | ||
6 | import Control.Monad | ||
7 | import Data.Function | ||
8 | import Data.Time.Clock.POSIX | ||
9 | import Data.Typeable | ||
10 | |||
11 | data ScheduleTick = ScheduleTick Int deriving (Show, Typeable) | ||
12 | instance Exception ScheduleTick | ||
13 | |||
14 | newtype Ticker = Ticker ThreadId | ||
15 | |||
16 | -- | Fork a thread that will invoke an STM transaction whenever a tick event | ||
17 | -- occurs. The first tick happens immediately, later ticks must be scheduled | ||
18 | -- with 'scheduleTick'. | ||
19 | forkTicker :: (POSIXTime -> STM ()) -> IO Ticker | ||
20 | forkTicker tick = do | ||
21 | getPOSIXTime >>= atomically . tick | ||
22 | tid <- forkIO $ fix $ \loop -> do | ||
23 | getPOSIXTime >>= atomically . tick | ||
24 | catch loop $ \(ScheduleTick interval) -> do | ||
25 | threadDelay interval | ||
26 | when (interval >= 0) loop | ||
27 | return $ Ticker tid | ||
28 | |||
29 | -- | Schedule the next tick. If you supply a negative number, the ticker | ||
30 | -- thread will terminate. Otherwise, the next tick will be scheduled for the | ||
31 | -- given number of microseconds from now. | ||
32 | -- | ||
33 | -- Note: If a tick was scheduled to happen sooner, that tick will be canceled | ||
34 | -- in favor of this one. Only one tick is scheduled at a time. | ||
35 | scheduleTick :: Ticker -> Int -> IO () | ||
36 | scheduleTick (Ticker tid) usecs = throwTo tid (ScheduleTick usecs) | ||
diff --git a/dht-client.cabal b/dht-client.cabal index 243f8a75..81d96427 100644 --- a/dht-client.cabal +++ b/dht-client.cabal | |||
@@ -95,7 +95,7 @@ library | |||
95 | Text.XXD | 95 | Text.XXD |
96 | Roster | 96 | Roster |
97 | Announcer | 97 | Announcer |
98 | Ticker | 98 | InterruptibleDelay |
99 | 99 | ||
100 | build-depends: base | 100 | build-depends: base |
101 | , containers | 101 | , containers |