diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 33 |
1 files changed, 24 insertions, 9 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index a9efba89..8c441cb0 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -6,7 +6,6 @@ | |||
6 | {-# LANGUAGE LambdaCase #-} | 6 | {-# LANGUAGE LambdaCase #-} |
7 | module Network.BitTorrent.DHT.Search where | 7 | module Network.BitTorrent.DHT.Search where |
8 | 8 | ||
9 | import Control.Concurrent | ||
10 | import Control.Concurrent.Async.Pool | 9 | import Control.Concurrent.Async.Pool |
11 | import Control.Concurrent.STM | 10 | import Control.Concurrent.STM |
12 | import Control.Exception | 11 | import Control.Exception |
@@ -28,11 +27,17 @@ import qualified Data.Wrapper.PSQ as PSQ | |||
28 | import Network.Address hiding (NodeId) | 27 | import Network.Address hiding (NodeId) |
29 | import Network.DatagramServer.Types | 28 | import Network.DatagramServer.Types |
30 | import Network.DHT.Routing as R | 29 | import Network.DHT.Routing as R |
30 | #ifdef THREAD_DEBUG | ||
31 | import Control.Concurrent.Lifted.Instrument | ||
32 | #else | ||
33 | import Control.Concurrent.Lifted | ||
34 | import GHC.Conc (labelThread) | ||
35 | #endif | ||
31 | 36 | ||
32 | data Search nid addr ni r = Search | 37 | data Search nid addr ni r = Search |
33 | { searchSpace :: KademliaSpace nid ni | 38 | { searchSpace :: KademliaSpace nid ni |
34 | , searchNodeAddress :: ni -> addr | 39 | , searchNodeAddress :: ni -> addr |
35 | , searchQuery :: ni -> IO ([ni], [r]) | 40 | , searchQuery :: nid -> ni -> IO ([ni], [r]) |
36 | } | 41 | } |
37 | 42 | ||
38 | data SearchState nid addr ni r = SearchState | 43 | data SearchState nid addr ni r = SearchState |
@@ -89,7 +94,7 @@ newSearch (Search space nAddr qry) target ns = atomically $ do | |||
89 | ( SearchState c q i v ) | 94 | ( SearchState c q i v ) |
90 | 95 | ||
91 | searchAlpha :: Int | 96 | searchAlpha :: Int |
92 | searchAlpha = 3 | 97 | searchAlpha = 8 |
93 | 98 | ||
94 | searchK :: Int | 99 | searchK :: Int |
95 | searchK = 8 | 100 | searchK = 8 |
@@ -99,6 +104,7 @@ sendQuery :: forall addr nid ni r. | |||
99 | , Ord r | 104 | , Ord r |
100 | , PSQKey nid | 105 | , PSQKey nid |
101 | , PSQKey ni | 106 | , PSQKey ni |
107 | , Show nid | ||
102 | ) => | 108 | ) => |
103 | Search nid addr ni r | 109 | Search nid addr ni r |
104 | -> nid | 110 | -> nid |
@@ -107,8 +113,9 @@ sendQuery :: forall addr nid ni r. | |||
107 | -> Binding ni nid | 113 | -> Binding ni nid |
108 | -> IO () | 114 | -> IO () |
109 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do | 115 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do |
116 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | ||
110 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | 117 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) |
111 | (searchQuery ni) | 118 | (searchQuery searchTarget ni) |
112 | atomically $ do | 119 | atomically $ do |
113 | modifyTVar searchPendingCount pred | 120 | modifyTVar searchPendingCount pred |
114 | vs <- readTVar searchVisited | 121 | vs <- readTVar searchVisited |
@@ -156,11 +163,17 @@ search :: | |||
156 | , Ord addr | 163 | , Ord addr |
157 | , PSQKey nid | 164 | , PSQKey nid |
158 | , PSQKey ni | 165 | , PSQKey ni |
159 | ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO () | 166 | , Show nid |
160 | search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> do | 167 | ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r) |
168 | search sch@Search{..} buckets target result = do | ||
161 | let ns = R.kclosest searchSpace searchK target buckets | 169 | let ns = R.kclosest searchSpace searchK target buckets |
162 | s@SearchState{..} <- newSearch sch target ns | 170 | st <- newSearch sch target ns |
163 | fix $ \again -> do | 171 | fork $ go st |
172 | return st | ||
173 | where | ||
174 | go s@SearchState{..} = do | ||
175 | myThreadId >>= flip labelThread ("search."++show target) | ||
176 | withTaskGroup searchAlpha $ \g -> fix $ \again -> do | ||
164 | join $ atomically $ do | 177 | join $ atomically $ do |
165 | cnt <- readTVar $ searchPendingCount | 178 | cnt <- readTVar $ searchPendingCount |
166 | informants <- readTVar searchInformant | 179 | informants <- readTVar searchInformant |
@@ -170,9 +183,11 @@ search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> | |||
170 | | -- If there's fewer than /k/ informants and there's any | 183 | | -- If there's fewer than /k/ informants and there's any |
171 | -- node we haven't yet got a response from. | 184 | -- node we haven't yet got a response from. |
172 | (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) | 185 | (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) |
186 | -- Or there's no informants yet at all. | ||
187 | || MM.null informants | ||
173 | -- Or if the closest scheduled node is nearer than the | 188 | -- Or if the closest scheduled node is nearer than the |
174 | -- nearest /k/ informants. | 189 | -- nearest /k/ informants. |
175 | || (PSQ.prio (fromJust $ MM.findMax informants) > d) | 190 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) |
176 | -> -- Then the search continues, send a query. | 191 | -> -- Then the search continues, send a query. |
177 | do writeTVar searchQueued q | 192 | do writeTVar searchQueued q |
178 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | 193 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) |