diff options
Diffstat (limited to 'Announcer.hs')
-rw-r--r-- | Announcer.hs | 118 |
1 files changed, 108 insertions, 10 deletions
diff --git a/Announcer.hs b/Announcer.hs index bc52ee38..8008267c 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -1,23 +1,121 @@ | |||
1 | {-# LANGUAGE ExistentialQuantification #-} | 1 | {-# LANGUAGE ExistentialQuantification #-} |
2 | module Announcer where | 2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | module Announcer | ||
5 | ( Announcer | ||
6 | , AnnounceKey | ||
7 | , packAnnounceKey | ||
8 | , unpackAnnounceKey | ||
9 | , AnnounceMethod(..) | ||
10 | , forkAnnouncer | ||
11 | , stopAnnouncer | ||
12 | , schedule | ||
13 | , cancel | ||
14 | ) where | ||
3 | 15 | ||
16 | import Data.Wrapper.PSQ as PSQ | ||
4 | import Network.Kademlia.Search | 17 | import Network.Kademlia.Search |
18 | import Ticker | ||
19 | |||
20 | import Control.Concurrent.Lifted.Instrument | ||
21 | import Control.Concurrent.STM | ||
22 | import Control.Monad | ||
23 | import Data.ByteString (ByteString) | ||
24 | import qualified Data.ByteString.Char8 as Char8 | ||
25 | import Data.Function | ||
26 | import Data.Hashable | ||
27 | import Data.Maybe | ||
28 | import Data.Time.Clock.POSIX | ||
29 | |||
30 | newtype AnnounceKey = AnnounceKey ByteString | ||
31 | deriving (Hashable,Ord,Eq) | ||
32 | |||
33 | packAnnounceKey :: Announcer -> String -> STM AnnounceKey | ||
34 | packAnnounceKey _ = return . AnnounceKey . Char8.pack | ||
35 | |||
36 | unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String | ||
37 | unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs | ||
38 | |||
39 | data ScheduledItem | ||
40 | = forall r. ScheduledItem (AnnounceMethod r) | ||
41 | | StopAnnouncer | ||
5 | 42 | ||
6 | data Announcer = Announcer | 43 | data Announcer = Announcer |
44 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) | ||
45 | , announcerActive :: TVar Bool | ||
46 | , lastTick :: TVar POSIXTime | ||
47 | , announceTicker :: Ticker | ||
48 | } | ||
7 | 49 | ||
8 | forkAnnouncer :: IO Announcer | 50 | scheduleImmediately :: Announcer -> ScheduledItem -> STM () |
9 | forkAnnouncer = return Announcer | 51 | scheduleImmediately announcer item |
52 | = modifyTVar' (scheduled announcer) (PSQ.insert' (AnnounceKey "") item 0) | ||
10 | 53 | ||
11 | stopAnnouncer :: Announcer -> IO () | 54 | stopAnnouncer :: Announcer -> IO () |
12 | stopAnnouncer _ = return () | 55 | stopAnnouncer announcer = do |
56 | atomically $ scheduleImmediately announcer StopAnnouncer | ||
57 | atomically $ readTVar (announcerActive announcer) >>= check . not | ||
13 | 58 | ||
14 | data AnnounceMethod ni r = forall nid addr r tok a. AnnounceMethod | 59 | data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod |
15 | { aSearch :: Search nid addr tok ni r | 60 | { aSearch :: Search nid addr tok ni r |
16 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) | 61 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) |
17 | } | 62 | } |
18 | 63 | ||
19 | schedule :: Announcer -> AnnounceMethod ni r -> r -> IO () | 64 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
20 | schedule _ _ _ = return () | 65 | schedule _ _ _ _ = do |
66 | -- fork the search | ||
67 | -- add it to the priority queue of announce methods. | ||
68 | -- update ticker | ||
69 | return () | ||
70 | |||
71 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
72 | cancel _ _ _ _ = return () | ||
73 | |||
74 | |||
75 | forkAnnouncer :: IO Announcer | ||
76 | forkAnnouncer = do | ||
77 | tickvar <- atomically $ newTVar 0 | ||
78 | ticker <- forkTicker $ writeTVar tickvar | ||
79 | announcer <- atomically $ Announcer <$> newTVar PSQ.empty | ||
80 | <*> newTVar True | ||
81 | <*> pure tickvar | ||
82 | <*> pure ticker | ||
83 | fork $ announceThread announcer | ||
84 | return announcer | ||
85 | |||
86 | |||
87 | announceThread :: Announcer -> IO () | ||
88 | announceThread announcer = do | ||
89 | myThreadId >>= flip labelThread "announcer" | ||
90 | fix $ \loop -> do | ||
91 | action <- atomically $ do | ||
92 | now <- readTVar $ lastTick announcer | ||
93 | (item,q) <- readTVar (scheduled announcer) | ||
94 | >>= maybe retry return . PSQ.minView | ||
95 | when (prio item > now) retry -- Is it time to do something? | ||
96 | writeTVar (scheduled announcer) q -- Remove the event from the queue. | ||
97 | performScheduledItem announcer item -- Go for it! | ||
98 | mapM_ (>> loop) action | ||
99 | atomically $ writeTVar (announcerActive announcer) False | ||
100 | |||
101 | performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) | ||
102 | performScheduledItem announcer = \case | ||
103 | |||
104 | (Binding _ StopAnnouncer _) -> return Nothing | ||
105 | |||
106 | -- announcement added: | ||
107 | |||
108 | -- wait for time to announce or for search to finish. | ||
109 | -- | ||
110 | -- time for periodic announce: | ||
111 | -- (re-)announce to the current known set of storing-nodes. | ||
112 | -- If the search is finished, restart the search. | ||
113 | -- | ||
114 | -- search finished: | ||
115 | -- if any of the current storing-nodes set have not been | ||
116 | -- announced to, announce to them. | ||
117 | -- | ||
118 | -- | ||
119 | -- announcement removed: | ||
120 | -- | ||
21 | 121 | ||
22 | cancel :: Announcer -> AnnounceMethod ni r -> r -> IO () | ||
23 | cancel _ _ _ = return () | ||