summaryrefslogtreecommitdiff
path: root/Announcer
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2018-06-18 13:32:02 -0400
committerjoe <joe@jerkface.net>2018-06-18 13:32:02 -0400
commita45b000e07a806e171f1e4701abd3e025382ecf3 (patch)
tree52b562e145a2c19ba255a73fdf914af953a39e11 /Announcer
parent19364a287f7083fc60beed2d6eae3dd71d27e737 (diff)
Factored separate code-paths for Kademlia announce versus action
on search result.
Diffstat (limited to 'Announcer')
-rw-r--r--Announcer/Tox.hs102
1 files changed, 71 insertions, 31 deletions
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs
index c9164d22..f79f8ee7 100644
--- a/Announcer/Tox.hs
+++ b/Announcer/Tox.hs
@@ -50,23 +50,16 @@ data AnnounceMethod r = forall nid ni sr addr tok a.
50 -- already in progress at announce time. Repeated searches are 50 -- already in progress at announce time. Repeated searches are
51 -- likely to finish faster than the first since nearby nodes 51 -- likely to finish faster than the first since nearby nodes
52 -- are not discarded. 52 -- are not discarded.
53 , aPublish :: Either (r -> sr -> IO ()) 53 , aPublish :: r -> tok -> Maybe ni -> IO (Maybe a)
54 (r -> tok -> Maybe ni -> IO (Maybe a))
55 -- ^ The action to perform when we find nearby nodes. The 54 -- ^ The action to perform when we find nearby nodes. The
56 -- destination node is given as a Maybe so that methods that 55 -- destination node is given as a Maybe so that methods that
57 -- treat 'Nothing' as loop-back address can be passed here, 56 -- treat 'Nothing' as loop-back address can be passed here,
58 -- however 'Nothing' will not be passed by the announcer 57 -- however 'Nothing' will not be passed by the announcer
59 -- thread. 58 -- thread.
60 -- 59 --
61 -- There are two cases: 60 -- The action requires a "token" from the destination
62 -- 61 -- node. This is the more typical "announce" semantics for
63 -- [Left] The action to perform requires a search result. 62 -- Kademlia.
64 -- This was implemented to support Tox's DHTKey and
65 -- Friend-Request messages.
66 --
67 -- [Right] The action requires a "token" from the destination
68 -- node. This is the more typical "announce" semantics for
69 -- Kademlia.
70 , aNearestNodes :: nid -> STM [ni] 63 , aNearestNodes :: nid -> STM [ni]
71 -- ^ Method to obtain starting nodes from an iterative Kademlia search. 64 -- ^ Method to obtain starting nodes from an iterative Kademlia search.
72 , aTarget :: nid 65 , aTarget :: nid
@@ -80,6 +73,38 @@ data AnnounceMethod r = forall nid ni sr addr tok a.
80 -- use the closest nodes found so far. 73 -- use the closest nodes found so far.
81 } 74 }
82 75
76-- | This type specifies a Kademlia search and an action to perform upon the result.
77data SearchMethod r = forall nid ni sr addr tok a.
78 ( Show nid
79 , Hashable nid
80 , Hashable ni
81 , Ord addr
82 , Ord nid
83 , Ord ni
84 ) => SearchMethod
85 { sSearch :: Search nid addr tok ni sr
86 -- ^ This is the Kademlia search to run repeatedly to find the
87 -- nearby nodes. A new search is started whenever one is not
88 -- already in progress at announce time. Repeated searches are
89 -- likely to finish faster than the first since nearby nodes
90 -- are not discarded.
91 --
92 -- XXX: Currently, "repeatedly" is wrong.
93 , sWithResult :: r -> sr -> IO ()
94 -- ^
95 -- The action to perform upon a search result. This was
96 -- implemented to support Tox's DHTKey and Friend-Request
97 -- messages.
98 , sNearestNodes :: nid -> STM [ni]
99 -- ^ Method to obtain starting nodes from an iterative Kademlia search.
100 , sTarget :: nid
101 -- ^ This is the Kademlia node-id of the item being announced.
102 , sInterval :: POSIXTime
103 -- ^ The time between searches.
104 --
105 -- XXX: Currently, search results will stop any repetition.
106 }
107
83 108
84-- announcement started: 109-- announcement started:
85newAnnouncement :: STM (IO a) 110newAnnouncement :: STM (IO a)
@@ -115,17 +140,15 @@ reAnnounce checkFin announce interval = \announcer k now -> do
115 announce 140 announce
116 141
117-- | Schedule a recurring Search + Announce sequence. 142-- | Schedule a recurring Search + Announce sequence.
118schedule :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () 143scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
119schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do 144scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do
120 st <- atomically $ newSearch aSearch aTarget [] 145 st <- atomically $ newSearch aSearch aTarget []
121 ns <- atomically $ newTVar MM.empty 146 ns <- atomically $ newTVar MM.empty
122 let astate = AnnounceState st ns 147 let astate = AnnounceState st ns
123 publishToNodes is 148 publishToNodes is = do
124 | Left _ <- aPublish = return ()
125 | Right publish <- aPublish = do
126 forM_ is $ \(Binding ni mtok _) -> do 149 forM_ is $ \(Binding ni mtok _) -> do
127 forM_ mtok $ \tok -> do 150 forM_ mtok $ \tok -> do
128 got <- publish r tok (Just ni) 151 got <- aPublish r tok (Just ni)
129 now <- getPOSIXTime 152 now <- getPOSIXTime
130 forM_ got $ \_ -> do 153 forM_ got $ \_ -> do
131 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now) 154 atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
@@ -134,20 +157,7 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInte
134 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) 157 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
135 return $ MM.toList bs 158 return $ MM.toList bs
136 publishToNodes is 159 publishToNodes is
137 onResult sr 160 onResult sr = return True
138 | Right _ <- aPublish = return True
139 | Left sendit <- aPublish = do
140 scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do
141 got <- sendit r sr
142 -- If we had a way to get the source of a search result, we might want to
143 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
144 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
145 -- a message be forgotten.
146 --
147 -- forM_ got $ \_ -> do
148 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
149 return ()
150 return True -- True to keep searching.
151 searchAgain = do 161 searchAgain = do
152 -- Canceling a pending search here seems to make announcements more reliable. 162 -- Canceling a pending search here seems to make announcements more reliable.
153 searchCancel st 163 searchCancel st
@@ -172,3 +182,33 @@ schedule announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInte
172 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) 182 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
173 interruptDelay (interrutible announcer) 183 interruptDelay (interrutible announcer)
174 184
185-- | Schedule a recurring Search + Publish sequence.
186scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO ()
187scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do
188 st <- atomically $ newSearch sSearch sTarget []
189 ns <- atomically $ newTVar MM.empty
190 let astate = AnnounceState st ns
191 onResult sr = do
192 -- XXX: Using /k/ here as the announce key is causing the search not to repeat.
193 scheduleImmediately announcer k $ ScheduledItem $ \_ _ _ -> return $ do
194 got <- sWithResult r sr
195 -- If we had a way to get the source of a search result, we might want to
196 -- treat it similarly to an announcing node and remember it in the 'aStoringNodes'
197 -- MinMaxPSQ. For now, I'm just letting the nodes for which we've already sent
198 -- a message be forgotten.
199 --
200 -- forM_ got $ \_ -> do
201 -- atomically $ modifyTVar ns $ MM.insertTake announceK ni (Down now)
202 return ()
203 return True -- True to keep searching.
204 searchAgain = do
205 -- Canceling a pending search here seems to make announcements more reliable.
206 searchCancel st
207 isfin <- searchIsFinished st -- Always True, since we canceled.
208 return $ when isfin $ void $ fork search
209 search = do -- thread to fork
210 atomically $ reset sNearestNodes sSearch sTarget st
211 searchLoop sSearch sTarget onResult st
212 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval)
213 interruptDelay (interrutible announcer)
214