diff options
-rw-r--r-- | Announcer.hs | 118 | ||||
-rw-r--r-- | Ticker.hs | 36 | ||||
-rw-r--r-- | examples/dhtd.hs | 15 |
3 files changed, 154 insertions, 15 deletions
diff --git a/Announcer.hs b/Announcer.hs index bc52ee38..8008267c 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -1,23 +1,121 @@ | |||
1 | {-# LANGUAGE ExistentialQuantification #-} | 1 | {-# LANGUAGE ExistentialQuantification #-} |
2 | module Announcer where | 2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | module Announcer | ||
5 | ( Announcer | ||
6 | , AnnounceKey | ||
7 | , packAnnounceKey | ||
8 | , unpackAnnounceKey | ||
9 | , AnnounceMethod(..) | ||
10 | , forkAnnouncer | ||
11 | , stopAnnouncer | ||
12 | , schedule | ||
13 | , cancel | ||
14 | ) where | ||
3 | 15 | ||
16 | import Data.Wrapper.PSQ as PSQ | ||
4 | import Network.Kademlia.Search | 17 | import Network.Kademlia.Search |
18 | import Ticker | ||
19 | |||
20 | import Control.Concurrent.Lifted.Instrument | ||
21 | import Control.Concurrent.STM | ||
22 | import Control.Monad | ||
23 | import Data.ByteString (ByteString) | ||
24 | import qualified Data.ByteString.Char8 as Char8 | ||
25 | import Data.Function | ||
26 | import Data.Hashable | ||
27 | import Data.Maybe | ||
28 | import Data.Time.Clock.POSIX | ||
29 | |||
30 | newtype AnnounceKey = AnnounceKey ByteString | ||
31 | deriving (Hashable,Ord,Eq) | ||
32 | |||
33 | packAnnounceKey :: Announcer -> String -> STM AnnounceKey | ||
34 | packAnnounceKey _ = return . AnnounceKey . Char8.pack | ||
35 | |||
36 | unpackAnnounceKey :: AnnounceKey -> AnnounceKey -> STM String | ||
37 | unpackAnnounceKey _ (AnnounceKey bs) = return $ Char8.unpack bs | ||
38 | |||
39 | data ScheduledItem | ||
40 | = forall r. ScheduledItem (AnnounceMethod r) | ||
41 | | StopAnnouncer | ||
5 | 42 | ||
6 | data Announcer = Announcer | 43 | data Announcer = Announcer |
44 | { scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) | ||
45 | , announcerActive :: TVar Bool | ||
46 | , lastTick :: TVar POSIXTime | ||
47 | , announceTicker :: Ticker | ||
48 | } | ||
7 | 49 | ||
8 | forkAnnouncer :: IO Announcer | 50 | scheduleImmediately :: Announcer -> ScheduledItem -> STM () |
9 | forkAnnouncer = return Announcer | 51 | scheduleImmediately announcer item |
52 | = modifyTVar' (scheduled announcer) (PSQ.insert' (AnnounceKey "") item 0) | ||
10 | 53 | ||
11 | stopAnnouncer :: Announcer -> IO () | 54 | stopAnnouncer :: Announcer -> IO () |
12 | stopAnnouncer _ = return () | 55 | stopAnnouncer announcer = do |
56 | atomically $ scheduleImmediately announcer StopAnnouncer | ||
57 | atomically $ readTVar (announcerActive announcer) >>= check . not | ||
13 | 58 | ||
14 | data AnnounceMethod ni r = forall nid addr r tok a. AnnounceMethod | 59 | data AnnounceMethod r = forall nid ni addr r tok a. AnnounceMethod |
15 | { aSearch :: Search nid addr tok ni r | 60 | { aSearch :: Search nid addr tok ni r |
16 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) | 61 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) |
17 | } | 62 | } |
18 | 63 | ||
19 | schedule :: Announcer -> AnnounceMethod ni r -> r -> IO () | 64 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
20 | schedule _ _ _ = return () | 65 | schedule _ _ _ _ = do |
66 | -- fork the search | ||
67 | -- add it to the priority queue of announce methods. | ||
68 | -- update ticker | ||
69 | return () | ||
70 | |||
71 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
72 | cancel _ _ _ _ = return () | ||
73 | |||
74 | |||
75 | forkAnnouncer :: IO Announcer | ||
76 | forkAnnouncer = do | ||
77 | tickvar <- atomically $ newTVar 0 | ||
78 | ticker <- forkTicker $ writeTVar tickvar | ||
79 | announcer <- atomically $ Announcer <$> newTVar PSQ.empty | ||
80 | <*> newTVar True | ||
81 | <*> pure tickvar | ||
82 | <*> pure ticker | ||
83 | fork $ announceThread announcer | ||
84 | return announcer | ||
85 | |||
86 | |||
87 | announceThread :: Announcer -> IO () | ||
88 | announceThread announcer = do | ||
89 | myThreadId >>= flip labelThread "announcer" | ||
90 | fix $ \loop -> do | ||
91 | action <- atomically $ do | ||
92 | now <- readTVar $ lastTick announcer | ||
93 | (item,q) <- readTVar (scheduled announcer) | ||
94 | >>= maybe retry return . PSQ.minView | ||
95 | when (prio item > now) retry -- Is it time to do something? | ||
96 | writeTVar (scheduled announcer) q -- Remove the event from the queue. | ||
97 | performScheduledItem announcer item -- Go for it! | ||
98 | mapM_ (>> loop) action | ||
99 | atomically $ writeTVar (announcerActive announcer) False | ||
100 | |||
101 | performScheduledItem :: Announcer -> Binding' AnnounceKey POSIXTime ScheduledItem -> STM (Maybe (IO ())) | ||
102 | performScheduledItem announcer = \case | ||
103 | |||
104 | (Binding _ StopAnnouncer _) -> return Nothing | ||
105 | |||
106 | -- announcement added: | ||
107 | |||
108 | -- wait for time to announce or for search to finish. | ||
109 | -- | ||
110 | -- time for periodic announce: | ||
111 | -- (re-)announce to the current known set of storing-nodes. | ||
112 | -- If the search is finished, restart the search. | ||
113 | -- | ||
114 | -- search finished: | ||
115 | -- if any of the current storing-nodes set have not been | ||
116 | -- announced to, announce to them. | ||
117 | -- | ||
118 | -- | ||
119 | -- announcement removed: | ||
120 | -- | ||
21 | 121 | ||
22 | cancel :: Announcer -> AnnounceMethod ni r -> r -> IO () | ||
23 | cancel _ _ _ = return () | ||
diff --git a/Ticker.hs b/Ticker.hs new file mode 100644 index 00000000..ba423def --- /dev/null +++ b/Ticker.hs | |||
@@ -0,0 +1,36 @@ | |||
1 | module Ticker where | ||
2 | |||
3 | import Control.Concurrent | ||
4 | import Control.Concurrent.STM | ||
5 | import Control.Exception | ||
6 | import Control.Monad | ||
7 | import Data.Function | ||
8 | import Data.Time.Clock.POSIX | ||
9 | import Data.Typeable | ||
10 | |||
11 | data ScheduleTick = ScheduleTick Int deriving (Show, Typeable) | ||
12 | instance Exception ScheduleTick | ||
13 | |||
14 | newtype Ticker = Ticker ThreadId | ||
15 | |||
16 | -- | Fork a thread that will invoke an STM transaction whenever a tick event | ||
17 | -- occurs. The first tick happens immediately, later ticks must be scheduled | ||
18 | -- with 'scheduleTick'. | ||
19 | forkTicker :: (POSIXTime -> STM ()) -> IO Ticker | ||
20 | forkTicker tick = do | ||
21 | getPOSIXTime >>= atomically . tick | ||
22 | tid <- forkIO $ fix $ \loop -> do | ||
23 | getPOSIXTime >>= atomically . tick | ||
24 | catch loop $ \(ScheduleTick interval) -> do | ||
25 | threadDelay interval | ||
26 | when (interval >= 0) loop | ||
27 | return $ Ticker tid | ||
28 | |||
29 | -- | Schedule the next tick. If you supply a negative number, the ticker | ||
30 | -- thread will terminate. Otherwise, the next tick will be scheduled for the | ||
31 | -- given number of microseconds from now. | ||
32 | -- | ||
33 | -- Note: If a tick was scheduled to happen sooner, that tick will be canceled | ||
34 | -- in favor of this one. Only one tick is scheduled at a time. | ||
35 | scheduleTick :: Ticker -> Int -> IO () | ||
36 | scheduleTick (Ticker tid) usecs = throwTo tid (ScheduleTick usecs) | ||
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index fa4ce95b..931e1ba0 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -311,6 +311,8 @@ forkSearch method nid DHTQuery{qsearch,qshowTok,qshowR} dhtSearches dhtBuckets | |||
311 | , searchResults = results | 311 | , searchResults = results |
312 | } | 312 | } |
313 | modifyTVar' dhtSearches $ Map.insert (method,nid) new | 313 | modifyTVar' dhtSearches $ Map.insert (method,nid) new |
314 | -- Finally, we write the search loop action into a tvar that will be executed in a new | ||
315 | -- thread. | ||
314 | writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st | 316 | writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st |
315 | 317 | ||
316 | reportSearchResults :: (Show t, Ord t1, Ord t, Hashable t) => | 318 | reportSearchResults :: (Show t, Ord t1, Ord t, Hashable t) => |
@@ -552,10 +554,10 @@ clientSession s@Session{..} sock cnum h = do | |||
552 | (dtastr,ys) = break isSpace $ dropWhile isSpace xs | 554 | (dtastr,ys) = break isSpace $ dropWhile isSpace xs |
553 | a = Map.lookup method dhtAnnouncables | 555 | a = Map.lookup method dhtAnnouncables |
554 | q = Map.lookup method dhtQuery | 556 | q = Map.lookup method dhtQuery |
555 | doit :: Char -> proxy ni -> Announcer -> AnnounceMethod ni r -> r -> IO () | 557 | doit :: Char -> proxy ni -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
556 | doit '+' _ = schedule | 558 | doit '+' _ = schedule |
557 | doit '-' _ = cancel | 559 | doit '-' _ = cancel |
558 | doit _ _ = \_ _ _ -> hPutClient h "Starting(+) or canceling(-)?" | 560 | doit _ _ = \_ _ _ _ -> hPutClient h "Starting(+) or canceling(-)?" |
559 | matchingResult :: | 561 | matchingResult :: |
560 | ( Typeable sr | 562 | ( Typeable sr |
561 | , Typeable stok | 563 | , Typeable stok |
@@ -573,9 +575,12 @@ clientSession s@Session{..} sock cnum h = do | |||
573 | DHTQuery { qsearch } <- q | 575 | DHTQuery { qsearch } <- q |
574 | (Refl,Refl,nr@Refl) <- matchingResult qsearch announceSendData | 576 | (Refl,Refl,nr@Refl) <- matchingResult qsearch announceSendData |
575 | dta <- either (const Nothing) Just $ announceParseData dtastr | 577 | dta <- either (const Nothing) Just $ announceParseData dtastr |
576 | return $ doit op nr announcer | 578 | return $ do |
577 | (AnnounceMethod qsearch announceSendData) | 579 | akey <- atomically $ packAnnounceKey announcer (method ++ ":" ++ dtastr) |
578 | dta | 580 | doit op nr announcer |
581 | akey | ||
582 | (AnnounceMethod qsearch announceSendData) | ||
583 | dta | ||
579 | fromMaybe (hPutClient h "error.") mameth | 584 | fromMaybe (hPutClient h "error.") mameth |
580 | 585 | ||
581 | ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts | 586 | ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts |