summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2018-06-17 17:13:53 -0400
committerjoe <joe@jerkface.net>2018-06-17 17:15:20 -0400
commiteda92679a6d7d27ebb5757ba7056fc452384faac (patch)
tree571f07cb584561750d1fa234f15959ab99872b60
parent75a18e4cb814044a714aa3f487d2e6475de6127a (diff)
Factored out Tox-specific scheduling to Announcer.Tox.
-rw-r--r--Announcer.hs157
-rw-r--r--Announcer/Tox.hs176
-rw-r--r--ToxManager.hs3
-rw-r--r--dht-client.cabal1
-rw-r--r--examples/dhtd.hs3
5 files changed, 189 insertions, 151 deletions
diff --git a/Announcer.hs b/Announcer.hs
index 7fd72e2d..f0d65656 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -11,12 +11,15 @@ module Announcer
11 , AnnounceKey 11 , AnnounceKey
12 , packAnnounceKey 12 , packAnnounceKey
13 , unpackAnnounceKey 13 , unpackAnnounceKey
14 , AnnounceMethod(..)
15 , forkAnnouncer 14 , forkAnnouncer
16 , stopAnnouncer 15 , stopAnnouncer
17 , schedule
18 , cancel 16 , cancel
19 , itemStatusNum 17 , itemStatusNum
18
19 -- lower level, Announcer.Tox needs these.
20 , scheduleImmediately
21 , ScheduledItem(..)
22 , interrutible
20 ) where 23 ) where
21 24
22import qualified Data.MinMaxPSQ as MM 25import qualified Data.MinMaxPSQ as MM
@@ -86,14 +89,6 @@ data Announcer = Announcer
86 , interrutible :: InterruptibleDelay 89 , interrutible :: InterruptibleDelay
87 } 90 }
88 91
89announceK :: Int
90announceK = 8
91
92data AnnounceState = forall nid addr tok ni r. AnnounceState
93 { aState :: SearchState nid addr tok ni r
94 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
95 }
96
97-- | Schedules an event to occur long ago at the epoch (which effectively makes 92-- | Schedules an event to occur long ago at the epoch (which effectively makes
98-- the event happen as soon as possible). Note that the caller will usually 93-- the event happen as soon as possible). Note that the caller will usually
99-- also want to interrupt the 'interrutible' delay so that it finds this item 94-- also want to interrupt the 'interrutible' delay so that it finds this item
@@ -109,113 +104,8 @@ stopAnnouncer announcer = do
109 interruptDelay (interrutible announcer) 104 interruptDelay (interrutible announcer)
110 atomically $ readTVar (announcerActive announcer) >>= check . not 105 atomically $ readTVar (announcerActive announcer) >>= check . not
111 106
112-- | This type specifies an item that can be announced on appropriate nodes in 107cancel :: Announcer -> AnnounceKey -> IO ()
113-- a Kademlia network. 108cancel announcer k = do
114data AnnounceMethod r = forall nid ni sr addr tok a.
115 ( Show nid
116 , Hashable nid
117 , Hashable ni
118 , Ord addr
119 , Ord nid
120 , Ord ni
121 ) => AnnounceMethod
122 { aSearch :: Search nid addr tok ni sr
123 -- ^ This is the Kademlia search to run repeatedly to find the
124 -- nearby nodes. A new search is started whenever one is not
125 -- already in progress at announce time. Repeated searches are
126 -- likely to finish faster than the first since nearby nodes
127 -- are not discarded.
128 , aPublish :: Either (r -> sr -> IO ())
129 (r -> tok -> Maybe ni -> IO (Maybe a))
130 -- ^ The action to perform when we find nearby nodes. The
131 -- destination node is given as a Maybe so that methods that
132 -- treat 'Nothing' as loop-back address can be passed here,
133 -- however 'Nothing' will not be passed by the announcer
134 -- thread.
135 --
136 -- There are two cases:
137 --
138 -- [Left] The action to perform requires a search result.
139 -- This was implemented to support Tox's DHTKey and
140 -- Friend-Request messages.
141 --
142 -- [Right] The action requires a "token" from the destination
143 -- node. This is the more typical "announce" semantics for
144 -- Kademlia.
145 , aBuckets :: TVar (R.BucketList ni)
146 -- ^ Set this to the current Kademlia routing table buckets.
147 -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4?
148 , aTarget :: nid
149 -- ^ This is the Kademlia node-id of the item being announced.
150 , aInterval :: POSIXTime
151 -- ^ Assuming we have nearby nodes from the search, the item
152 -- will be announced at this interval.
153 --
154 -- Current implementation is to make the scheduled
155 -- announcements even if the search hasn't finished. It will
156 -- use the closest nodes found so far.
157 }
158
159-- | Schedule a recurring Search + Announce sequence.
160schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
161schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
162 st <- atomically $ newSearch aSearch aTarget []
163 ns <- atomically $ newTVar MM.empty
164 let astate = AnnounceState st ns
165 publishToNodes is
166 | Left _ <- aPublish = return ()
167 | Right publish <- aPublish = do
168 forM_ is $ \(Binding ni mtok _) -> do
169 forM_ mtok $ \tok -> do
170 got <- publish r tok (Just ni)
171 now <- getPOSIXTime
172 forM_ got $ \_ -> do
173 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
174 announce = do -- publish to current search results
175 is <- atomically $ do
176 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
177 return $ MM.toList bs
178 publishToNodes is
179 onResult sr
180 | Right _ <- aPublish = return True
181 | Left sendit <- aPublish = do
182 scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do
183 got <- sendit r sr
184 -- If we had a way to get the source of a search result, we might want to
185 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
186 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
187 -- a message be forgotten.
188 --
189 -- forM_ got $ \_ -> do
190 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
191 return ()
192 return True -- True to keep searching.
193 searchAgain = do
194 -- Canceling a pending search here seems to make announcements more reliable.
195 searchCancel st
196 isfin <- searchIsFinished st -- Always True, since we canceled.
197 return $ when isfin $ void $ fork search
198 search = do -- thread to fork
199 atomically $ reset aBuckets aSearch aTarget st
200 searchLoop aSearch aTarget onResult st
201 fork $ do -- Announce to any nodes we haven't already announced to.
202 is <- atomically $ do
203 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
204 nq <- readTVar ns
205 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
206 $ MM.toList bs
207 publishToNodes is
208 return ()
209 {-
210 atomically $ scheduleImmediately announcer k
211 $ SearchFinished {- st -} search announce aInterval
212 interruptDelay (interrutible announcer)
213 -}
214 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
215 interruptDelay (interrutible announcer)
216
217cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
218cancel announcer k _ _ = do
219 atomically $ scheduleImmediately announcer k $ DeleteAnnouncement 109 atomically $ scheduleImmediately announcer k $ DeleteAnnouncement
220 interruptDelay (interrutible announcer) 110 interruptDelay (interrutible announcer)
221 111
@@ -273,37 +163,4 @@ performScheduledItem announcer now = \case
273 163
274 (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now 164 (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now
275 165
276-- announcement started:
277newAnnouncement :: STM (IO a)
278 -> IO ()
279 -> IO ()
280 -> POSIXTime
281 -> Announcer
282 -> AnnounceKey
283 -> POSIXTime
284 -> STM (IO ())
285newAnnouncement checkFin search announce interval = \announcer k now -> do
286 modifyTVar (scheduled announcer)
287 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
288 return $ void $ fork search
289
290-- time for periodic announce:
291-- (re-)announce to the current known set of storing-nodes.
292-- TODO: If the search is finished, restart the search.
293reAnnounce :: STM (IO a)
294 -> IO ()
295 -> POSIXTime
296 -> Announcer
297 -> AnnounceKey
298 -> POSIXTime
299 -> STM (IO ())
300reAnnounce checkFin announce interval = \announcer k now -> do
301 isfin <- checkFin
302 modifyTVar (scheduled announcer)
303 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
304 return $ do
305 isfin
306 hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now
307 announce
308
309 166
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs
new file mode 100644
index 00000000..eab974bc
--- /dev/null
+++ b/Announcer/Tox.hs
@@ -0,0 +1,176 @@
1{-# LANGUAGE DeriveDataTypeable #-}
2{-# LANGUAGE DeriveGeneric #-}
3{-# LANGUAGE ExistentialQuantification #-}
4{-# LANGUAGE FlexibleContexts #-}
5{-# LANGUAGE GeneralizedNewtypeDeriving #-}
6{-# LANGUAGE LambdaCase #-}
7{-# LANGUAGE NamedFieldPuns #-}
8{-# LANGUAGE NondecreasingIndentation #-}
9module Announcer.Tox where
10 -- , AnnounceMethod(..)
11 -- , schedule
12
13import Announcer
14import qualified Data.MinMaxPSQ as MM
15import Data.Wrapper.PSQ as PSQ
16import InterruptibleDelay
17import Network.Kademlia.Routing as R
18import Network.Kademlia.Search
19
20import Control.Concurrent.Lifted.Instrument
21import Control.Concurrent.STM
22import Control.Monad
23import Data.Hashable
24import Data.Maybe
25import Data.Ord
26import Data.Time.Clock.POSIX
27import System.IO
28
29
30announceK :: Int
31announceK = 8
32
33data AnnounceState = forall nid addr tok ni r. AnnounceState
34 { aState :: SearchState nid addr tok ni r
35 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
36 }
37
38-- | This type specifies an item that can be announced on appropriate nodes in
39-- a Kademlia network.
40data AnnounceMethod r = forall nid ni sr addr tok a.
41 ( Show nid
42 , Hashable nid
43 , Hashable ni
44 , Ord addr
45 , Ord nid
46 , Ord ni
47 ) => AnnounceMethod
48 { aSearch :: Search nid addr tok ni sr
49 -- ^ This is the Kademlia search to run repeatedly to find the
50 -- nearby nodes. A new search is started whenever one is not
51 -- already in progress at announce time. Repeated searches are
52 -- likely to finish faster than the first since nearby nodes
53 -- are not discarded.
54 , aPublish :: Either (r -> sr -> IO ())
55 (r -> tok -> Maybe ni -> IO (Maybe a))
56 -- ^ The action to perform when we find nearby nodes. The
57 -- destination node is given as a Maybe so that methods that
58 -- treat 'Nothing' as loop-back address can be passed here,
59 -- however 'Nothing' will not be passed by the announcer
60 -- thread.
61 --
62 -- There are two cases:
63 --
64 -- [Left] The action to perform requires a search result.
65 -- This was implemented to support Tox's DHTKey and
66 -- Friend-Request messages.
67 --
68 -- [Right] The action requires a "token" from the destination
69 -- node. This is the more typical "announce" semantics for
70 -- Kademlia.
71 , aBuckets :: TVar (R.BucketList ni)
72 -- ^ Set this to the current Kademlia routing table buckets.
73 -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4?
74 , aTarget :: nid
75 -- ^ This is the Kademlia node-id of the item being announced.
76 , aInterval :: POSIXTime
77 -- ^ Assuming we have nearby nodes from the search, the item
78 -- will be announced at this interval.
79 --
80 -- Current implementation is to make the scheduled
81 -- announcements even if the search hasn't finished. It will
82 -- use the closest nodes found so far.
83 }
84
85
86-- announcement started:
87newAnnouncement :: STM (IO a)
88 -> IO ()
89 -> IO ()
90 -> POSIXTime
91 -> Announcer
92 -> AnnounceKey
93 -> POSIXTime
94 -> STM (IO ())
95newAnnouncement checkFin search announce interval = \announcer k now -> do
96 modifyTVar (scheduled announcer)
97 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
98 return $ void $ fork search
99
100-- time for periodic announce:
101-- (re-)announce to the current known set of storing-nodes.
102-- TODO: If the search is finished, restart the search.
103reAnnounce :: STM (IO a)
104 -> IO ()
105 -> POSIXTime
106 -> Announcer
107 -> AnnounceKey
108 -> POSIXTime
109 -> STM (IO ())
110reAnnounce checkFin announce interval = \announcer k now -> do
111 isfin <- checkFin
112 modifyTVar (scheduled announcer)
113 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
114 return $ do
115 isfin
116 hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now
117 announce
118
119-- | Schedule a recurring Search + Announce sequence.
120schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
121schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do
122 st <- atomically $ newSearch aSearch aTarget []
123 ns <- atomically $ newTVar MM.empty
124 let astate = AnnounceState st ns
125 publishToNodes is
126 | Left _ <- aPublish = return ()
127 | Right publish <- aPublish = do
128 forM_ is $ \(Binding ni mtok _) -> do
129 forM_ mtok $ \tok -> do
130 got <- publish r tok (Just ni)
131 now <- getPOSIXTime
132 forM_ got $ \_ -> do
133 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
134 announce = do -- publish to current search results
135 is <- atomically $ do
136 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
137 return $ MM.toList bs
138 publishToNodes is
139 onResult sr
140 | Right _ <- aPublish = return True
141 | Left sendit <- aPublish = do
142 scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do
143 got <- sendit r sr
144 -- If we had a way to get the source of a search result, we might want to
145 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
146 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
147 -- a message be forgotten.
148 --
149 -- forM_ got $ \_ -> do
150 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
151 return ()
152 return True -- True to keep searching.
153 searchAgain = do
154 -- Canceling a pending search here seems to make announcements more reliable.
155 searchCancel st
156 isfin <- searchIsFinished st -- Always True, since we canceled.
157 return $ when isfin $ void $ fork search
158 search = do -- thread to fork
159 atomically $ reset aBuckets aSearch aTarget st
160 searchLoop aSearch aTarget onResult st
161 fork $ do -- Announce to any nodes we haven't already announced to.
162 is <- atomically $ do
163 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
164 nq <- readTVar ns
165 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
166 $ MM.toList bs
167 publishToNodes is
168 return ()
169 {-
170 atomically $ scheduleImmediately announcer k
171 $ SearchFinished {- st -} search announce aInterval
172 interruptDelay (interrutible announcer)
173 -}
174 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
175 interruptDelay (interrutible announcer)
176
diff --git a/ToxManager.hs b/ToxManager.hs
index 81def17f..9b730f55 100644
--- a/ToxManager.hs
+++ b/ToxManager.hs
@@ -4,6 +4,7 @@
4module ToxManager where 4module ToxManager where
5 5
6import Announcer 6import Announcer
7import Announcer.Tox
7import Connection 8import Connection
8-- import Control.Concurrent 9-- import Control.Concurrent
9import Control.Concurrent.STM 10import Control.Concurrent.STM
@@ -124,12 +125,14 @@ toxman announcer toxbkts tox presence = ToxManager
124 forM_ kbkts $ \(akey,bkts) -> do 125 forM_ kbkts $ \(akey,bkts) -> do
125 cancel announcer 126 cancel announcer
126 akey 127 akey
128 {-
127 (AnnounceMethod (toxQSearch tox) 129 (AnnounceMethod (toxQSearch tox)
128 (Right $ toxAnnounceSendData tox) 130 (Right $ toxAnnounceSendData tox)
129 bkts 131 bkts
130 pubid 132 pubid
131 toxAnnounceInterval) 133 toxAnnounceInterval)
132 pub 134 pub
135 -}
133 136
134 , setToxConnectionPolicy = \me them p -> do 137 , setToxConnectionPolicy = \me them p -> do
135 let m = do meid <- readMaybe $ T.unpack $ T.take 43 me 138 let m = do meid <- readMaybe $ T.unpack $ T.take 43 me
diff --git a/dht-client.cabal b/dht-client.cabal
index 9dc5ceb9..3169cabd 100644
--- a/dht-client.cabal
+++ b/dht-client.cabal
@@ -109,6 +109,7 @@ library
109 Text.XXD 109 Text.XXD
110 Network.Tox.ContactInfo 110 Network.Tox.ContactInfo
111 Announcer 111 Announcer
112 Announcer.Tox
112 InterruptibleDelay 113 InterruptibleDelay
113 ByteStringOperators 114 ByteStringOperators
114 ClientState 115 ClientState
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 28e9f261..ac78d552 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -67,6 +67,7 @@ import System.Posix.Signals
67 67
68 68
69import Announcer 69import Announcer
70import Announcer.Tox
70import ToxManager 71import ToxManager
71import Crypto.Tox -- (zeros32,SecretKey,PublicKey, generateSecretKey, toPublic, encodeSecret, decodeSecret, userKeys) 72import Crypto.Tox -- (zeros32,SecretKey,PublicKey, generateSecretKey, toPublic, encodeSecret, decodeSecret, userKeys)
72import Network.UPNP as UPNP 73import Network.UPNP as UPNP
@@ -1009,7 +1010,7 @@ clientSession s@Session{..} sock cnum h = do
1009 dhtQuery 1010 dhtQuery
1010 doit :: Char -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 1011 doit :: Char -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
1011 doit '+' = schedule 1012 doit '+' = schedule
1012 doit '-' = cancel 1013 doit '-' = \a k _ _ -> cancel a k
1013 doit _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?" 1014 doit _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?"
1014 matchingResult :: 1015 matchingResult ::
1015 ( Typeable stok 1016 ( Typeable stok