summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Search.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-02-01 03:21:52 -0500
committerjoe <joe@jerkface.net>2017-02-01 03:21:52 -0500
commitc51e64666b672637843a04c2f279d7d0c9eed01c (patch)
treed6f50018659ac3c5c3d72ee9bde3824514bd9f6a /src/Network/BitTorrent/DHT/Search.hs
parent0d1de683de78a70ce9c054b444bb6f19c39d112c (diff)
New improved iterative search algorithm.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Search.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs92
1 files changed, 92 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
new file mode 100644
index 00000000..1fe73c30
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -0,0 +1,92 @@
1{-# LANGUAGE PatternSynonyms #-}
2{-# LANGUAGE RecordWildCards #-}
3module Network.BitTorrent.DHT.Search where
4
5import Control.Concurrent
6import Control.Concurrent.Async.Pool
7import Control.Concurrent.STM
8import Control.Exception
9import Control.Monad
10import Data.Bool
11import Data.Function
12import Data.List
13import qualified Data.Map.Strict as Map
14 ;import Data.Map.Strict (Map)
15import Data.Maybe
16import qualified Data.Set as Set
17 ;import Data.Set (Set)
18import System.IO
19
20import qualified Data.MinMaxPSQ as MM
21 ;import Data.MinMaxPSQ (MinMaxPSQ)
22import qualified Data.Wrapper.PSQ as PSQ
23 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ)
24import Network.BitTorrent.Address
25
26data IterativeSearch ip r = IterativeSearch
27 { searchTarget :: NodeId
28 , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r])
29 , searchPendingCount :: TVar Int
30 , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance)
31 , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance)
32 , searchVisited :: TVar (Set (NodeAddr ip))
33 , searchResults :: TVar (Set r)
34 }
35
36newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r]))
37 -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r)
38newSearch qry target ns = atomically $ do
39 c <- newTVar 0
40 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns
41 i <- newTVar MM.empty
42 v <- newTVar Set.empty
43 r <- newTVar Set.empty
44 return $ IterativeSearch target qry c q i v r
45
46searchAlpha :: Int
47searchAlpha = 3
48
49searchK :: Int
50searchK = 8
51
52sendQuery :: (Ord a, Ord t) =>
53 IterativeSearch t a
54 -> Binding (NodeInfo t) NodeDistance
55 -> IO ()
56sendQuery IterativeSearch{..} (ni :-> d) = do
57 (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
58 (searchQuery ni)
59 atomically $ do
60 modifyTVar searchPendingCount pred
61 vs <- readTVar searchVisited
62 -- We only queue a node if it is not yet visited
63 let insertFoundNode n q
64 | nodeAddr n `Set.member` vs = q
65 | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q
66 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
67 modifyTVar searchInformant $ MM.insertTake searchK ni d
68 modifyTVar searchResults $ \s -> foldr Set.insert s rs
69
70search ::
71 (Ord r, Ord ip) =>
72 IterativeSearch ip r -> IO ()
73search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
74 fix $ \again -> do
75 join $ atomically $ do
76 found <- MM.minView <$> readTVar searchQueued
77 cnt <- readTVar $ searchPendingCount
78 case found of
79 Nothing -> retry
80 Just (ni :-> d, q) -> do
81 informants <- readTVar searchInformant
82 if MM.size informants < searchK
83 && (cnt > 0 || not (MM.null q))
84 || PSQ.prio (fromJust $ MM.findMax informants) > d
85 then do
86 writeTVar searchQueued q
87 modifyTVar searchVisited $ Set.insert (nodeAddr ni)
88 modifyTVar searchPendingCount succ
89 return $ withAsync g (sendQuery s (ni :-> d)) (const again)
90 else do
91 check (cnt == 0)
92 return $ return ()