diff options
Diffstat (limited to 'src/Network/Kademlia/Search.hs')
-rw-r--r-- | src/Network/Kademlia/Search.hs | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs new file mode 100644 index 00000000..195bed14 --- /dev/null +++ b/src/Network/Kademlia/Search.hs | |||
@@ -0,0 +1,204 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE PatternSynonyms #-} | ||
3 | {-# LANGUAGE RecordWildCards #-} | ||
4 | {-# LANGUAGE ScopedTypeVariables #-} | ||
5 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | module Network.Kademlia.Search where | ||
8 | |||
9 | import Control.Concurrent.Tasks | ||
10 | import Control.Concurrent.STM | ||
11 | import Control.Exception | ||
12 | import Control.Monad | ||
13 | import Data.Bool | ||
14 | import Data.Function | ||
15 | import Data.List | ||
16 | import qualified Data.Map.Strict as Map | ||
17 | ;import Data.Map.Strict (Map) | ||
18 | import Data.Maybe | ||
19 | import qualified Data.Set as Set | ||
20 | ;import Data.Set (Set) | ||
21 | import System.IO | ||
22 | import System.IO.Error | ||
23 | |||
24 | import qualified Data.MinMaxPSQ as MM | ||
25 | ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') | ||
26 | import qualified Data.Wrapper.PSQ as PSQ | ||
27 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) | ||
28 | import Network.Address hiding (NodeId) | ||
29 | import Network.DHT.Routing as R | ||
30 | #ifdef THREAD_DEBUG | ||
31 | import Control.Concurrent.Lifted.Instrument | ||
32 | #else | ||
33 | import Control.Concurrent.Lifted | ||
34 | import GHC.Conc (labelThread) | ||
35 | #endif | ||
36 | |||
37 | data Search nid addr tok ni r = Search | ||
38 | { searchSpace :: KademliaSpace nid ni | ||
39 | , searchNodeAddress :: ni -> addr | ||
40 | , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok)) | ||
41 | } | ||
42 | |||
43 | data SearchState nid addr tok ni r = SearchState | ||
44 | {- | ||
45 | { searchParams :: Search nid addr ni r | ||
46 | |||
47 | , searchTarget :: nid | ||
48 | -- | This action will be performed at least once on each search result. | ||
49 | -- It may be invoked multiple times since different nodes may report the | ||
50 | -- same result. If the action returns 'False', the search will be | ||
51 | -- aborted, otherwise it will continue until it is decided that we've | ||
52 | -- asked the closest K nodes to the target. | ||
53 | , searchResult :: r -> STM Bool | ||
54 | |||
55 | -} | ||
56 | |||
57 | { -- | The number of pending queries. Incremented before any query is sent | ||
58 | -- and decremented when we get a reply. | ||
59 | searchPendingCount :: TVar Int | ||
60 | -- | Nodes scheduled to be queried. | ||
61 | , searchQueued :: TVar (MinMaxPSQ ni nid) | ||
62 | -- | The nearest K nodes that issued a reply. | ||
63 | , searchInformant :: TVar (MinMaxPSQ' ni nid tok) | ||
64 | -- | This tracks already-queried addresses so we avoid bothering them | ||
65 | -- again. XXX: We could probably keep only the pending queries in this | ||
66 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | ||
67 | -- should limit the number of outstanding queries. | ||
68 | , searchVisited :: TVar (Set addr) | ||
69 | } | ||
70 | |||
71 | newSearch :: ( Ord addr | ||
72 | , PSQKey nid | ||
73 | , PSQKey ni | ||
74 | ) => | ||
75 | {- | ||
76 | KademliaSpace nid ni | ||
77 | -> (ni -> addr) | ||
78 | -> (ni -> IO ([ni], [r])) -- the query action. | ||
79 | -> (r -> STM Bool) -- receives search results. | ||
80 | -> nid -- target of search | ||
81 | -} | ||
82 | Search nid addr tok ni r | ||
83 | -> nid | ||
84 | -> [ni] -- Initial nodes to query. | ||
85 | -> STM (SearchState nid addr tok ni r) | ||
86 | newSearch (Search space nAddr qry) target ns = do | ||
87 | c <- newTVar 0 | ||
88 | q <- newTVar $ MM.fromList | ||
89 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | ||
90 | $ ns | ||
91 | i <- newTVar MM.empty | ||
92 | v <- newTVar Set.empty | ||
93 | return -- (Search space nAddr qry) , r , target | ||
94 | ( SearchState c q i v ) | ||
95 | |||
96 | searchAlpha :: Int | ||
97 | searchAlpha = 8 | ||
98 | |||
99 | searchK :: Int | ||
100 | searchK = 8 | ||
101 | |||
102 | sendQuery :: forall addr nid tok ni r. | ||
103 | ( Ord addr | ||
104 | , PSQKey nid | ||
105 | , PSQKey ni | ||
106 | , Show nid | ||
107 | ) => | ||
108 | Search nid addr tok ni r | ||
109 | -> nid | ||
110 | -> (r -> STM Bool) | ||
111 | -> SearchState nid addr tok ni r | ||
112 | -> Binding ni nid | ||
113 | -> IO () | ||
114 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do | ||
115 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | ||
116 | reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) | ||
117 | -- (ns,rs) | ||
118 | let tok = error "TODO: token" | ||
119 | atomically $ do | ||
120 | modifyTVar searchPendingCount pred | ||
121 | maybe (return ()) go reply | ||
122 | where | ||
123 | go (ns,rs,tok) = do | ||
124 | vs <- readTVar searchVisited | ||
125 | -- We only queue a node if it is not yet visited | ||
126 | let insertFoundNode :: ni | ||
127 | -> MinMaxPSQ ni nid | ||
128 | -> MinMaxPSQ ni nid | ||
129 | insertFoundNode n q | ||
130 | | searchNodeAddress n `Set.member` vs | ||
131 | = q | ||
132 | | otherwise = MM.insertTake searchK n ( kademliaXor searchSpace searchTarget | ||
133 | $ kademliaLocation searchSpace n ) | ||
134 | q | ||
135 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns | ||
136 | modifyTVar searchInformant $ MM.insertTake' searchK ni tok d | ||
137 | flip fix rs $ \loop -> \case | ||
138 | r:rs' -> do | ||
139 | wanting <- searchResult r | ||
140 | if wanting then loop rs' | ||
141 | else searchCancel sch | ||
142 | [] -> return () | ||
143 | |||
144 | |||
145 | searchIsFinished :: ( PSQKey nid | ||
146 | , PSQKey ni | ||
147 | ) => SearchState nid addr tok ni r -> STM Bool | ||
148 | searchIsFinished SearchState{ ..} = do | ||
149 | q <- readTVar searchQueued | ||
150 | cnt <- readTVar searchPendingCount | ||
151 | informants <- readTVar searchInformant | ||
152 | return $ cnt == 0 | ||
153 | && ( MM.null q | ||
154 | || ( MM.size informants >= searchK | ||
155 | && ( PSQ.prio (fromJust $ MM.findMax informants) | ||
156 | <= PSQ.prio (fromJust $ MM.findMin q)))) | ||
157 | |||
158 | searchCancel :: SearchState nid addr tok ni r -> STM () | ||
159 | searchCancel SearchState{..} = do | ||
160 | writeTVar searchPendingCount 0 | ||
161 | writeTVar searchQueued MM.empty | ||
162 | |||
163 | search :: | ||
164 | ( Ord r | ||
165 | , Ord addr | ||
166 | , PSQKey nid | ||
167 | , PSQKey ni | ||
168 | , Show nid | ||
169 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) | ||
170 | search sch buckets target result = do | ||
171 | let ns = R.kclosest (searchSpace sch) searchK target buckets | ||
172 | st <- atomically $ newSearch sch target ns | ||
173 | fork $ searchLoop sch target result st | ||
174 | return st | ||
175 | |||
176 | searchLoop sch@Search{..} target result s@SearchState{..} = do | ||
177 | myThreadId >>= flip labelThread ("search."++show target) | ||
178 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do | ||
179 | join $ atomically $ do | ||
180 | cnt <- readTVar $ searchPendingCount | ||
181 | informants <- readTVar searchInformant | ||
182 | found <- MM.minView <$> readTVar searchQueued | ||
183 | case found of | ||
184 | Just (ni :-> d, q) | ||
185 | | -- If there's fewer than /k/ informants and there's any | ||
186 | -- node we haven't yet got a response from. | ||
187 | (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) | ||
188 | -- Or there's no informants yet at all. | ||
189 | || MM.null informants | ||
190 | -- Or if the closest scheduled node is nearer than the | ||
191 | -- nearest /k/ informants. | ||
192 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) | ||
193 | -> -- Then the search continues, send a query. | ||
194 | do writeTVar searchQueued q | ||
195 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | ||
196 | modifyTVar searchPendingCount succ | ||
197 | return $ do | ||
198 | forkTask g | ||
199 | "searchQuery" | ||
200 | $ sendQuery sch target result s (ni :-> d) | ||
201 | again | ||
202 | _ -> -- Otherwise, we are finished. | ||
203 | do check (cnt == 0) | ||
204 | return $ return () | ||