diff options
Diffstat (limited to 'dht')
-rw-r--r-- | dht/Announcer.hs | 63 | ||||
-rw-r--r-- | dht/Announcer/Tox.hs | 2 | ||||
-rw-r--r-- | dht/ToxManager.hs | 9 | ||||
-rw-r--r-- | dht/examples/dhtd.hs | 1 |
4 files changed, 39 insertions, 36 deletions
diff --git a/dht/Announcer.hs b/dht/Announcer.hs index c6a04cb1..41f86f24 100644 --- a/dht/Announcer.hs +++ b/dht/Announcer.hs | |||
@@ -16,7 +16,6 @@ module Announcer | |||
16 | , forkAnnouncer | 16 | , forkAnnouncer |
17 | , stopAnnouncer | 17 | , stopAnnouncer |
18 | , cancel | 18 | , cancel |
19 | , itemStatusNum | ||
20 | , runAction | 19 | , runAction |
21 | , unschedule | 20 | , unschedule |
22 | , delayAction | 21 | , delayAction |
@@ -36,15 +35,17 @@ import Control.Monad | |||
36 | import Data.ByteString (ByteString) | 35 | import Data.ByteString (ByteString) |
37 | import qualified Data.ByteString.Char8 as Char8 | 36 | import qualified Data.ByteString.Char8 as Char8 |
38 | import Data.Hashable | 37 | import Data.Hashable |
38 | import Data.Time.Clock (NominalDiffTime) | ||
39 | import Data.Time.Clock.POSIX | 39 | import Data.Time.Clock.POSIX |
40 | import qualified GHC.Generics as Generics | 40 | import qualified GHC.Generics as Generics |
41 | -- import Generic.Data.Internal.Meta as Lyxia | 41 | -- import Generic.Data.Internal.Meta as Lyxia |
42 | 42 | ||
43 | -- | Actions are scheduled and canceled via this key. | ||
43 | newtype AnnounceKey = AnnounceKey ByteString | 44 | newtype AnnounceKey = AnnounceKey ByteString |
44 | deriving (Hashable,Ord,Eq) | 45 | deriving (Hashable,Ord,Eq) |
45 | 46 | ||
46 | instance Show AnnounceKey where | 47 | -- instance Show AnnounceKey where |
47 | show (AnnounceKey bs) = "AnnounceKey " ++ show (Char8.unpack bs) | 48 | -- show (AnnounceKey bs) = "AnnounceKey " ++ show (Char8.unpack bs) |
48 | 49 | ||
49 | packAnnounceKey :: Announcer -> String -> AnnounceKey | 50 | packAnnounceKey :: Announcer -> String -> AnnounceKey |
50 | packAnnounceKey _ = AnnounceKey . Char8.pack | 51 | packAnnounceKey _ = AnnounceKey . Char8.pack |
@@ -58,17 +59,9 @@ unpackAnnounceKey _ (AnnounceKey bs) = Char8.unpack bs | |||
58 | -- 'NewAnnouncement' which triggers the ordinary recurring scheduling of | 59 | -- 'NewAnnouncement' which triggers the ordinary recurring scheduling of |
59 | -- 'Announce'. | 60 | -- 'Announce'. |
60 | data ScheduledItem = ScheduledItem (Announcer -> AnnounceKey -> POSIXTime -> STM (IO ())) | 61 | data ScheduledItem = ScheduledItem (Announcer -> AnnounceKey -> POSIXTime -> STM (IO ())) |
61 | -- Can't use Data because STM and IO. :( | ||
62 | -- deriving Data {- itemStatusNum sch = constrIndex $ toConstr sch -} | ||
63 | -- Using Generic works with Lyxia's generic-data package... | ||
64 | -- {- itemStatusNum sch = Lyxia.conIdToInt $ Lyxia.conId sch -} | ||
65 | -- For now, make sure to keep 'itemStatusNum' up to date with 'ScheduledItem'. | ||
66 | deriving Generics.Generic | 62 | deriving Generics.Generic |
67 | 63 | ||
68 | 64 | ||
69 | itemStatusNum :: ScheduledItem -> Int | ||
70 | itemStatusNum = const 1 | ||
71 | |||
72 | newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem } | 65 | newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem } |
73 | 66 | ||
74 | emptySchedule :: Schedule | 67 | emptySchedule :: Schedule |
@@ -81,32 +74,37 @@ scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled annou | |||
81 | 74 | ||
82 | data Announcer = Announcer | 75 | data Announcer = Announcer |
83 | { -- | The queue of scheduled events. The priority is the time at which an | 76 | { -- | The queue of scheduled events. The priority is the time at which an |
84 | -- event is supposed to occur. Do not write to this TVar ever. | 77 | -- event is supposed to occur. |
85 | scheduled :: TVar Schedule | 78 | scheduled :: TVar Schedule |
86 | -- | This TVar is False when the Announcer thread has finished. | 79 | -- | This TVar is False when the Announcer thread has finished. |
87 | , announcerActive :: TVar Bool | 80 | , announcerActive :: TVar Bool |
88 | -- | This delay is used to wait until it's time to act on the earliest | 81 | -- | This channel is used to communicate instructions to the timing |
89 | -- scheduled item. It will be interrupted whenever a new item is | 82 | -- thread. It can be used to schedule actions or shutdown the thread. |
90 | -- scheduled. | ||
91 | , commander :: TChan SchedulerCommand | 83 | , commander :: TChan SchedulerCommand |
92 | } | 84 | } |
93 | 85 | ||
86 | -- | Schedule an action to happen immediately. | ||
94 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () | 87 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () |
95 | scheduleImmediately announcer k item = | 88 | scheduleImmediately announcer k item = |
96 | writeTChan (commander announcer) $ RunActionSTM k item | 89 | writeTChan (commander announcer) $ RunActionSTM k item |
97 | 90 | ||
91 | -- | Schedule an action to happen at a specific time. | ||
98 | scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () | 92 | scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () |
99 | scheduleAbs announcer k item absTime = | 93 | scheduleAbs announcer k item absTime = |
100 | writeTChan (commander announcer) $ ScheduleAction (k, absTime, item) | 94 | writeTChan (commander announcer) $ ScheduleAction (k, absTime, item) |
101 | 95 | ||
102 | scheduleRel, delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () | 96 | -- | Schedule an action to occur s specified delta into the future from now. |
97 | scheduleRel, delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> NominalDiffTime -> STM () | ||
103 | scheduleRel announcer k item relTime = | 98 | scheduleRel announcer k item relTime = |
104 | writeTChan (commander announcer) $ DelayAction (k, relTime, item) | 99 | writeTChan (commander announcer) $ DelayAction (k, relTime, item) |
105 | 100 | ||
101 | -- | Schedule an action to occur s specified delta into the future from now. | ||
102 | -- This is an alias for 'scheduleRel'. | ||
106 | delayAction = scheduleRel | 103 | delayAction = scheduleRel |
107 | 104 | ||
108 | runAction :: Announcer -> IO () -> STM () | 105 | -- | Fork an action immediately. The String argument is a thread label. |
109 | runAction announcer = writeTChan (commander announcer) . RunAction | 106 | runAction :: Announcer -> String -> IO () -> STM () |
107 | runAction announcer lbl = writeTChan (commander announcer) . RunAction lbl | ||
110 | 108 | ||
111 | -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. | 109 | -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. |
112 | stopAnnouncer :: Announcer -> IO () | 110 | stopAnnouncer :: Announcer -> IO () |
@@ -114,9 +112,11 @@ stopAnnouncer announcer = do | |||
114 | atomically $ writeTChan (commander announcer) ShutdownScheduler | 112 | atomically $ writeTChan (commander announcer) ShutdownScheduler |
115 | atomically $ readTVar (announcerActive announcer) >>= check . not | 113 | atomically $ readTVar (announcerActive announcer) >>= check . not |
116 | 114 | ||
115 | -- | Cancel a scheduled action (STM). | ||
117 | unschedule :: Announcer -> AnnounceKey -> STM () | 116 | unschedule :: Announcer -> AnnounceKey -> STM () |
118 | unschedule announcer k = writeTChan (commander announcer) $ UnscheduleAction k | 117 | unschedule announcer k = writeTChan (commander announcer) $ UnscheduleAction k |
119 | 118 | ||
119 | -- | Cancel a scheduled action (IO). | ||
120 | cancel :: Announcer -> AnnounceKey -> IO () | 120 | cancel :: Announcer -> AnnounceKey -> IO () |
121 | cancel = ((.).(.)) atomically unschedule | 121 | cancel = ((.).(.)) atomically unschedule |
122 | 122 | ||
@@ -127,21 +127,27 @@ forkAnnouncer = do | |||
127 | announcer <- atomically $ Announcer <$> newTVar emptySchedule | 127 | announcer <- atomically $ Announcer <$> newTVar emptySchedule |
128 | <*> newTVar True | 128 | <*> newTVar True |
129 | <*> newTChan | 129 | <*> newTChan |
130 | fork $ announceThread announcer | 130 | tid <- forkIO $ listener announcer |
131 | labelThread tid "announcer" | ||
131 | return announcer | 132 | return announcer |
132 | 133 | ||
134 | -- Wait for an item to occur on the given channel, or for the given TVar to be | ||
135 | -- True. | ||
133 | readTChanTimeout :: TVar Bool -> TChan a -> STM (Maybe a) | 136 | readTChanTimeout :: TVar Bool -> TChan a -> STM (Maybe a) |
134 | readTChanTimeout delay pktChannel = do | 137 | readTChanTimeout delay pktChannel = |
135 | Just <$> readTChan pktChannel <|> pure Nothing <* (readTVar >=> check) delay | 138 | orElse (Just <$> readTChan pktChannel) |
139 | (pure Nothing <* (readTVar >=> check) delay) | ||
136 | 140 | ||
137 | data SchedulerCommand | 141 | data SchedulerCommand |
138 | = ShutdownScheduler | 142 | = ShutdownScheduler |
139 | | ScheduleAction KPS -- run an action at an absolute time (todo: use UTCTime) | 143 | | ScheduleAction KPS -- run an action at an absolute time |
140 | | DelayAction KPS -- run an action at a time relative to the present (todo: use NominalDiffTime) | 144 | | DelayAction KPS -- run an action at a time relative to the present |
141 | | RunAction (IO ()) | 145 | | RunAction String (IO ()) |
142 | | RunActionSTM AnnounceKey ScheduledItem | 146 | | RunActionSTM AnnounceKey ScheduledItem |
143 | | UnscheduleAction AnnounceKey | 147 | | UnscheduleAction AnnounceKey |
144 | 148 | ||
149 | -- This is the main scheduling thread that polls the commander TChan and forks | ||
150 | -- scheduled actions. | ||
145 | listener :: Announcer -> IO () | 151 | listener :: Announcer -> IO () |
146 | listener announcer = relisten | 152 | listener announcer = relisten |
147 | -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule). | 153 | -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule). |
@@ -158,7 +164,7 @@ listener announcer = relisten | |||
158 | atomically (readTChan chan) >>= handleCommand | 164 | atomically (readTChan chan) >>= handleCommand |
159 | Just ((k, p, ScheduledItem f), queue') -> do | 165 | Just ((k, p, ScheduledItem f), queue') -> do |
160 | now <- getPOSIXTime | 166 | now <- getPOSIXTime |
161 | note $ "queue full - listening with timeout - " ++ show (p, now) | 167 | note $ "queue non-empty - listening with timeout - " ++ show (p, now) |
162 | delay <- registerDelay (toMicroseconds (p - now)) | 168 | delay <- registerDelay (toMicroseconds (p - now)) |
163 | join $ atomically $ do | 169 | join $ atomically $ do |
164 | readTChanTimeout delay chan >>= \case | 170 | readTChanTimeout delay chan >>= \case |
@@ -174,7 +180,7 @@ listener announcer = relisten | |||
174 | case cmd of | 180 | case cmd of |
175 | ScheduleAction (k, p, s) -> atomically $ modifyScheduled $ PSQ.insert' k s p | 181 | ScheduleAction (k, p, s) -> atomically $ modifyScheduled $ PSQ.insert' k s p |
176 | UnscheduleAction k -> atomically $ modifyScheduled $ PSQ.delete k | 182 | UnscheduleAction k -> atomically $ modifyScheduled $ PSQ.delete k |
177 | RunAction io -> void $ fork io | 183 | RunAction lbl io -> void $ forkLabeled lbl io |
178 | RunActionSTM k (ScheduledItem f) -> do | 184 | RunActionSTM k (ScheduledItem f) -> do |
179 | now <- getPOSIXTime | 185 | now <- getPOSIXTime |
180 | void . fork . join . atomically $ modifyScheduled (PSQ.delete k) >> f announcer k now | 186 | void . fork . join . atomically $ modifyScheduled (PSQ.delete k) >> f announcer k now |
@@ -182,8 +188,3 @@ listener announcer = relisten | |||
182 | now <- getPOSIXTime | 188 | now <- getPOSIXTime |
183 | atomically $ modifyScheduled $ PSQ.insert' k s (now + p) | 189 | atomically $ modifyScheduled $ PSQ.insert' k s (now + p) |
184 | 190 | ||
185 | announceThread :: Announcer -> IO () | ||
186 | announceThread announcer = do | ||
187 | myThreadId >>= flip labelThread "announcer" | ||
188 | listener announcer | ||
189 | |||
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs index f8343f8d..4b775049 100644 --- a/dht/Announcer/Tox.hs +++ b/dht/Announcer/Tox.hs | |||
@@ -183,7 +183,7 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge | |||
183 | ns <- atomically $ newTVar MM.empty | 183 | ns <- atomically $ newTVar MM.empty |
184 | let astate = AnnounceState st ns | 184 | let astate = AnnounceState st ns |
185 | onResult sr = do | 185 | onResult sr = do |
186 | runAction announcer $ do | 186 | runAction announcer "with-search-result" $ do |
187 | got <- sWithResult r sr | 187 | got <- sWithResult r sr |
188 | -- If we had a way to get the source of a search result, we might want to | 188 | -- If we had a way to get the source of a search result, we might want to |
189 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | 189 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' |
diff --git a/dht/ToxManager.hs b/dht/ToxManager.hs index 408b12d2..39377733 100644 --- a/dht/ToxManager.hs +++ b/dht/ToxManager.hs | |||
@@ -327,13 +327,16 @@ gotDhtPubkey theirDhtKey tx theirKey = do | |||
327 | , rumoredAddress = assume akey | 327 | , rumoredAddress = assume akey |
328 | } | 328 | } |
329 | 329 | ||
330 | showak :: AnnounceKey -> String | ||
331 | showak k = unpackAnnounceKey (txAnnouncer tx) k | ||
332 | |||
330 | assume :: AnnounceKey -> POSIXTime -> SockAddr -> NodeInfo -> STM () | 333 | assume :: AnnounceKey -> POSIXTime -> SockAddr -> NodeInfo -> STM () |
331 | assume akey time addr ni = | 334 | assume akey time addr ni = |
332 | tput XNodeinfoSearch $ show ("rumor", akey, time, addr, ni) | 335 | tput XNodeinfoSearch $ show ("rumor", showak akey, time, addr, ni) |
333 | 336 | ||
334 | observe :: AnnounceKey -> POSIXTime -> NodeInfo -> STM () | 337 | observe :: AnnounceKey -> POSIXTime -> NodeInfo -> STM () |
335 | observe akey time ni@(nodeAddr -> addr) = do | 338 | observe akey time ni@(nodeAddr -> addr) = do |
336 | tput XNodeinfoSearch $ show ("observation", akey, time, addr) | 339 | tput XNodeinfoSearch $ show ("observation", showak akey, time, addr) |
337 | setContactAddr time theirKey ni (txAccount tx) | 340 | setContactAddr time theirKey ni (txAccount tx) |
338 | 341 | ||
339 | gotAddr :: NodeInfo -> ToxToXMPP -> PublicKey -> IO () | 342 | gotAddr :: NodeInfo -> ToxToXMPP -> PublicKey -> IO () |
@@ -408,7 +411,7 @@ gotAddr' ni@(nodeAddr -> addr) tx theirKey theirDhtKey = atomically blee | |||
408 | getCookie ni isActive getC ann akey now = getCookieAgain | 411 | getCookie ni isActive getC ann akey now = getCookieAgain |
409 | where | 412 | where |
410 | getCookieAgain = do | 413 | getCookieAgain = do |
411 | tput XNodeinfoSearch $ show ("getCookieAgain", akey) | 414 | tput XNodeinfoSearch $ show ("getCookieAgain", unpackAnnounceKey ann akey) |
412 | mbContact <- getC | 415 | mbContact <- getC |
413 | case mbContact of | 416 | case mbContact of |
414 | Nothing -> return $ return () | 417 | Nothing -> return $ return () |
diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index e24bb5df..42fcc67b 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs | |||
@@ -872,7 +872,6 @@ clientSession s@Session{..} sock cnum h = do | |||
872 | let kstr = unpackAnnounceKey announcer k | 872 | let kstr = unpackAnnounceKey announcer k |
873 | return [ if ptm==0 then "now" | 873 | return [ if ptm==0 then "now" |
874 | else show (ptm - now) | 874 | else show (ptm - now) |
875 | , show (itemStatusNum item) | ||
876 | , kstr | 875 | , kstr |
877 | ] | 876 | ] |
878 | hPutClient h $ showColumns rs | 877 | hPutClient h $ showColumns rs |