summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Announcer.hs51
-rw-r--r--InterruptibleDelay.hs41
-rw-r--r--Ticker.hs36
-rw-r--r--dht-client.cabal2
4 files changed, 73 insertions, 57 deletions
diff --git a/Announcer.hs b/Announcer.hs
index a9b30940..4f4eba8f 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -16,7 +16,7 @@ module Announcer
16 16
17import Data.Wrapper.PSQ as PSQ 17import Data.Wrapper.PSQ as PSQ
18import Network.Kademlia.Search 18import Network.Kademlia.Search
19import Ticker 19import InterruptibleDelay
20 20
21import Control.Concurrent.Lifted.Instrument 21import Control.Concurrent.Lifted.Instrument
22import Control.Concurrent.STM 22import Control.Concurrent.STM
@@ -42,10 +42,9 @@ data ScheduledItem
42 | StopAnnouncer 42 | StopAnnouncer
43 43
44data Announcer = Announcer 44data Announcer = Announcer
45 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) 45 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
46 , announcerActive :: TVar Bool 46 , announcerActive :: TVar Bool
47 , lastTick :: TVar POSIXTime 47 , interrutible :: InterruptibleDelay
48 , announceTicker :: Ticker
49 } 48 }
50 49
51scheduleImmediately :: Announcer -> ScheduledItem -> STM () 50scheduleImmediately :: Announcer -> ScheduledItem -> STM ()
@@ -62,25 +61,27 @@ data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod
62 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) 61 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
63 } 62 }
64 63
64-- startDelay :: InterruptibleDelay -> Microseconds -> IO Bool
65-- interruptDelay :: InterruptibleDelay -> IO ()
66
65schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 67schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
66schedule _ _ _ _ = do 68schedule announcer _ _ _ = do
67 -- fork the search 69 -- fork the search
68 -- add it to the priority queue of announce methods. 70 -- add it to the priority queue of announce methods.
69 -- update ticker 71 interruptDelay (interrutible announcer)
70 return ()
71 72
72cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 73cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
73cancel _ _ _ _ = return () 74cancel announcer _ _ _ = do
75 -- cancel search/announce
76 interruptDelay (interrutible announcer)
74 77
75 78
76forkAnnouncer :: IO Announcer 79forkAnnouncer :: IO Announcer
77forkAnnouncer = do 80forkAnnouncer = do
78 tickvar <- atomically $ newTVar 0 81 delay <- interruptibleDelay
79 ticker <- forkTicker $ writeTVar tickvar
80 announcer <- atomically $ Announcer <$> newTVar PSQ.empty 82 announcer <- atomically $ Announcer <$> newTVar PSQ.empty
81 <*> newTVar True 83 <*> newTVar True
82 <*> pure tickvar 84 <*> pure delay
83 <*> pure ticker
84 fork $ announceThread announcer 85 fork $ announceThread announcer
85 return announcer 86 return announcer
86 87
@@ -89,14 +90,24 @@ announceThread :: Announcer -> IO ()
89announceThread announcer = do 90announceThread announcer = do
90 myThreadId >>= flip labelThread "announcer" 91 myThreadId >>= flip labelThread "announcer"
91 fix $ \loop -> do 92 fix $ \loop -> do
92 action <- atomically $ do 93 join $ atomically $ do
93 now <- readTVar $ lastTick announcer 94 item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer)
94 (item,q) <- readTVar (scheduled announcer) 95 return $ do
95 >>= maybe retry return . PSQ.minView 96 now <- getPOSIXTime
96 when (prio item > now) retry -- Is it time to do something? 97 -- Is it time to do something?
97 writeTVar (scheduled announcer) q -- Remove the event from the queue. 98 if (prio item > now)
98 performScheduledItem announcer item -- Go for it! 99 then do -- Yes. Dequeue and handle this event.
99 mapM_ (>> loop) action 100 action <- atomically $ do
101 modifyTVar' (scheduled announcer)
102 (PSQ.delete (key item))
103 performScheduledItem announcer item
104 -- Are we finished?
105 mapM_ (>> loop) -- No? Okay, perform scheduled op and loop.
106 action
107 else do -- No. Wait a bit.
108 startDelay (interrutible announcer) (microseconds $ prio item - now)
109 loop
110 -- We're done. Let 'stopAnnouncer' know that it can stop blocking.
100 atomically $ writeTVar (announcerActive announcer) False 111 atomically $ writeTVar (announcerActive announcer) False
101 112
102performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) 113performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))
diff --git a/InterruptibleDelay.hs b/InterruptibleDelay.hs
new file mode 100644
index 00000000..d59ec8ef
--- /dev/null
+++ b/InterruptibleDelay.hs
@@ -0,0 +1,41 @@
1module InterruptibleDelay where
2
3import Control.Concurrent
4import Control.Monad
5import Control.Exception ({-evaluate,-}handle,ErrorCall(..))
6import Data.Time.Clock (NominalDiffTime)
7
8type Microseconds = Int
9
10microseconds :: NominalDiffTime -> Microseconds
11microseconds d = round $ 1000000 * d
12
13data InterruptibleDelay = InterruptibleDelay
14 { delayThread :: MVar ThreadId
15 }
16
17interruptibleDelay :: IO InterruptibleDelay
18interruptibleDelay = do
19 fmap InterruptibleDelay newEmptyMVar
20
21startDelay :: InterruptibleDelay -> Microseconds -> IO Bool
22startDelay d interval = do
23 thread <- myThreadId
24 handle (\(ErrorCall _)-> do
25 debugNoise $ "delay interrupted"
26 return False) $ do
27 putMVar (delayThread d) thread
28 threadDelay interval
29 void $ takeMVar (delayThread d)
30 return True
31
32 where debugNoise str = return ()
33
34
35interruptDelay :: InterruptibleDelay -> IO ()
36interruptDelay d = do
37 mthread <- do
38 tryTakeMVar (delayThread d)
39 flip (maybe $ return ()) mthread $ \thread -> do
40 throwTo thread (ErrorCall "Interrupted delay")
41
diff --git a/Ticker.hs b/Ticker.hs
deleted file mode 100644
index ba423def..00000000
--- a/Ticker.hs
+++ /dev/null
@@ -1,36 +0,0 @@
1module Ticker where
2
3import Control.Concurrent
4import Control.Concurrent.STM
5import Control.Exception
6import Control.Monad
7import Data.Function
8import Data.Time.Clock.POSIX
9import Data.Typeable
10
11data ScheduleTick = ScheduleTick Int deriving (Show, Typeable)
12instance Exception ScheduleTick
13
14newtype Ticker = Ticker ThreadId
15
16-- | Fork a thread that will invoke an STM transaction whenever a tick event
17-- occurs. The first tick happens immediately, later ticks must be scheduled
18-- with 'scheduleTick'.
19forkTicker :: (POSIXTime -> STM ()) -> IO Ticker
20forkTicker tick = do
21 getPOSIXTime >>= atomically . tick
22 tid <- forkIO $ fix $ \loop -> do
23 getPOSIXTime >>= atomically . tick
24 catch loop $ \(ScheduleTick interval) -> do
25 threadDelay interval
26 when (interval >= 0) loop
27 return $ Ticker tid
28
29-- | Schedule the next tick. If you supply a negative number, the ticker
30-- thread will terminate. Otherwise, the next tick will be scheduled for the
31-- given number of microseconds from now.
32--
33-- Note: If a tick was scheduled to happen sooner, that tick will be canceled
34-- in favor of this one. Only one tick is scheduled at a time.
35scheduleTick :: Ticker -> Int -> IO ()
36scheduleTick (Ticker tid) usecs = throwTo tid (ScheduleTick usecs)
diff --git a/dht-client.cabal b/dht-client.cabal
index 243f8a75..81d96427 100644
--- a/dht-client.cabal
+++ b/dht-client.cabal
@@ -95,7 +95,7 @@ library
95 Text.XXD 95 Text.XXD
96 Roster 96 Roster
97 Announcer 97 Announcer
98 Ticker 98 InterruptibleDelay
99 99
100 build-depends: base 100 build-depends: base
101 , containers 101 , containers