diff options
-rw-r--r-- | Announcer.hs | 157 | ||||
-rw-r--r-- | Announcer/Tox.hs | 176 | ||||
-rw-r--r-- | ToxManager.hs | 3 | ||||
-rw-r--r-- | dht-client.cabal | 1 | ||||
-rw-r--r-- | examples/dhtd.hs | 3 |
5 files changed, 189 insertions, 151 deletions
diff --git a/Announcer.hs b/Announcer.hs index 7fd72e2d..f0d65656 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -11,12 +11,15 @@ module Announcer | |||
11 | , AnnounceKey | 11 | , AnnounceKey |
12 | , packAnnounceKey | 12 | , packAnnounceKey |
13 | , unpackAnnounceKey | 13 | , unpackAnnounceKey |
14 | , AnnounceMethod(..) | ||
15 | , forkAnnouncer | 14 | , forkAnnouncer |
16 | , stopAnnouncer | 15 | , stopAnnouncer |
17 | , schedule | ||
18 | , cancel | 16 | , cancel |
19 | , itemStatusNum | 17 | , itemStatusNum |
18 | |||
19 | -- lower level, Announcer.Tox needs these. | ||
20 | , scheduleImmediately | ||
21 | , ScheduledItem(..) | ||
22 | , interrutible | ||
20 | ) where | 23 | ) where |
21 | 24 | ||
22 | import qualified Data.MinMaxPSQ as MM | 25 | import qualified Data.MinMaxPSQ as MM |
@@ -86,14 +89,6 @@ data Announcer = Announcer | |||
86 | , interrutible :: InterruptibleDelay | 89 | , interrutible :: InterruptibleDelay |
87 | } | 90 | } |
88 | 91 | ||
89 | announceK :: Int | ||
90 | announceK = 8 | ||
91 | |||
92 | data AnnounceState = forall nid addr tok ni r. AnnounceState | ||
93 | { aState :: SearchState nid addr tok ni r | ||
94 | , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) | ||
95 | } | ||
96 | |||
97 | -- | Schedules an event to occur long ago at the epoch (which effectively makes | 92 | -- | Schedules an event to occur long ago at the epoch (which effectively makes |
98 | -- the event happen as soon as possible). Note that the caller will usually | 93 | -- the event happen as soon as possible). Note that the caller will usually |
99 | -- also want to interrupt the 'interrutible' delay so that it finds this item | 94 | -- also want to interrupt the 'interrutible' delay so that it finds this item |
@@ -109,113 +104,8 @@ stopAnnouncer announcer = do | |||
109 | interruptDelay (interrutible announcer) | 104 | interruptDelay (interrutible announcer) |
110 | atomically $ readTVar (announcerActive announcer) >>= check . not | 105 | atomically $ readTVar (announcerActive announcer) >>= check . not |
111 | 106 | ||
112 | -- | This type specifies an item that can be announced on appropriate nodes in | 107 | cancel :: Announcer -> AnnounceKey -> IO () |
113 | -- a Kademlia network. | 108 | cancel announcer k = do |
114 | data AnnounceMethod r = forall nid ni sr addr tok a. | ||
115 | ( Show nid | ||
116 | , Hashable nid | ||
117 | , Hashable ni | ||
118 | , Ord addr | ||
119 | , Ord nid | ||
120 | , Ord ni | ||
121 | ) => AnnounceMethod | ||
122 | { aSearch :: Search nid addr tok ni sr | ||
123 | -- ^ This is the Kademlia search to run repeatedly to find the | ||
124 | -- nearby nodes. A new search is started whenever one is not | ||
125 | -- already in progress at announce time. Repeated searches are | ||
126 | -- likely to finish faster than the first since nearby nodes | ||
127 | -- are not discarded. | ||
128 | , aPublish :: Either (r -> sr -> IO ()) | ||
129 | (r -> tok -> Maybe ni -> IO (Maybe a)) | ||
130 | -- ^ The action to perform when we find nearby nodes. The | ||
131 | -- destination node is given as a Maybe so that methods that | ||
132 | -- treat 'Nothing' as loop-back address can be passed here, | ||
133 | -- however 'Nothing' will not be passed by the announcer | ||
134 | -- thread. | ||
135 | -- | ||
136 | -- There are two cases: | ||
137 | -- | ||
138 | -- [Left] The action to perform requires a search result. | ||
139 | -- This was implemented to support Tox's DHTKey and | ||
140 | -- Friend-Request messages. | ||
141 | -- | ||
142 | -- [Right] The action requires a "token" from the destination | ||
143 | -- node. This is the more typical "announce" semantics for | ||
144 | -- Kademlia. | ||
145 | , aBuckets :: TVar (R.BucketList ni) | ||
146 | -- ^ Set this to the current Kademlia routing table buckets. | ||
147 | -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4? | ||
148 | , aTarget :: nid | ||
149 | -- ^ This is the Kademlia node-id of the item being announced. | ||
150 | , aInterval :: POSIXTime | ||
151 | -- ^ Assuming we have nearby nodes from the search, the item | ||
152 | -- will be announced at this interval. | ||
153 | -- | ||
154 | -- Current implementation is to make the scheduled | ||
155 | -- announcements even if the search hasn't finished. It will | ||
156 | -- use the closest nodes found so far. | ||
157 | } | ||
158 | |||
159 | -- | Schedule a recurring Search + Announce sequence. | ||
160 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
161 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do | ||
162 | st <- atomically $ newSearch aSearch aTarget [] | ||
163 | ns <- atomically $ newTVar MM.empty | ||
164 | let astate = AnnounceState st ns | ||
165 | publishToNodes is | ||
166 | | Left _ <- aPublish = return () | ||
167 | | Right publish <- aPublish = do | ||
168 | forM_ is $ \(Binding ni mtok _) -> do | ||
169 | forM_ mtok $ \tok -> do | ||
170 | got <- publish r tok (Just ni) | ||
171 | now <- getPOSIXTime | ||
172 | forM_ got $ \_ -> do | ||
173 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
174 | announce = do -- publish to current search results | ||
175 | is <- atomically $ do | ||
176 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
177 | return $ MM.toList bs | ||
178 | publishToNodes is | ||
179 | onResult sr | ||
180 | | Right _ <- aPublish = return True | ||
181 | | Left sendit <- aPublish = do | ||
182 | scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do | ||
183 | got <- sendit r sr | ||
184 | -- If we had a way to get the source of a search result, we might want to | ||
185 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | ||
186 | -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent | ||
187 | -- a message be forgotten. | ||
188 | -- | ||
189 | -- forM_ got $ \_ -> do | ||
190 | -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
191 | return () | ||
192 | return True -- True to keep searching. | ||
193 | searchAgain = do | ||
194 | -- Canceling a pending search here seems to make announcements more reliable. | ||
195 | searchCancel st | ||
196 | isfin <- searchIsFinished st -- Always True, since we canceled. | ||
197 | return $ when isfin $ void $ fork search | ||
198 | search = do -- thread to fork | ||
199 | atomically $ reset aBuckets aSearch aTarget st | ||
200 | searchLoop aSearch aTarget onResult st | ||
201 | fork $ do -- Announce to any nodes we haven't already announced to. | ||
202 | is <- atomically $ do | ||
203 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
204 | nq <- readTVar ns | ||
205 | return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) | ||
206 | $ MM.toList bs | ||
207 | publishToNodes is | ||
208 | return () | ||
209 | {- | ||
210 | atomically $ scheduleImmediately announcer k | ||
211 | $ SearchFinished {- st -} search announce aInterval | ||
212 | interruptDelay (interrutible announcer) | ||
213 | -} | ||
214 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) | ||
215 | interruptDelay (interrutible announcer) | ||
216 | |||
217 | cancel :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
218 | cancel announcer k _ _ = do | ||
219 | atomically $ scheduleImmediately announcer k $ DeleteAnnouncement | 109 | atomically $ scheduleImmediately announcer k $ DeleteAnnouncement |
220 | interruptDelay (interrutible announcer) | 110 | interruptDelay (interrutible announcer) |
221 | 111 | ||
@@ -273,37 +163,4 @@ performScheduledItem announcer now = \case | |||
273 | 163 | ||
274 | (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now | 164 | (Binding k (ScheduledItem action) _) -> Just <$> action announcer k now |
275 | 165 | ||
276 | -- announcement started: | ||
277 | newAnnouncement :: STM (IO a) | ||
278 | -> IO () | ||
279 | -> IO () | ||
280 | -> POSIXTime | ||
281 | -> Announcer | ||
282 | -> AnnounceKey | ||
283 | -> POSIXTime | ||
284 | -> STM (IO ()) | ||
285 | newAnnouncement checkFin search announce interval = \announcer k now -> do | ||
286 | modifyTVar (scheduled announcer) | ||
287 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
288 | return $ void $ fork search | ||
289 | |||
290 | -- time for periodic announce: | ||
291 | -- (re-)announce to the current known set of storing-nodes. | ||
292 | -- TODO: If the search is finished, restart the search. | ||
293 | reAnnounce :: STM (IO a) | ||
294 | -> IO () | ||
295 | -> POSIXTime | ||
296 | -> Announcer | ||
297 | -> AnnounceKey | ||
298 | -> POSIXTime | ||
299 | -> STM (IO ()) | ||
300 | reAnnounce checkFin announce interval = \announcer k now -> do | ||
301 | isfin <- checkFin | ||
302 | modifyTVar (scheduled announcer) | ||
303 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
304 | return $ do | ||
305 | isfin | ||
306 | hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now | ||
307 | announce | ||
308 | |||
309 | 166 | ||
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs new file mode 100644 index 00000000..eab974bc --- /dev/null +++ b/Announcer/Tox.hs | |||
@@ -0,0 +1,176 @@ | |||
1 | {-# LANGUAGE DeriveDataTypeable #-} | ||
2 | {-# LANGUAGE DeriveGeneric #-} | ||
3 | {-# LANGUAGE ExistentialQuantification #-} | ||
4 | {-# LANGUAGE FlexibleContexts #-} | ||
5 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | {-# LANGUAGE NamedFieldPuns #-} | ||
8 | {-# LANGUAGE NondecreasingIndentation #-} | ||
9 | module Announcer.Tox where | ||
10 | -- , AnnounceMethod(..) | ||
11 | -- , schedule | ||
12 | |||
13 | import Announcer | ||
14 | import qualified Data.MinMaxPSQ as MM | ||
15 | import Data.Wrapper.PSQ as PSQ | ||
16 | import InterruptibleDelay | ||
17 | import Network.Kademlia.Routing as R | ||
18 | import Network.Kademlia.Search | ||
19 | |||
20 | import Control.Concurrent.Lifted.Instrument | ||
21 | import Control.Concurrent.STM | ||
22 | import Control.Monad | ||
23 | import Data.Hashable | ||
24 | import Data.Maybe | ||
25 | import Data.Ord | ||
26 | import Data.Time.Clock.POSIX | ||
27 | import System.IO | ||
28 | |||
29 | |||
30 | announceK :: Int | ||
31 | announceK = 8 | ||
32 | |||
33 | data AnnounceState = forall nid addr tok ni r. AnnounceState | ||
34 | { aState :: SearchState nid addr tok ni r | ||
35 | , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) | ||
36 | } | ||
37 | |||
38 | -- | This type specifies an item that can be announced on appropriate nodes in | ||
39 | -- a Kademlia network. | ||
40 | data AnnounceMethod r = forall nid ni sr addr tok a. | ||
41 | ( Show nid | ||
42 | , Hashable nid | ||
43 | , Hashable ni | ||
44 | , Ord addr | ||
45 | , Ord nid | ||
46 | , Ord ni | ||
47 | ) => AnnounceMethod | ||
48 | { aSearch :: Search nid addr tok ni sr | ||
49 | -- ^ This is the Kademlia search to run repeatedly to find the | ||
50 | -- nearby nodes. A new search is started whenever one is not | ||
51 | -- already in progress at announce time. Repeated searches are | ||
52 | -- likely to finish faster than the first since nearby nodes | ||
53 | -- are not discarded. | ||
54 | , aPublish :: Either (r -> sr -> IO ()) | ||
55 | (r -> tok -> Maybe ni -> IO (Maybe a)) | ||
56 | -- ^ The action to perform when we find nearby nodes. The | ||
57 | -- destination node is given as a Maybe so that methods that | ||
58 | -- treat 'Nothing' as loop-back address can be passed here, | ||
59 | -- however 'Nothing' will not be passed by the announcer | ||
60 | -- thread. | ||
61 | -- | ||
62 | -- There are two cases: | ||
63 | -- | ||
64 | -- [Left] The action to perform requires a search result. | ||
65 | -- This was implemented to support Tox's DHTKey and | ||
66 | -- Friend-Request messages. | ||
67 | -- | ||
68 | -- [Right] The action requires a "token" from the destination | ||
69 | -- node. This is the more typical "announce" semantics for | ||
70 | -- Kademlia. | ||
71 | , aBuckets :: TVar (R.BucketList ni) | ||
72 | -- ^ Set this to the current Kademlia routing table buckets. | ||
73 | -- TODO: List of TVars to have separate routing tables for IPv6 and IPv4? | ||
74 | , aTarget :: nid | ||
75 | -- ^ This is the Kademlia node-id of the item being announced. | ||
76 | , aInterval :: POSIXTime | ||
77 | -- ^ Assuming we have nearby nodes from the search, the item | ||
78 | -- will be announced at this interval. | ||
79 | -- | ||
80 | -- Current implementation is to make the scheduled | ||
81 | -- announcements even if the search hasn't finished. It will | ||
82 | -- use the closest nodes found so far. | ||
83 | } | ||
84 | |||
85 | |||
86 | -- announcement started: | ||
87 | newAnnouncement :: STM (IO a) | ||
88 | -> IO () | ||
89 | -> IO () | ||
90 | -> POSIXTime | ||
91 | -> Announcer | ||
92 | -> AnnounceKey | ||
93 | -> POSIXTime | ||
94 | -> STM (IO ()) | ||
95 | newAnnouncement checkFin search announce interval = \announcer k now -> do | ||
96 | modifyTVar (scheduled announcer) | ||
97 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
98 | return $ void $ fork search | ||
99 | |||
100 | -- time for periodic announce: | ||
101 | -- (re-)announce to the current known set of storing-nodes. | ||
102 | -- TODO: If the search is finished, restart the search. | ||
103 | reAnnounce :: STM (IO a) | ||
104 | -> IO () | ||
105 | -> POSIXTime | ||
106 | -> Announcer | ||
107 | -> AnnounceKey | ||
108 | -> POSIXTime | ||
109 | -> STM (IO ()) | ||
110 | reAnnounce checkFin announce interval = \announcer k now -> do | ||
111 | isfin <- checkFin | ||
112 | modifyTVar (scheduled announcer) | ||
113 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
114 | return $ do | ||
115 | isfin | ||
116 | hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now | ||
117 | announce | ||
118 | |||
119 | -- | Schedule a recurring Search + Announce sequence. | ||
120 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | ||
121 | schedule announcer k AnnounceMethod{aSearch,aPublish,aBuckets,aTarget,aInterval} r = do | ||
122 | st <- atomically $ newSearch aSearch aTarget [] | ||
123 | ns <- atomically $ newTVar MM.empty | ||
124 | let astate = AnnounceState st ns | ||
125 | publishToNodes is | ||
126 | | Left _ <- aPublish = return () | ||
127 | | Right publish <- aPublish = do | ||
128 | forM_ is $ \(Binding ni mtok _) -> do | ||
129 | forM_ mtok $ \tok -> do | ||
130 | got <- publish r tok (Just ni) | ||
131 | now <- getPOSIXTime | ||
132 | forM_ got $ \_ -> do | ||
133 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
134 | announce = do -- publish to current search results | ||
135 | is <- atomically $ do | ||
136 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
137 | return $ MM.toList bs | ||
138 | publishToNodes is | ||
139 | onResult sr | ||
140 | | Right _ <- aPublish = return True | ||
141 | | Left sendit <- aPublish = do | ||
142 | scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do | ||
143 | got <- sendit r sr | ||
144 | -- If we had a way to get the source of a search result, we might want to | ||
145 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | ||
146 | -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent | ||
147 | -- a message be forgotten. | ||
148 | -- | ||
149 | -- forM_ got $ \_ -> do | ||
150 | -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
151 | return () | ||
152 | return True -- True to keep searching. | ||
153 | searchAgain = do | ||
154 | -- Canceling a pending search here seems to make announcements more reliable. | ||
155 | searchCancel st | ||
156 | isfin <- searchIsFinished st -- Always True, since we canceled. | ||
157 | return $ when isfin $ void $ fork search | ||
158 | search = do -- thread to fork | ||
159 | atomically $ reset aBuckets aSearch aTarget st | ||
160 | searchLoop aSearch aTarget onResult st | ||
161 | fork $ do -- Announce to any nodes we haven't already announced to. | ||
162 | is <- atomically $ do | ||
163 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | ||
164 | nq <- readTVar ns | ||
165 | return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) | ||
166 | $ MM.toList bs | ||
167 | publishToNodes is | ||
168 | return () | ||
169 | {- | ||
170 | atomically $ scheduleImmediately announcer k | ||
171 | $ SearchFinished {- st -} search announce aInterval | ||
172 | interruptDelay (interrutible announcer) | ||
173 | -} | ||
174 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) | ||
175 | interruptDelay (interrutible announcer) | ||
176 | |||
diff --git a/ToxManager.hs b/ToxManager.hs index 81def17f..9b730f55 100644 --- a/ToxManager.hs +++ b/ToxManager.hs | |||
@@ -4,6 +4,7 @@ | |||
4 | module ToxManager where | 4 | module ToxManager where |
5 | 5 | ||
6 | import Announcer | 6 | import Announcer |
7 | import Announcer.Tox | ||
7 | import Connection | 8 | import Connection |
8 | -- import Control.Concurrent | 9 | -- import Control.Concurrent |
9 | import Control.Concurrent.STM | 10 | import Control.Concurrent.STM |
@@ -124,12 +125,14 @@ toxman announcer toxbkts tox presence = ToxManager | |||
124 | forM_ kbkts $ \(akey,bkts) -> do | 125 | forM_ kbkts $ \(akey,bkts) -> do |
125 | cancel announcer | 126 | cancel announcer |
126 | akey | 127 | akey |
128 | {- | ||
127 | (AnnounceMethod (toxQSearch tox) | 129 | (AnnounceMethod (toxQSearch tox) |
128 | (Right $ toxAnnounceSendData tox) | 130 | (Right $ toxAnnounceSendData tox) |
129 | bkts | 131 | bkts |
130 | pubid | 132 | pubid |
131 | toxAnnounceInterval) | 133 | toxAnnounceInterval) |
132 | pub | 134 | pub |
135 | -} | ||
133 | 136 | ||
134 | , setToxConnectionPolicy = \me them p -> do | 137 | , setToxConnectionPolicy = \me them p -> do |
135 | let m = do meid <- readMaybe $ T.unpack $ T.take 43 me | 138 | let m = do meid <- readMaybe $ T.unpack $ T.take 43 me |
diff --git a/dht-client.cabal b/dht-client.cabal index 9dc5ceb9..3169cabd 100644 --- a/dht-client.cabal +++ b/dht-client.cabal | |||
@@ -109,6 +109,7 @@ library | |||
109 | Text.XXD | 109 | Text.XXD |
110 | Network.Tox.ContactInfo | 110 | Network.Tox.ContactInfo |
111 | Announcer | 111 | Announcer |
112 | Announcer.Tox | ||
112 | InterruptibleDelay | 113 | InterruptibleDelay |
113 | ByteStringOperators | 114 | ByteStringOperators |
114 | ClientState | 115 | ClientState |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 28e9f261..ac78d552 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -67,6 +67,7 @@ import System.Posix.Signals | |||
67 | 67 | ||
68 | 68 | ||
69 | import Announcer | 69 | import Announcer |
70 | import Announcer.Tox | ||
70 | import ToxManager | 71 | import ToxManager |
71 | import Crypto.Tox -- (zeros32,SecretKey,PublicKey, generateSecretKey, toPublic, encodeSecret, decodeSecret, userKeys) | 72 | import Crypto.Tox -- (zeros32,SecretKey,PublicKey, generateSecretKey, toPublic, encodeSecret, decodeSecret, userKeys) |
72 | import Network.UPNP as UPNP | 73 | import Network.UPNP as UPNP |
@@ -1009,7 +1010,7 @@ clientSession s@Session{..} sock cnum h = do | |||
1009 | dhtQuery | 1010 | dhtQuery |
1010 | doit :: Char -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 1011 | doit :: Char -> Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
1011 | doit '+' = schedule | 1012 | doit '+' = schedule |
1012 | doit '-' = cancel | 1013 | doit '-' = \a k _ _ -> cancel a k |
1013 | doit _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?" | 1014 | doit _ = \_ _ _ _ -> hPutClientChunk h "Starting(+) or canceling(-)?" |
1014 | matchingResult :: | 1015 | matchingResult :: |
1015 | ( Typeable stok | 1016 | ( Typeable stok |