summaryrefslogtreecommitdiff
path: root/kad/src/Network/Kademlia/Search.hs
diff options
context:
space:
mode:
Diffstat (limited to 'kad/src/Network/Kademlia/Search.hs')
-rw-r--r--kad/src/Network/Kademlia/Search.hs236
1 files changed, 236 insertions, 0 deletions
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
new file mode 100644
index 00000000..1be1afc1
--- /dev/null
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -0,0 +1,236 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE PatternSynonyms #-}
3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE LambdaCase #-}
7module Network.Kademlia.Search where
8
9import Control.Concurrent.Tasks
10import Control.Concurrent.STM
11import Control.Monad
12import Data.Function
13import Data.Maybe
14import qualified Data.Set as Set
15 ;import Data.Set (Set)
16import Data.Hashable (Hashable(..)) -- for type sigs
17import System.IO.Error
18
19import qualified Data.MinMaxPSQ as MM
20 ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
21import qualified Data.Wrapper.PSQ as PSQ
22 ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey)
23import Network.Kademlia.Routing as R
24#ifdef THREAD_DEBUG
25import Control.Concurrent.Lifted.Instrument
26#else
27import Control.Concurrent.Lifted
28import GHC.Conc (labelThread)
29#endif
30
31data Search nid addr tok ni r = Search
32 { searchSpace :: KademliaSpace nid ni
33 , searchNodeAddress :: ni -> addr
34 , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok)))
35 (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ())
36 , searchAlpha :: Int -- α = 8
37 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
38 -- how fast the queries are. For Tox's much slower onion-routed queries, we
39 -- need to ensure that closer non-responding queries don't completely push out
40 -- farther away queries.
41 --
42 -- For BitTorrent, setting them both 8 was not an issue, but that is no longer
43 -- supported because now the number of remembered informants is now the
44 -- difference between these two numbers. So, if searchK = 16 and searchAlpha =
45 -- 4, then the number of remembered query responses is 12.
46 , searchK :: Int -- K = 16
47 }
48
49data SearchState nid addr tok ni r = SearchState
50 { -- | The number of pending queries. Incremented before any query is sent
51 -- and decremented when we get a reply.
52 searchPendingCount :: TVar Int
53 -- | Nodes scheduled to be queried (roughly at most K).
54 , searchQueued :: TVar (MinMaxPSQ ni nid)
55 -- | The nearest (K - α) nodes that issued a reply.
56 --
57 -- α is the maximum number of simultaneous queries.
58 , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok))
59 -- | This tracks already-queried addresses so we avoid bothering them
60 -- again. XXX: We could probably keep only the pending queries in this
61 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
62 -- should limit the number of outstanding queries.
63 , searchVisited :: TVar (Set addr)
64 , searchSpec :: Search nid addr tok ni r
65 }
66
67
68newSearch :: ( Ord addr
69 , PSQKey nid
70 , PSQKey ni
71 ) =>
72 {-
73 KademliaSpace nid ni
74 -> (ni -> addr)
75 -> (ni -> IO ([ni], [r])) -- the query action.
76 -> (r -> STM Bool) -- receives search results.
77 -> nid -- target of search
78 -}
79 Search nid addr tok ni r
80 -> nid
81 -> [ni] -- Initial nodes to query.
82 -> STM (SearchState nid addr tok ni r)
83newSearch s@(Search space nAddr qry _ _) target ns = do
84 c <- newTVar 0
85 q <- newTVar $ MM.fromList
86 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
87 $ ns
88 i <- newTVar MM.empty
89 v <- newTVar Set.empty
90 return -- (Search space nAddr qry) , r , target
91 ( SearchState c q i v s )
92
93-- | Discard a value from a key-priority-value tuple. This is useful for
94-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ".
95stripValue :: Binding' k p v -> Binding k p
96stripValue (Binding ni _ nid) = (ni :-> nid)
97
98-- | Reset a 'SearchState' object to ready it for a repeated search.
99reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
100 (nid -> STM [ni])
101 -> Search nid addr1 tok1 ni r1
102 -> nid
103 -> SearchState nid addr tok ni r
104 -> STM (SearchState nid addr tok ni r)
105reset nearestNodes qsearch target st = do
106 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
107 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
108 <$> nearestNodes target
109 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
110 writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes
111 writeTVar (searchInformant st) MM.empty
112 writeTVar (searchVisited st) Set.empty
113 writeTVar (searchPendingCount st) 0
114 return st
115
116sendAsyncQuery :: forall addr nid tok ni r.
117 ( Ord addr
118 , PSQKey nid
119 , PSQKey ni
120 , Show nid
121 ) =>
122 Search nid addr tok ni r
123 -> nid
124 -> (r -> STM Bool) -- ^ return False to terminate the search.
125 -> SearchState nid addr tok ni r
126 -> Binding ni nid
127 -> TaskGroup
128 -> IO ()
129sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g =
130 case searchQuery of
131 Left blockingQuery ->
132 forkTask g "searchQuery" $ do
133 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
134 reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing)
135 atomically $ do
136 modifyTVar searchPendingCount pred
137 maybe (return ()) go reply
138 Right nonblockingQuery -> do
139 nonblockingQuery searchTarget ni $ \reply ->
140 atomically $ do
141 modifyTVar searchPendingCount pred
142 maybe (return ()) go reply
143 where
144 go (ns,rs,tok) = do
145 vs <- readTVar searchVisited
146 -- We only queue a node if it is not yet visited
147 let insertFoundNode :: Int
148 -> ni
149 -> MinMaxPSQ ni nid
150 -> MinMaxPSQ ni nid
151 insertFoundNode k n q
152 | searchNodeAddress n `Set.member` vs
153 = q
154 | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget
155 $ kademliaLocation searchSpace n )
156 q
157
158 qsize0 <- MM.size <$> readTVar searchQueued
159 let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow
160 -- only when there's fewer than
161 -- K elements.
162 modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns
163 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
164 flip fix rs $ \loop -> \case
165 r:rs' -> do
166 wanting <- searchResult r
167 if wanting then loop rs'
168 else searchCancel sch
169 [] -> return ()
170
171
172searchIsFinished :: ( PSQKey nid
173 , PSQKey ni
174 ) => SearchState nid addr tok ni r -> STM Bool
175searchIsFinished SearchState{..} = do
176 q <- readTVar searchQueued
177 cnt <- readTVar searchPendingCount
178 informants <- readTVar searchInformant
179 return $ cnt == 0
180 && ( MM.null q
181 || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec)
182 && ( PSQ.prio (fromJust $ MM.findMax informants)
183 <= PSQ.prio (fromJust $ MM.findMin q))))
184
185searchCancel :: SearchState nid addr tok ni r -> STM ()
186searchCancel SearchState{..} = do
187 writeTVar searchPendingCount 0
188 writeTVar searchQueued MM.empty
189
190search ::
191 ( Ord r
192 , Ord addr
193 , PSQKey nid
194 , PSQKey ni
195 , Show nid
196 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
197search sch buckets target result = do
198 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
199 st <- atomically $ newSearch sch target ns
200 forkIO $ searchLoop sch target result st
201 return st
202
203searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni )
204 => Search nid addr tok ni r -- ^ Query and distance methods.
205 -> nid -- ^ The target we are searching for.
206 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
207 -> SearchState nid addr tok ni r -- ^ Search-related state.
208 -> IO ()
209searchLoop sch@Search{..} target result s@SearchState{..} = do
210 myThreadId >>= flip labelThread ("search."++show target)
211 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do
212 join $ atomically $ do
213 cnt <- readTVar $ searchPendingCount
214 check (cnt <= 8) -- Only 8 pending queries at a time.
215 informants <- readTVar searchInformant
216 found <- MM.minView <$> readTVar searchQueued
217 case found of
218 Just (ni :-> d, q)
219 | -- If there's fewer than /k - α/ informants and there's any
220 -- node we haven't yet got a response from.
221 (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q))
222 -- Or there's no informants yet at all.
223 || MM.null informants
224 -- Or if the closest scheduled node is nearer than the
225 -- nearest /k/ informants.
226 || (d < PSQ.prio (fromJust $ MM.findMax informants))
227 -> -- Then the search continues, send a query.
228 do writeTVar searchQueued q
229 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
230 modifyTVar searchPendingCount succ
231 return $ do
232 sendAsyncQuery sch target result s (ni :-> d) g
233 again
234 _ -> -- Otherwise, we are finished.
235 do check (cnt == 0)
236 return $ return ()