summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-10-31 17:42:15 -0400
committerjoe <joe@jerkface.net>2017-10-31 17:42:15 -0400
commit9eef4cbd00586df2fad36f3cab3d04b807b92e2f (patch)
treebc3448768e0d83a864d4f26e6d4a229865d1b9b7
parent4b39ca7d2c7d1592fd5109b9208539ae88fce093 (diff)
WIP: a command (recurring announcements) (Part 4)
-rw-r--r--Announcer.hs118
-rw-r--r--Ticker.hs36
-rw-r--r--examples/dhtd.hs15
3 files changed, 154 insertions, 15 deletions
diff --git a/Announcer.hs b/Announcer.hs
index bc52ee38..8008267c 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -1,23 +1,121 @@
1{-# LANGUAGE ExistentialQuantification #-} 1{-# LANGUAGE ExistentialQuantification #-}
2module Announcer where 2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
3{-# LANGUAGE LambdaCase #-}
4module Announcer
5 ( Announcer
6 , AnnounceKey
7 , packAnnounceKey
8 , unpackAnnounceKey
9 , AnnounceMethod(..)
10 , forkAnnouncer
11 , stopAnnouncer
12 , schedule
13 , cancel
14 ) where
3 15
16import Data.Wrapper.PSQ as PSQ
4import Network.Kademlia.Search 17import Network.Kademlia.Search
18import Ticker
19
20import Control.Concurrent.Lifted.Instrument
21import Control.Concurrent.STM
22import Control.Monad
23import Data.ByteString (ByteString)
24import qualified Data.ByteString.Char8 as Char8
25import Data.Function
26import Data.Hashable
27import Data.Maybe
28import Data.Time.Clock.POSIX
29
30newtype AnnounceKey = AnnounceKey ByteString
31 deriving (Hashable,Ord,Eq)
32
33packAnnounceKey :: Announcer -> String -> STM AnnounceKey
34packAnnounceKey _ = return . AnnounceKey . Char8.pack
35
36unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String
37unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs
38
39data ScheduledItem
40 = forall r. ScheduledItem (AnnounceMethod r)
41 | StopAnnouncer
5 42
6data Announcer = Announcer 43data Announcer = Announcer
44 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
45 , announcerActive :: TVar Bool
46 , lastTick :: TVar POSIXTime
47 , announceTicker :: Ticker
48 }
7 49
8forkAnnouncer :: IO Announcer 50scheduleImmediately :: Announcer -> ScheduledItem -> STM ()
9forkAnnouncer = return Announcer 51scheduleImmediately announcer item
52 = modifyTVar' (scheduled announcer) (PSQ.insert' (AnnounceKey "") item 0)
10 53
11stopAnnouncer :: Announcer -> IO () 54stopAnnouncer :: Announcer -> IO ()
12stopAnnouncer _ = return () 55stopAnnouncer announcer = do
56 atomically $ scheduleImmediately announcer StopAnnouncer
57 atomically $ readTVar (announcerActive announcer) >>= check . not
13 58
14data AnnounceMethod ni r = forall nid addr r tok a. AnnounceMethod 59data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod
15 { aSearch :: Search nid addr tok ni r 60 { aSearch :: Search nid addr tok ni r
16 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) 61 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
17 } 62 }
18 63
19schedule :: Announcer -> AnnounceMethod ni r -> r -> IO () 64schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
20schedule _ _ _ = return () 65schedule _ _ _ _ = do
66 -- fork the search
67 -- add it to the priority queue of announce methods.
68 -- update ticker
69 return ()
70
71cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
72cancel _ _ _ _ = return ()
73
74
75forkAnnouncer :: IO Announcer
76forkAnnouncer = do
77 tickvar <- atomically $ newTVar 0
78 ticker <- forkTicker $ writeTVar tickvar
79 announcer <- atomically $ Announcer <$> newTVar PSQ.empty
80 <*> newTVar True
81 <*> pure tickvar
82 <*> pure ticker
83 fork $ announceThread announcer
84 return announcer
85
86
87announceThread :: Announcer -> IO ()
88announceThread announcer = do
89 myThreadId >>= flip labelThread "announcer"
90 fix $ \loop -> do
91 action <- atomically $ do
92 now <- readTVar $ lastTick announcer
93 (item,q) <- readTVar (scheduled announcer)
94 >>= maybe retry return . PSQ.minView
95 when (prio item > now) retry -- Is it time to do something?
96 writeTVar (scheduled announcer) q -- Remove the event from the queue.
97 performScheduledItem announcer item -- Go for it!
98 mapM_ (>> loop) action
99 atomically $ writeTVar (announcerActive announcer) False
100
101performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))
102performScheduledItem announcer = \case
103
104 (Binding _ StopAnnouncer _) -> return Nothing
105
106 -- announcement added:
107
108 -- wait for time to announce or for search to finish.
109 --
110 -- time for periodic announce:
111 -- (re-)announce to the current known set of storing-nodes.
112 -- If the search is finished, restart the search.
113 --
114 -- search finished:
115 -- if any of the current storing-nodes set have not been
116 -- announced to, announce to them.
117 --
118 --
119 -- announcement removed:
120 --
21 121
22cancel :: Announcer -> AnnounceMethod ni r -> r -> IO ()
23cancel _ _ _ = return ()
diff --git a/Ticker.hs b/Ticker.hs
new file mode 100644
index 00000000..ba423def
--- /dev/null
+++ b/Ticker.hs
@@ -0,0 +1,36 @@
1module Ticker where
2
3import Control.Concurrent
4import Control.Concurrent.STM
5import Control.Exception
6import Control.Monad
7import Data.Function
8import Data.Time.Clock.POSIX
9import Data.Typeable
10
11data ScheduleTick = ScheduleTick Int deriving (Show, Typeable)
12instance Exception ScheduleTick
13
14newtype Ticker = Ticker ThreadId
15
16-- | Fork a thread that will invoke an STM transaction whenever a tick event
17-- occurs. The first tick happens immediately, later ticks must be scheduled
18-- with 'scheduleTick'.
19forkTicker :: (POSIXTime -> STM ()) -> IO Ticker
20forkTicker tick = do
21 getPOSIXTime >>= atomically . tick
22 tid <- forkIO $ fix $ \loop -> do
23 getPOSIXTime >>= atomically . tick
24 catch loop $ \(ScheduleTick interval) -> do
25 threadDelay interval
26 when (interval >= 0) loop
27 return $ Ticker tid
28
29-- | Schedule the next tick. If you supply a negative number, the ticker
30-- thread will terminate. Otherwise, the next tick will be scheduled for the
31-- given number of microseconds from now.
32--
33-- Note: If a tick was scheduled to happen sooner, that tick will be canceled
34-- in favor of this one. Only one tick is scheduled at a time.
35scheduleTick :: Ticker -> Int -> IO ()
36scheduleTick (Ticker tid) usecs = throwTo tid (ScheduleTick usecs)
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index fa4ce95b..931e1ba0 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -311,6 +311,8 @@ forkSearch method nid DHTQuery{qsearch,qshowTok,qshowR} dhtSearches dhtBuckets
311 , searchResults = results 311 , searchResults = results
312 } 312 }
313 modifyTVar' dhtSearches $ Map.insert (method,nid) new 313 modifyTVar' dhtSearches $ Map.insert (method,nid) new
314 -- Finally, we write the search loop action into a tvar that will be executed in a new
315 -- thread.
314 writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st 316 writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st
315 317
316reportSearchResults :: (Show t, Ord t1, Ord t, Hashable t) => 318reportSearchResults :: (Show t, Ord t1, Ord t, Hashable t) =>
@@ -552,10 +554,10 @@ clientSession s@Session{..} sock cnum h = do
552 (dtastr,ys) = break isSpace $ dropWhile isSpace xs 554 (dtastr,ys) = break isSpace $ dropWhile isSpace xs
553 a = Map.lookup method dhtAnnouncables 555 a = Map.lookup method dhtAnnouncables
554 q = Map.lookup method dhtQuery 556 q = Map.lookup method dhtQuery
555 doit :: Char -> proxy ni -> Announcer -> AnnounceMethod ni r -> r -> IO () 557 doit :: Char -> proxy ni -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
556 doit '+' _ = schedule 558 doit '+' _ = schedule
557 doit '-' _ = cancel 559 doit '-' _ = cancel
558 doit _ _ = \_ _ _ -> hPutClient h "Starting(+) or canceling(-)?" 560 doit _ _ = \_ _ _ _ -> hPutClient h "Starting(+) or canceling(-)?"
559 matchingResult :: 561 matchingResult ::
560 ( Typeable sr 562 ( Typeable sr
561 , Typeable stok 563 , Typeable stok
@@ -573,9 +575,12 @@ clientSession s@Session{..} sock cnum h = do
573 DHTQuery { qsearch } <- q 575 DHTQuery { qsearch } <- q
574 (Refl,Refl,nr@Refl) <- matchingResult qsearch announceSendData 576 (Refl,Refl,nr@Refl) <- matchingResult qsearch announceSendData
575 dta <- either (const Nothing) Just $ announceParseData dtastr 577 dta <- either (const Nothing) Just $ announceParseData dtastr
576 return $ doit op nr announcer 578 return $ do
577 (AnnounceMethod qsearch announceSendData) 579 akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr)
578 dta 580 doit op nr announcer
581 akey
582 (AnnounceMethod qsearch announceSendData)
583 dta
579 fromMaybe (hPutClient h "error.") mameth 584 fromMaybe (hPutClient h "error.") mameth
580 585
581 ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts 586 ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts