diff options
Diffstat (limited to 'Announcer')
-rw-r--r-- | Announcer/Tox.hs | 102 |
1 files changed, 71 insertions, 31 deletions
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs index 03cd0871..60a5a8b1 100644 --- a/Announcer/Tox.hs +++ b/Announcer/Tox.hs | |||
@@ -50,23 +50,16 @@ data AnnounceMethod r = forall nid ni sr addr tok a. | |||
50 | -- already in progress at announce time. Repeated searches are | 50 | -- already in progress at announce time. Repeated searches are |
51 | -- likely to finish faster than the first since nearby nodes | 51 | -- likely to finish faster than the first since nearby nodes |
52 | -- are not discarded. | 52 | -- are not discarded. |
53 | , aPublish :: Either (r -> sr -> IO ()) | 53 | , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a) |
54 | (r -> tok -> Maybe ni -> IO (Maybe a)) | ||
55 | -- ^ The action to perform when we find nearby nodes. The | 54 | -- ^ The action to perform when we find nearby nodes. The |
56 | -- destination node is given as a Maybe so that methods that | 55 | -- destination node is given as a Maybe so that methods that |
57 | -- treat 'Nothing' as loop-back address can be passed here, | 56 | -- treat 'Nothing' as loop-back address can be passed here, |
58 | -- however 'Nothing' will not be passed by the announcer | 57 | -- however 'Nothing' will not be passed by the announcer |
59 | -- thread. | 58 | -- thread. |
60 | -- | 59 | -- |
61 | -- There are two cases: | 60 | -- The action requires a "token" from the destination |
62 | -- | 61 | -- node. This is the more typical "announce" semantics for |
63 | -- [Left] The action to perform requires a search result. | 62 | -- Kademlia. |
64 | -- This was implemented to support Tox's DHTKey and | ||
65 | -- Friend-Request messages. | ||
66 | -- | ||
67 | -- [Right] The action requires a "token" from the destination | ||
68 | -- node. This is the more typical "announce" semantics for | ||
69 | -- Kademlia. | ||
70 | , aNearestNodes :: nid -> STM [ni] | 63 | , aNearestNodes :: nid -> STM [ni] |
71 | -- ^ Method to obtain starting nodes from an iterative Kademlia search. | 64 | -- ^ Method to obtain starting nodes from an iterative Kademlia search. |
72 | , aTarget :: nid | 65 | , aTarget :: nid |
@@ -80,6 +73,38 @@ data AnnounceMethod r = forall nid ni sr addr tok a. | |||
80 | -- use the closest nodes found so far. | 73 | -- use the closest nodes found so far. |
81 | } | 74 | } |
82 | 75 | ||
76 | -- | This type specifies a Kademlia search and an action to perform upon the result. | ||
77 | data SearchMethod r = forall nid ni sr addr tok a. | ||
78 | ( Show nid | ||
79 | , Hashable nid | ||
80 | , Hashable ni | ||
81 | , Ord addr | ||
82 | , Ord nid | ||
83 | , Ord ni | ||
84 | ) => SearchMethod | ||
85 | { sSearch :: Search nid addr tok ni sr | ||
86 | -- ^ This is the Kademlia search to run repeatedly to find the | ||
87 | -- nearby nodes. A new search is started whenever one is not | ||
88 | -- already in progress at announce time. Repeated searches are | ||
89 | -- likely to finish faster than the first since nearby nodes | ||
90 | -- are not discarded. | ||
91 | -- | ||
92 | -- XXX: Currently, "repeatedly" is wrong. | ||
93 | , sWithResult :: r -> sr -> IO () | ||
94 | -- ^ | ||
95 | -- The action to perform upon a search result. This was | ||
96 | -- implemented to support Tox's DHTKey and Friend-Request | ||
97 | -- messages. | ||
98 | , sNearestNodes :: nid -> STM [ni] | ||
99 | -- ^ Method to obtain starting nodes from an iterative Kademlia search. | ||
100 | , sTarget :: nid | ||
101 | -- ^ This is the Kademlia node-id of the item being announced. | ||
102 | , sInterval :: POSIXTime | ||
103 | -- ^ The time between searches. | ||
104 | -- | ||
105 | -- XXX: Currently, search results will stop any repetition. | ||
106 | } | ||
107 | |||
83 | 108 | ||
84 | -- announcement started: | 109 | -- announcement started: |
85 | newAnnouncement :: STM (IO a) | 110 | newAnnouncement :: STM (IO a) |
@@ -113,17 +138,15 @@ reAnnounce checkFin announce interval = \announcer k now -> do | |||
113 | announce | 138 | announce |
114 | 139 | ||
115 | -- | Schedule a recurring Search + Announce sequence. | 140 | -- | Schedule a recurring Search + Announce sequence. |
116 | schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | 141 | scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () |
117 | schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do | 142 | scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do |
118 | st <- atomically $ newSearch aSearch aTarget [] | 143 | st <- atomically $ newSearch aSearch aTarget [] |
119 | ns <- atomically $ newTVar MM.empty | 144 | ns <- atomically $ newTVar MM.empty |
120 | let astate = AnnounceState st ns | 145 | let astate = AnnounceState st ns |
121 | publishToNodes is | 146 | publishToNodes is = do |
122 | | Left _ <- aPublish = return () | ||
123 | | Right publish <- aPublish = do | ||
124 | forM_ is $ \(Binding ni mtok _) -> do | 147 | forM_ is $ \(Binding ni mtok _) -> do |
125 | forM_ mtok $ \tok -> do | 148 | forM_ mtok $ \tok -> do |
126 | got <- publish r tok (Just ni) | 149 | got <- aPublish r tok (Just ni) |
127 | now <- getPOSIXTime | 150 | now <- getPOSIXTime |
128 | forM_ got $ \_ -> do | 151 | forM_ got $ \_ -> do |
129 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | 152 | atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) |
@@ -132,20 +155,7 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInte | |||
132 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | 155 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) |
133 | return $ MM.toList bs | 156 | return $ MM.toList bs |
134 | publishToNodes is | 157 | publishToNodes is |
135 | onResult sr | 158 | onResult sr = return True |
136 | | Right _ <- aPublish = return True | ||
137 | | Left sendit <- aPublish = do | ||
138 | scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do | ||
139 | got <- sendit r sr | ||
140 | -- If we had a way to get the source of a search result, we might want to | ||
141 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | ||
142 | -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent | ||
143 | -- a message be forgotten. | ||
144 | -- | ||
145 | -- forM_ got $ \_ -> do | ||
146 | -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
147 | return () | ||
148 | return True -- True to keep searching. | ||
149 | searchAgain = do | 159 | searchAgain = do |
150 | -- Canceling a pending search here seems to make announcements more reliable. | 160 | -- Canceling a pending search here seems to make announcements more reliable. |
151 | searchCancel st | 161 | searchCancel st |
@@ -170,3 +180,33 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInte | |||
170 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) | 180 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) |
171 | interruptDelay (interrutible announcer) | 181 | interruptDelay (interrutible announcer) |
172 | 182 | ||
183 | -- | Schedule a recurring Search + Publish sequence. | ||
184 | scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO () | ||
185 | scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do | ||
186 | st <- atomically $ newSearch sSearch sTarget [] | ||
187 | ns <- atomically $ newTVar MM.empty | ||
188 | let astate = AnnounceState st ns | ||
189 | onResult sr = do | ||
190 | -- XXX: Using /k/ here as the announce key is causing the search not to repeat. | ||
191 | scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do | ||
192 | got <- sWithResult r sr | ||
193 | -- If we had a way to get the source of a search result, we might want to | ||
194 | -- treat it similarly to an announcing node and remember it in the 'aStoringNodes' | ||
195 | -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent | ||
196 | -- a message be forgotten. | ||
197 | -- | ||
198 | -- forM_ got $ \_ -> do | ||
199 | -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) | ||
200 | return () | ||
201 | return True -- True to keep searching. | ||
202 | searchAgain = do | ||
203 | -- Canceling a pending search here seems to make announcements more reliable. | ||
204 | searchCancel st | ||
205 | isfin <- searchIsFinished st -- Always True, since we canceled. | ||
206 | return $ when isfin $ void $ fork search | ||
207 | search = do -- thread to fork | ||
208 | atomically $ reset sNearestNodes sSearch sTarget st | ||
209 | searchLoop sSearch sTarget onResult st | ||
210 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) | ||
211 | interruptDelay (interrutible announcer) | ||
212 | |||