summaryrefslogtreecommitdiff
path: root/Announcer.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-10-31 17:42:15 -0400
committerjoe <joe@jerkface.net>2017-10-31 17:42:15 -0400
commit9eef4cbd00586df2fad36f3cab3d04b807b92e2f (patch)
treebc3448768e0d83a864d4f26e6d4a229865d1b9b7 /Announcer.hs
parent4b39ca7d2c7d1592fd5109b9208539ae88fce093 (diff)
WIP: a command (recurring announcements) (Part 4)
Diffstat (limited to 'Announcer.hs')
-rw-r--r--Announcer.hs118
1 files changed, 108 insertions, 10 deletions
diff --git a/Announcer.hs b/Announcer.hs
index bc52ee38..8008267c 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -1,23 +1,121 @@
1{-# LANGUAGE ExistentialQuantification #-} 1{-# LANGUAGE ExistentialQuantification #-}
2module Announcer where 2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
3{-# LANGUAGE LambdaCase #-}
4module Announcer
5 ( Announcer
6 , AnnounceKey
7 , packAnnounceKey
8 , unpackAnnounceKey
9 , AnnounceMethod(..)
10 , forkAnnouncer
11 , stopAnnouncer
12 , schedule
13 , cancel
14 ) where
3 15
16import Data.Wrapper.PSQ as PSQ
4import Network.Kademlia.Search 17import Network.Kademlia.Search
18import Ticker
19
20import Control.Concurrent.Lifted.Instrument
21import Control.Concurrent.STM
22import Control.Monad
23import Data.ByteString (ByteString)
24import qualified Data.ByteString.Char8 as Char8
25import Data.Function
26import Data.Hashable
27import Data.Maybe
28import Data.Time.Clock.POSIX
29
30newtype AnnounceKey = AnnounceKey ByteString
31 deriving (Hashable,Ord,Eq)
32
33packAnnounceKey :: Announcer -> String -> STM AnnounceKey
34packAnnounceKey _ = return . AnnounceKey . Char8.pack
35
36unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String
37unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs
38
39data ScheduledItem
40 = forall r. ScheduledItem (AnnounceMethod r)
41 | StopAnnouncer
5 42
6data Announcer = Announcer 43data Announcer = Announcer
44 { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem)
45 , announcerActive :: TVar Bool
46 , lastTick :: TVar POSIXTime
47 , announceTicker :: Ticker
48 }
7 49
8forkAnnouncer :: IO Announcer 50scheduleImmediately :: Announcer -> ScheduledItem -> STM ()
9forkAnnouncer = return Announcer 51scheduleImmediately announcer item
52 = modifyTVar' (scheduled announcer) (PSQ.insert' (AnnounceKey "") item 0)
10 53
11stopAnnouncer :: Announcer -> IO () 54stopAnnouncer :: Announcer -> IO ()
12stopAnnouncer _ = return () 55stopAnnouncer announcer = do
56 atomically $ scheduleImmediately announcer StopAnnouncer
57 atomically $ readTVar (announcerActive announcer) >>= check . not
13 58
14data AnnounceMethod ni r = forall nid addr r tok a. AnnounceMethod 59data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod
15 { aSearch :: Search nid addr tok ni r 60 { aSearch :: Search nid addr tok ni r
16 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) 61 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
17 } 62 }
18 63
19schedule :: Announcer -> AnnounceMethod ni r -> r -> IO () 64schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
20schedule _ _ _ = return () 65schedule _ _ _ _ = do
66 -- fork the search
67 -- add it to the priority queue of announce methods.
68 -- update ticker
69 return ()
70
71cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
72cancel _ _ _ _ = return ()
73
74
75forkAnnouncer :: IO Announcer
76forkAnnouncer = do
77 tickvar <- atomically $ newTVar 0
78 ticker <- forkTicker $ writeTVar tickvar
79 announcer <- atomically $ Announcer <$> newTVar PSQ.empty
80 <*> newTVar True
81 <*> pure tickvar
82 <*> pure ticker
83 fork $ announceThread announcer
84 return announcer
85
86
87announceThread :: Announcer -> IO ()
88announceThread announcer = do
89 myThreadId >>= flip labelThread "announcer"
90 fix $ \loop -> do
91 action <- atomically $ do
92 now <- readTVar $ lastTick announcer
93 (item,q) <- readTVar (scheduled announcer)
94 >>= maybe retry return . PSQ.minView
95 when (prio item > now) retry -- Is it time to do something?
96 writeTVar (scheduled announcer) q -- Remove the event from the queue.
97 performScheduledItem announcer item -- Go for it!
98 mapM_ (>> loop) action
99 atomically $ writeTVar (announcerActive announcer) False
100
101performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ()))
102performScheduledItem announcer = \case
103
104 (Binding _ StopAnnouncer _) -> return Nothing
105
106 -- announcement added:
107
108 -- wait for time to announce or for search to finish.
109 --
110 -- time for periodic announce:
111 -- (re-)announce to the current known set of storing-nodes.
112 -- If the search is finished, restart the search.
113 --
114 -- search finished:
115 -- if any of the current storing-nodes set have not been
116 -- announced to, announce to them.
117 --
118 --
119 -- announcement removed:
120 --
21 121
22cancel :: Announcer -> AnnounceMethod ni r -> r -> IO ()
23cancel _ _ _ = return ()