summaryrefslogtreecommitdiff
path: root/src/Network/Kademlia/Search.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-09-15 17:36:53 -0400
committerjoe <joe@jerkface.net>2017-09-15 17:36:53 -0400
commitfdfe7279339f91bad5ceb0cab8e699415686ab3f (patch)
treeadf6b408fd70aff2173a14eefc0ed29f10a864c2 /src/Network/Kademlia/Search.hs
parenta4bc159e22b8c270284bb9ad10e511c6a411ebf6 (diff)
Moved Network.BitTorrent.DHT.Search -> Network.Kademlia.Search
Diffstat (limited to 'src/Network/Kademlia/Search.hs')
-rw-r--r--src/Network/Kademlia/Search.hs204
1 files changed, 204 insertions, 0 deletions
diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs
new file mode 100644
index 00000000..195bed14
--- /dev/null
+++ b/src/Network/Kademlia/Search.hs
@@ -0,0 +1,204 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE PatternSynonyms #-}
3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE LambdaCase #-}
7module Network.Kademlia.Search where
8
9import Control.Concurrent.Tasks
10import Control.Concurrent.STM
11import Control.Exception
12import Control.Monad
13import Data.Bool
14import Data.Function
15import Data.List
16import qualified Data.Map.Strict as Map
17 ;import Data.Map.Strict (Map)
18import Data.Maybe
19import qualified Data.Set as Set
20 ;import Data.Set (Set)
21import System.IO
22import System.IO.Error
23
24import qualified Data.MinMaxPSQ as MM
25 ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
26import qualified Data.Wrapper.PSQ as PSQ
27 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey)
28import Network.Address hiding (NodeId)
29import Network.DHT.Routing as R
30#ifdef THREAD_DEBUG
31import Control.Concurrent.Lifted.Instrument
32#else
33import Control.Concurrent.Lifted
34import GHC.Conc (labelThread)
35#endif
36
37data Search nid addr tok ni r = Search
38 { searchSpace :: KademliaSpace nid ni
39 , searchNodeAddress :: ni -> addr
40 , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok))
41 }
42
43data SearchState nid addr tok ni r = SearchState
44 {-
45 { searchParams :: Search nid addr ni r
46
47 , searchTarget :: nid
48 -- | This action will be performed at least once on each search result.
49 -- It may be invoked multiple times since different nodes may report the
50 -- same result. If the action returns 'False', the search will be
51 -- aborted, otherwise it will continue until it is decided that we've
52 -- asked the closest K nodes to the target.
53 , searchResult :: r -> STM Bool
54
55 -}
56
57 { -- | The number of pending queries. Incremented before any query is sent
58 -- and decremented when we get a reply.
59 searchPendingCount :: TVar Int
60 -- | Nodes scheduled to be queried.
61 , searchQueued :: TVar (MinMaxPSQ ni nid)
62 -- | The nearest K nodes that issued a reply.
63 , searchInformant :: TVar (MinMaxPSQ' ni nid tok)
64 -- | This tracks already-queried addresses so we avoid bothering them
65 -- again. XXX: We could probably keep only the pending queries in this
66 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
67 -- should limit the number of outstanding queries.
68 , searchVisited :: TVar (Set addr)
69 }
70
71newSearch :: ( Ord addr
72 , PSQKey nid
73 , PSQKey ni
74 ) =>
75 {-
76 KademliaSpace nid ni
77 -> (ni -> addr)
78 -> (ni -> IO ([ni], [r])) -- the query action.
79 -> (r -> STM Bool) -- receives search results.
80 -> nid -- target of search
81 -}
82 Search nid addr tok ni r
83 -> nid
84 -> [ni] -- Initial nodes to query.
85 -> STM (SearchState nid addr tok ni r)
86newSearch (Search space nAddr qry) target ns = do
87 c <- newTVar 0
88 q <- newTVar $ MM.fromList
89 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
90 $ ns
91 i <- newTVar MM.empty
92 v <- newTVar Set.empty
93 return -- (Search space nAddr qry) , r , target
94 ( SearchState c q i v )
95
96searchAlpha :: Int
97searchAlpha = 8
98
99searchK :: Int
100searchK = 8
101
102sendQuery :: forall addr nid tok ni r.
103 ( Ord addr
104 , PSQKey nid
105 , PSQKey ni
106 , Show nid
107 ) =>
108 Search nid addr tok ni r
109 -> nid
110 -> (r -> STM Bool)
111 -> SearchState nid addr tok ni r
112 -> Binding ni nid
113 -> IO ()
114sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
115 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
116 reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing)
117 -- (ns,rs)
118 let tok = error "TODO: token"
119 atomically $ do
120 modifyTVar searchPendingCount pred
121 maybe (return ()) go reply
122 where
123 go (ns,rs,tok) = do
124 vs <- readTVar searchVisited
125 -- We only queue a node if it is not yet visited
126 let insertFoundNode :: ni
127 -> MinMaxPSQ ni nid
128 -> MinMaxPSQ ni nid
129 insertFoundNode n q
130 | searchNodeAddress n `Set.member` vs
131 = q
132 | otherwise = MM.insertTake searchK n ( kademliaXor searchSpace searchTarget
133 $ kademliaLocation searchSpace n )
134 q
135 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
136 modifyTVar searchInformant $ MM.insertTake' searchK ni tok d
137 flip fix rs $ \loop -> \case
138 r:rs' -> do
139 wanting <- searchResult r
140 if wanting then loop rs'
141 else searchCancel sch
142 [] -> return ()
143
144
145searchIsFinished :: ( PSQKey nid
146 , PSQKey ni
147 ) => SearchState nid addr tok ni r -> STM Bool
148searchIsFinished SearchState{ ..} = do
149 q <- readTVar searchQueued
150 cnt <- readTVar searchPendingCount
151 informants <- readTVar searchInformant
152 return $ cnt == 0
153 && ( MM.null q
154 || ( MM.size informants >= searchK
155 && ( PSQ.prio (fromJust $ MM.findMax informants)
156 <= PSQ.prio (fromJust $ MM.findMin q))))
157
158searchCancel :: SearchState nid addr tok ni r -> STM ()
159searchCancel SearchState{..} = do
160 writeTVar searchPendingCount 0
161 writeTVar searchQueued MM.empty
162
163search ::
164 ( Ord r
165 , Ord addr
166 , PSQKey nid
167 , PSQKey ni
168 , Show nid
169 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
170search sch buckets target result = do
171 let ns = R.kclosest (searchSpace sch) searchK target buckets
172 st <- atomically $ newSearch sch target ns
173 fork $ searchLoop sch target result st
174 return st
175
176searchLoop sch@Search{..} target result s@SearchState{..} = do
177 myThreadId >>= flip labelThread ("search."++show target)
178 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do
179 join $ atomically $ do
180 cnt <- readTVar $ searchPendingCount
181 informants <- readTVar searchInformant
182 found <- MM.minView <$> readTVar searchQueued
183 case found of
184 Just (ni :-> d, q)
185 | -- If there's fewer than /k/ informants and there's any
186 -- node we haven't yet got a response from.
187 (MM.size informants < searchK) && (cnt > 0 || not (MM.null q))
188 -- Or there's no informants yet at all.
189 || MM.null informants
190 -- Or if the closest scheduled node is nearer than the
191 -- nearest /k/ informants.
192 || (d < PSQ.prio (fromJust $ MM.findMax informants))
193 -> -- Then the search continues, send a query.
194 do writeTVar searchQueued q
195 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
196 modifyTVar searchPendingCount succ
197 return $ do
198 forkTask g
199 "searchQuery"
200 $ sendQuery sch target result s (ni :-> d)
201 again
202 _ -> -- Otherwise, we are finished.
203 do check (cnt == 0)
204 return $ return ()