diff options
author | joe <joe@jerkface.net> | 2017-02-01 03:21:52 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-02-01 03:21:52 -0500 |
commit | c51e64666b672637843a04c2f279d7d0c9eed01c (patch) | |
tree | d6f50018659ac3c5c3d72ee9bde3824514bd9f6a /src/Network/BitTorrent/DHT/Search.hs | |
parent | 0d1de683de78a70ce9c054b444bb6f19c39d112c (diff) |
New improved iterative search algorithm.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Search.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs new file mode 100644 index 00000000..1fe73c30 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -0,0 +1,92 @@ | |||
1 | {-# LANGUAGE PatternSynonyms #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
3 | module Network.BitTorrent.DHT.Search where | ||
4 | |||
5 | import Control.Concurrent | ||
6 | import Control.Concurrent.Async.Pool | ||
7 | import Control.Concurrent.STM | ||
8 | import Control.Exception | ||
9 | import Control.Monad | ||
10 | import Data.Bool | ||
11 | import Data.Function | ||
12 | import Data.List | ||
13 | import qualified Data.Map.Strict as Map | ||
14 | ;import Data.Map.Strict (Map) | ||
15 | import Data.Maybe | ||
16 | import qualified Data.Set as Set | ||
17 | ;import Data.Set (Set) | ||
18 | import System.IO | ||
19 | |||
20 | import qualified Data.MinMaxPSQ as MM | ||
21 | ;import Data.MinMaxPSQ (MinMaxPSQ) | ||
22 | import qualified Data.Wrapper.PSQ as PSQ | ||
23 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) | ||
24 | import Network.BitTorrent.Address | ||
25 | |||
26 | data IterativeSearch ip r = IterativeSearch | ||
27 | { searchTarget :: NodeId | ||
28 | , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r]) | ||
29 | , searchPendingCount :: TVar Int | ||
30 | , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) | ||
31 | , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) | ||
32 | , searchVisited :: TVar (Set (NodeAddr ip)) | ||
33 | , searchResults :: TVar (Set r) | ||
34 | } | ||
35 | |||
36 | newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r])) | ||
37 | -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r) | ||
38 | newSearch qry target ns = atomically $ do | ||
39 | c <- newTVar 0 | ||
40 | q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns | ||
41 | i <- newTVar MM.empty | ||
42 | v <- newTVar Set.empty | ||
43 | r <- newTVar Set.empty | ||
44 | return $ IterativeSearch target qry c q i v r | ||
45 | |||
46 | searchAlpha :: Int | ||
47 | searchAlpha = 3 | ||
48 | |||
49 | searchK :: Int | ||
50 | searchK = 8 | ||
51 | |||
52 | sendQuery :: (Ord a, Ord t) => | ||
53 | IterativeSearch t a | ||
54 | -> Binding (NodeInfo t) NodeDistance | ||
55 | -> IO () | ||
56 | sendQuery IterativeSearch{..} (ni :-> d) = do | ||
57 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | ||
58 | (searchQuery ni) | ||
59 | atomically $ do | ||
60 | modifyTVar searchPendingCount pred | ||
61 | vs <- readTVar searchVisited | ||
62 | -- We only queue a node if it is not yet visited | ||
63 | let insertFoundNode n q | ||
64 | | nodeAddr n `Set.member` vs = q | ||
65 | | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q | ||
66 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns | ||
67 | modifyTVar searchInformant $ MM.insertTake searchK ni d | ||
68 | modifyTVar searchResults $ \s -> foldr Set.insert s rs | ||
69 | |||
70 | search :: | ||
71 | (Ord r, Ord ip) => | ||
72 | IterativeSearch ip r -> IO () | ||
73 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | ||
74 | fix $ \again -> do | ||
75 | join $ atomically $ do | ||
76 | found <- MM.minView <$> readTVar searchQueued | ||
77 | cnt <- readTVar $ searchPendingCount | ||
78 | case found of | ||
79 | Nothing -> retry | ||
80 | Just (ni :-> d, q) -> do | ||
81 | informants <- readTVar searchInformant | ||
82 | if MM.size informants < searchK | ||
83 | && (cnt > 0 || not (MM.null q)) | ||
84 | || PSQ.prio (fromJust $ MM.findMax informants) > d | ||
85 | then do | ||
86 | writeTVar searchQueued q | ||
87 | modifyTVar searchVisited $ Set.insert (nodeAddr ni) | ||
88 | modifyTVar searchPendingCount succ | ||
89 | return $ withAsync g (sendQuery s (ni :-> d)) (const again) | ||
90 | else do | ||
91 | check (cnt == 0) | ||
92 | return $ return () | ||