summaryrefslogtreecommitdiff
path: root/dht
diff options
context:
space:
mode:
Diffstat (limited to 'dht')
-rw-r--r--dht/Announcer.hs63
-rw-r--r--dht/Announcer/Tox.hs2
-rw-r--r--dht/ToxManager.hs9
-rw-r--r--dht/examples/dhtd.hs1
4 files changed, 39 insertions, 36 deletions
diff --git a/dht/Announcer.hs b/dht/Announcer.hs
index c6a04cb1..41f86f24 100644
--- a/dht/Announcer.hs
+++ b/dht/Announcer.hs
@@ -16,7 +16,6 @@ module Announcer
16 , forkAnnouncer 16 , forkAnnouncer
17 , stopAnnouncer 17 , stopAnnouncer
18 , cancel 18 , cancel
19 , itemStatusNum
20 , runAction 19 , runAction
21 , unschedule 20 , unschedule
22 , delayAction 21 , delayAction
@@ -36,15 +35,17 @@ import Control.Monad
36import Data.ByteString (ByteString) 35import Data.ByteString (ByteString)
37import qualified Data.ByteString.Char8 as Char8 36import qualified Data.ByteString.Char8 as Char8
38import Data.Hashable 37import Data.Hashable
38import Data.Time.Clock (NominalDiffTime)
39import Data.Time.Clock.POSIX 39import Data.Time.Clock.POSIX
40import qualified GHC.Generics as Generics 40import qualified GHC.Generics as Generics
41-- import Generic.Data.Internal.Meta as Lyxia 41-- import Generic.Data.Internal.Meta as Lyxia
42 42
43-- | Actions are scheduled and canceled via this key.
43newtype AnnounceKey = AnnounceKey ByteString 44newtype AnnounceKey = AnnounceKey ByteString
44 deriving (Hashable,Ord,Eq) 45 deriving (Hashable,Ord,Eq)
45 46
46instance Show AnnounceKey where 47-- instance Show AnnounceKey where
47 show (AnnounceKey bs) = "AnnounceKey " ++ show (Char8.unpack bs) 48-- show (AnnounceKey bs) = "AnnounceKey " ++ show (Char8.unpack bs)
48 49
49packAnnounceKey :: Announcer -> String -> AnnounceKey 50packAnnounceKey :: Announcer -> String -> AnnounceKey
50packAnnounceKey _ = AnnounceKey . Char8.pack 51packAnnounceKey _ = AnnounceKey . Char8.pack
@@ -58,17 +59,9 @@ unpackAnnounceKey _ (AnnounceKey bs) = Char8.unpack bs
58-- 'NewAnnouncement' which triggers the ordinary recurring scheduling of 59-- 'NewAnnouncement' which triggers the ordinary recurring scheduling of
59-- 'Announce'. 60-- 'Announce'.
60data ScheduledItem = ScheduledItem (Announcer -> AnnounceKey -> POSIXTime -> STM (IO ())) 61data ScheduledItem = ScheduledItem (Announcer -> AnnounceKey -> POSIXTime -> STM (IO ()))
61 -- Can't use Data because STM and IO. :(
62 -- deriving Data {- itemStatusNum sch = constrIndex $ toConstr sch -}
63 -- Using Generic works with Lyxia's generic-data package...
64 -- {- itemStatusNum sch = Lyxia.conIdToInt $ Lyxia.conId sch -}
65 -- For now, make sure to keep 'itemStatusNum' up to date with 'ScheduledItem'.
66 deriving Generics.Generic 62 deriving Generics.Generic
67 63
68 64
69itemStatusNum :: ScheduledItem -> Int
70itemStatusNum = const 1
71
72newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem } 65newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem }
73 66
74emptySchedule :: Schedule 67emptySchedule :: Schedule
@@ -81,32 +74,37 @@ scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled annou
81 74
82data Announcer = Announcer 75data Announcer = Announcer
83 { -- | The queue of scheduled events. The priority is the time at which an 76 { -- | The queue of scheduled events. The priority is the time at which an
84 -- event is supposed to occur. Do not write to this TVar ever. 77 -- event is supposed to occur.
85 scheduled :: TVar Schedule 78 scheduled :: TVar Schedule
86 -- | This TVar is False when the Announcer thread has finished. 79 -- | This TVar is False when the Announcer thread has finished.
87 , announcerActive :: TVar Bool 80 , announcerActive :: TVar Bool
88 -- | This delay is used to wait until it's time to act on the earliest 81 -- | This channel is used to communicate instructions to the timing
89 -- scheduled item. It will be interrupted whenever a new item is 82 -- thread. It can be used to schedule actions or shutdown the thread.
90 -- scheduled.
91 , commander :: TChan SchedulerCommand 83 , commander :: TChan SchedulerCommand
92 } 84 }
93 85
86-- | Schedule an action to happen immediately.
94scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () 87scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
95scheduleImmediately announcer k item = 88scheduleImmediately announcer k item =
96 writeTChan (commander announcer) $ RunActionSTM k item 89 writeTChan (commander announcer) $ RunActionSTM k item
97 90
91-- | Schedule an action to happen at a specific time.
98scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () 92scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM ()
99scheduleAbs announcer k item absTime = 93scheduleAbs announcer k item absTime =
100 writeTChan (commander announcer) $ ScheduleAction (k, absTime, item) 94 writeTChan (commander announcer) $ ScheduleAction (k, absTime, item)
101 95
102scheduleRel, delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () 96-- | Schedule an action to occur s specified delta into the future from now.
97scheduleRel, delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> NominalDiffTime -> STM ()
103scheduleRel announcer k item relTime = 98scheduleRel announcer k item relTime =
104 writeTChan (commander announcer) $ DelayAction (k, relTime, item) 99 writeTChan (commander announcer) $ DelayAction (k, relTime, item)
105 100
101-- | Schedule an action to occur s specified delta into the future from now.
102-- This is an alias for 'scheduleRel'.
106delayAction = scheduleRel 103delayAction = scheduleRel
107 104
108runAction :: Announcer -> IO () -> STM () 105-- | Fork an action immediately. The String argument is a thread label.
109runAction announcer = writeTChan (commander announcer) . RunAction 106runAction :: Announcer -> String -> IO () -> STM ()
107runAction announcer lbl = writeTChan (commander announcer) . RunAction lbl
110 108
111-- | Terminate the 'Announcer' thread. Don't use the Announcer after this. 109-- | Terminate the 'Announcer' thread. Don't use the Announcer after this.
112stopAnnouncer :: Announcer -> IO () 110stopAnnouncer :: Announcer -> IO ()
@@ -114,9 +112,11 @@ stopAnnouncer announcer = do
114 atomically $ writeTChan (commander announcer) ShutdownScheduler 112 atomically $ writeTChan (commander announcer) ShutdownScheduler
115 atomically $ readTVar (announcerActive announcer) >>= check . not 113 atomically $ readTVar (announcerActive announcer) >>= check . not
116 114
115-- | Cancel a scheduled action (STM).
117unschedule :: Announcer -> AnnounceKey -> STM () 116unschedule :: Announcer -> AnnounceKey -> STM ()
118unschedule announcer k = writeTChan (commander announcer) $ UnscheduleAction k 117unschedule announcer k = writeTChan (commander announcer) $ UnscheduleAction k
119 118
119-- | Cancel a scheduled action (IO).
120cancel :: Announcer -> AnnounceKey -> IO () 120cancel :: Announcer -> AnnounceKey -> IO ()
121cancel = ((.).(.)) atomically unschedule 121cancel = ((.).(.)) atomically unschedule
122 122
@@ -127,21 +127,27 @@ forkAnnouncer = do
127 announcer <- atomically $ Announcer <$> newTVar emptySchedule 127 announcer <- atomically $ Announcer <$> newTVar emptySchedule
128 <*> newTVar True 128 <*> newTVar True
129 <*> newTChan 129 <*> newTChan
130 fork $ announceThread announcer 130 tid <- forkIO $ listener announcer
131 labelThread tid "announcer"
131 return announcer 132 return announcer
132 133
134-- Wait for an item to occur on the given channel, or for the given TVar to be
135-- True.
133readTChanTimeout :: TVar Bool -> TChan a -> STM (Maybe a) 136readTChanTimeout :: TVar Bool -> TChan a -> STM (Maybe a)
134readTChanTimeout delay pktChannel = do 137readTChanTimeout delay pktChannel =
135 Just <$> readTChan pktChannel <|> pure Nothing <* (readTVar >=> check) delay 138 orElse (Just <$> readTChan pktChannel)
139 (pure Nothing <* (readTVar >=> check) delay)
136 140
137data SchedulerCommand 141data SchedulerCommand
138 = ShutdownScheduler 142 = ShutdownScheduler
139 | ScheduleAction KPS -- run an action at an absolute time (todo: use UTCTime) 143 | ScheduleAction KPS -- run an action at an absolute time
140 | DelayAction KPS -- run an action at a time relative to the present (todo: use NominalDiffTime) 144 | DelayAction KPS -- run an action at a time relative to the present
141 | RunAction (IO ()) 145 | RunAction String (IO ())
142 | RunActionSTM AnnounceKey ScheduledItem 146 | RunActionSTM AnnounceKey ScheduledItem
143 | UnscheduleAction AnnounceKey 147 | UnscheduleAction AnnounceKey
144 148
149-- This is the main scheduling thread that polls the commander TChan and forks
150-- scheduled actions.
145listener :: Announcer -> IO () 151listener :: Announcer -> IO ()
146listener announcer = relisten 152listener announcer = relisten
147 -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule). 153 -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule).
@@ -158,7 +164,7 @@ listener announcer = relisten
158 atomically (readTChan chan) >>= handleCommand 164 atomically (readTChan chan) >>= handleCommand
159 Just ((k, p, ScheduledItem f), queue') -> do 165 Just ((k, p, ScheduledItem f), queue') -> do
160 now <- getPOSIXTime 166 now <- getPOSIXTime
161 note $ "queue full - listening with timeout - " ++ show (p, now) 167 note $ "queue non-empty - listening with timeout - " ++ show (p, now)
162 delay <- registerDelay (toMicroseconds (p - now)) 168 delay <- registerDelay (toMicroseconds (p - now))
163 join $ atomically $ do 169 join $ atomically $ do
164 readTChanTimeout delay chan >>= \case 170 readTChanTimeout delay chan >>= \case
@@ -174,7 +180,7 @@ listener announcer = relisten
174 case cmd of 180 case cmd of
175 ScheduleAction (k, p, s) -> atomically $ modifyScheduled $ PSQ.insert' k s p 181 ScheduleAction (k, p, s) -> atomically $ modifyScheduled $ PSQ.insert' k s p
176 UnscheduleAction k -> atomically $ modifyScheduled $ PSQ.delete k 182 UnscheduleAction k -> atomically $ modifyScheduled $ PSQ.delete k
177 RunAction io -> void $ fork io 183 RunAction lbl io -> void $ forkLabeled lbl io
178 RunActionSTM k (ScheduledItem f) -> do 184 RunActionSTM k (ScheduledItem f) -> do
179 now <- getPOSIXTime 185 now <- getPOSIXTime
180 void . fork . join . atomically $ modifyScheduled (PSQ.delete k) >> f announcer k now 186 void . fork . join . atomically $ modifyScheduled (PSQ.delete k) >> f announcer k now
@@ -182,8 +188,3 @@ listener announcer = relisten
182 now <- getPOSIXTime 188 now <- getPOSIXTime
183 atomically $ modifyScheduled $ PSQ.insert' k s (now + p) 189 atomically $ modifyScheduled $ PSQ.insert' k s (now + p)
184 190
185announceThread :: Announcer -> IO ()
186announceThread announcer = do
187 myThreadId >>= flip labelThread "announcer"
188 listener announcer
189
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs
index f8343f8d..4b775049 100644
--- a/dht/Announcer/Tox.hs
+++ b/dht/Announcer/Tox.hs
@@ -183,7 +183,7 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge
183 ns <- atomically $ newTVar MM.empty 183 ns <- atomically $ newTVar MM.empty
184 let astate = AnnounceState st ns 184 let astate = AnnounceState st ns
185 onResult sr = do 185 onResult sr = do
186 runAction announcer $ do 186 runAction announcer "with-search-result" $ do
187 got <- sWithResult r sr 187 got <- sWithResult r sr
188 -- If we had a way to get the source of a search result, we might want to 188 -- If we had a way to get the source of a search result, we might want to
189 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' 189 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
diff --git a/dht/ToxManager.hs b/dht/ToxManager.hs
index 408b12d2..39377733 100644
--- a/dht/ToxManager.hs
+++ b/dht/ToxManager.hs
@@ -327,13 +327,16 @@ gotDhtPubkey theirDhtKey tx theirKey = do
327 , rumoredAddress = assume akey 327 , rumoredAddress = assume akey
328 } 328 }
329 329
330 showak :: AnnounceKey -> String
331 showak k = unpackAnnounceKey (txAnnouncer tx) k
332
330 assume :: AnnounceKey -> POSIXTime -> SockAddr -> NodeInfo -> STM () 333 assume :: AnnounceKey -> POSIXTime -> SockAddr -> NodeInfo -> STM ()
331 assume akey time addr ni = 334 assume akey time addr ni =
332 tput XNodeinfoSearch $ show ("rumor", akey, time, addr, ni) 335 tput XNodeinfoSearch $ show ("rumor", showak akey, time, addr, ni)
333 336
334 observe :: AnnounceKey -> POSIXTime -> NodeInfo -> STM () 337 observe :: AnnounceKey -> POSIXTime -> NodeInfo -> STM ()
335 observe akey time ni@(nodeAddr -> addr) = do 338 observe akey time ni@(nodeAddr -> addr) = do
336 tput XNodeinfoSearch $ show ("observation", akey, time, addr) 339 tput XNodeinfoSearch $ show ("observation", showak akey, time, addr)
337 setContactAddr time theirKey ni (txAccount tx) 340 setContactAddr time theirKey ni (txAccount tx)
338 341
339gotAddr :: NodeInfo -> ToxToXMPP -> PublicKey -> IO () 342gotAddr :: NodeInfo -> ToxToXMPP -> PublicKey -> IO ()
@@ -408,7 +411,7 @@ gotAddr' ni@(nodeAddr -> addr) tx theirKey theirDhtKey = atomically blee
408 getCookie ni isActive getC ann akey now = getCookieAgain 411 getCookie ni isActive getC ann akey now = getCookieAgain
409 where 412 where
410 getCookieAgain = do 413 getCookieAgain = do
411 tput XNodeinfoSearch $ show ("getCookieAgain", akey) 414 tput XNodeinfoSearch $ show ("getCookieAgain", unpackAnnounceKey ann akey)
412 mbContact <- getC 415 mbContact <- getC
413 case mbContact of 416 case mbContact of
414 Nothing -> return $ return () 417 Nothing -> return $ return ()
diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs
index e24bb5df..42fcc67b 100644
--- a/dht/examples/dhtd.hs
+++ b/dht/examples/dhtd.hs
@@ -872,7 +872,6 @@ clientSession s@Session{..} sock cnum h = do
872 let kstr = unpackAnnounceKey announcer k 872 let kstr = unpackAnnounceKey announcer k
873 return [ if ptm==0 then "now" 873 return [ if ptm==0 then "now"
874 else show (ptm - now) 874 else show (ptm - now)
875 , show (itemStatusNum item)
876 , kstr 875 , kstr
877 ] 876 ]
878 hPutClient h $ showColumns rs 877 hPutClient h $ showColumns rs