summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Search.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT/Search.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs33
1 files changed, 24 insertions, 9 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
index a9efba89..8c441cb0 100644
--- a/src/Network/BitTorrent/DHT/Search.hs
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -6,7 +6,6 @@
6{-# LANGUAGE LambdaCase #-} 6{-# LANGUAGE LambdaCase #-}
7module Network.BitTorrent.DHT.Search where 7module Network.BitTorrent.DHT.Search where
8 8
9import Control.Concurrent
10import Control.Concurrent.Async.Pool 9import Control.Concurrent.Async.Pool
11import Control.Concurrent.STM 10import Control.Concurrent.STM
12import Control.Exception 11import Control.Exception
@@ -28,11 +27,17 @@ import qualified Data.Wrapper.PSQ as PSQ
28import Network.Address hiding (NodeId) 27import Network.Address hiding (NodeId)
29import Network.DatagramServer.Types 28import Network.DatagramServer.Types
30import Network.DHT.Routing as R 29import Network.DHT.Routing as R
30#ifdef THREAD_DEBUG
31import Control.Concurrent.Lifted.Instrument
32#else
33import Control.Concurrent.Lifted
34import GHC.Conc (labelThread)
35#endif
31 36
32data Search nid addr ni r = Search 37data Search nid addr ni r = Search
33 { searchSpace :: KademliaSpace nid ni 38 { searchSpace :: KademliaSpace nid ni
34 , searchNodeAddress :: ni -> addr 39 , searchNodeAddress :: ni -> addr
35 , searchQuery :: ni -> IO ([ni], [r]) 40 , searchQuery :: nid -> ni -> IO ([ni], [r])
36 } 41 }
37 42
38data SearchState nid addr ni r = SearchState 43data SearchState nid addr ni r = SearchState
@@ -89,7 +94,7 @@ newSearch (Search space nAddr qry) target ns = atomically $ do
89 ( SearchState c q i v ) 94 ( SearchState c q i v )
90 95
91searchAlpha :: Int 96searchAlpha :: Int
92searchAlpha = 3 97searchAlpha = 8
93 98
94searchK :: Int 99searchK :: Int
95searchK = 8 100searchK = 8
@@ -99,6 +104,7 @@ sendQuery :: forall addr nid ni r.
99 , Ord r 104 , Ord r
100 , PSQKey nid 105 , PSQKey nid
101 , PSQKey ni 106 , PSQKey ni
107 , Show nid
102 ) => 108 ) =>
103 Search nid addr ni r 109 Search nid addr ni r
104 -> nid 110 -> nid
@@ -107,8 +113,9 @@ sendQuery :: forall addr nid ni r.
107 -> Binding ni nid 113 -> Binding ni nid
108 -> IO () 114 -> IO ()
109sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do 115sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
116 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
110 (ns,rs) <- handle (\(SomeException e) -> return ([],[])) 117 (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
111 (searchQuery ni) 118 (searchQuery searchTarget ni)
112 atomically $ do 119 atomically $ do
113 modifyTVar searchPendingCount pred 120 modifyTVar searchPendingCount pred
114 vs <- readTVar searchVisited 121 vs <- readTVar searchVisited
@@ -156,11 +163,17 @@ search ::
156 , Ord addr 163 , Ord addr
157 , PSQKey nid 164 , PSQKey nid
158 , PSQKey ni 165 , PSQKey ni
159 ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO () 166 , Show nid
160search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> do 167 ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r)
168search sch@Search{..} buckets target result = do
161 let ns = R.kclosest searchSpace searchK target buckets 169 let ns = R.kclosest searchSpace searchK target buckets
162 s@SearchState{..} <- newSearch sch target ns 170 st <- newSearch sch target ns
163 fix $ \again -> do 171 fork $ go st
172 return st
173 where
174 go s@SearchState{..} = do
175 myThreadId >>= flip labelThread ("search."++show target)
176 withTaskGroup searchAlpha $ \g -> fix $ \again -> do
164 join $ atomically $ do 177 join $ atomically $ do
165 cnt <- readTVar $ searchPendingCount 178 cnt <- readTVar $ searchPendingCount
166 informants <- readTVar searchInformant 179 informants <- readTVar searchInformant
@@ -170,9 +183,11 @@ search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g ->
170 | -- If there's fewer than /k/ informants and there's any 183 | -- If there's fewer than /k/ informants and there's any
171 -- node we haven't yet got a response from. 184 -- node we haven't yet got a response from.
172 (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) 185 (MM.size informants < searchK) && (cnt > 0 || not (MM.null q))
186 -- Or there's no informants yet at all.
187 || MM.null informants
173 -- Or if the closest scheduled node is nearer than the 188 -- Or if the closest scheduled node is nearer than the
174 -- nearest /k/ informants. 189 -- nearest /k/ informants.
175 || (PSQ.prio (fromJust $ MM.findMax informants) > d) 190 || (d < PSQ.prio (fromJust $ MM.findMax informants))
176 -> -- Then the search continues, send a query. 191 -> -- Then the search continues, send a query.
177 do writeTVar searchQueued q 192 do writeTVar searchQueued q
178 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) 193 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)