summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Search.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-07-22 01:15:44 -0400
committerjoe <joe@jerkface.net>2017-07-22 01:15:44 -0400
commit77f6b96492223e7d7b147dac8d026e0b6f6a651b (patch)
tree661e2115a814de82ba251bccf0ab21ae4dfd1ff1 /src/Network/BitTorrent/DHT/Search.hs
parent7f1eb53d34ea6dda02cae1934b5011e38de248a6 (diff)
Implemented bucket refresh for Mainline.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Search.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs78
1 files changed, 57 insertions, 21 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
index c562b988..a9efba89 100644
--- a/src/Network/BitTorrent/DHT/Search.hs
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -3,6 +3,7 @@
3{-# LANGUAGE RecordWildCards #-} 3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-} 4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-} 5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE LambdaCase #-}
6module Network.BitTorrent.DHT.Search where 7module Network.BitTorrent.DHT.Search where
7 8
8import Control.Concurrent 9import Control.Concurrent
@@ -28,39 +29,64 @@ import Network.Address hiding (NodeId)
28import Network.DatagramServer.Types 29import Network.DatagramServer.Types
29import Network.DHT.Routing as R 30import Network.DHT.Routing as R
30 31
31data IterativeSearch nid addr ni r = IterativeSearch 32data Search nid addr ni r = Search
32 { searchTarget :: nid 33 { searchSpace :: KademliaSpace nid ni
33 , searchSpace :: KademliaSpace nid ni
34 , searchNodeAddress :: ni -> addr 34 , searchNodeAddress :: ni -> addr
35 , searchQuery :: ni -> IO ([ni], [r]) 35 , searchQuery :: ni -> IO ([ni], [r])
36 -- | The number of pending queries. Incremented before any query is sent 36 }
37
38data SearchState nid addr ni r = SearchState
39 {-
40 { searchParams :: Search nid addr ni r
41
42 , searchTarget :: nid
43 -- | This action will be performed at least once on each search result.
44 -- It may be invoked multiple times since different nodes may report the
45 -- same result. If the action returns 'False', the search will be
46 -- aborted, otherwise it will continue until it is decided that we've
47 -- asked the closest K nodes to the target.
48 , searchResult :: r -> STM Bool
49
50 -}
51
52 { -- | The number of pending queries. Incremented before any query is sent
37 -- and decremented when we get a reply. 53 -- and decremented when we get a reply.
38 , searchPendingCount :: TVar Int 54 searchPendingCount :: TVar Int
39 -- | Nodes scheduled to be queried. 55 -- | Nodes scheduled to be queried.
40 , searchQueued :: TVar (MinMaxPSQ ni nid) 56 , searchQueued :: TVar (MinMaxPSQ ni nid)
41 -- | The nearest K nodes that issued a reply. 57 -- | The nearest K nodes that issued a reply.
42 , searchInformant :: TVar (MinMaxPSQ ni nid) 58 , searchInformant :: TVar (MinMaxPSQ ni nid)
59 -- | This tracks already-queried addresses so we avoid bothering them
60 -- again. XXX: We could probably keep only the pending queries in this
61 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
62 -- should limit the number of outstanding queries.
43 , searchVisited :: TVar (Set addr) 63 , searchVisited :: TVar (Set addr)
44 , searchResults :: TVar (Set r)
45 } 64 }
46 65
47newSearch :: ( Ord addr 66newSearch :: ( Ord addr
48 , PSQKey nid 67 , PSQKey nid
49 , PSQKey ni 68 , PSQKey ni
50 ) => 69 ) =>
70 {-
51 KademliaSpace nid ni 71 KademliaSpace nid ni
52 -> (ni -> addr) 72 -> (ni -> addr)
53 -> (ni -> IO ([ni], [r])) 73 -> (ni -> IO ([ni], [r])) -- the query action.
54 -> nid -> [ni] -> IO (IterativeSearch nid addr ni r) 74 -> (r -> STM Bool) -- receives search results.
55newSearch space nAddr qry target ns = atomically $ do 75 -> nid -- target of search
76 -}
77 Search nid addr ni r
78 -> nid
79 -> [ni] -- Initial nodes to query.
80 -> IO (SearchState nid addr ni r)
81newSearch (Search space nAddr qry) target ns = atomically $ do
56 c <- newTVar 0 82 c <- newTVar 0
57 q <- newTVar $ MM.fromList 83 q <- newTVar $ MM.fromList
58 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) 84 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
59 $ ns 85 $ ns
60 i <- newTVar MM.empty 86 i <- newTVar MM.empty
61 v <- newTVar Set.empty 87 v <- newTVar Set.empty
62 r <- newTVar Set.empty 88 return -- (Search space nAddr qry) , r , target
63 return $ IterativeSearch target space nAddr qry c q i v r 89 ( SearchState c q i v )
64 90
65searchAlpha :: Int 91searchAlpha :: Int
66searchAlpha = 3 92searchAlpha = 3
@@ -74,10 +100,13 @@ sendQuery :: forall addr nid ni r.
74 , PSQKey nid 100 , PSQKey nid
75 , PSQKey ni 101 , PSQKey ni
76 ) => 102 ) =>
77 IterativeSearch nid addr ni r 103 Search nid addr ni r
104 -> nid
105 -> (r -> STM Bool)
106 -> SearchState nid addr ni r
78 -> Binding ni nid 107 -> Binding ni nid
79 -> IO () 108 -> IO ()
80sendQuery IterativeSearch{..} (ni :-> d) = do 109sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
81 (ns,rs) <- handle (\(SomeException e) -> return ([],[])) 110 (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
82 (searchQuery ni) 111 (searchQuery ni)
83 atomically $ do 112 atomically $ do
@@ -95,14 +124,19 @@ sendQuery IterativeSearch{..} (ni :-> d) = do
95 q 124 q
96 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns 125 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
97 modifyTVar searchInformant $ MM.insertTake searchK ni d 126 modifyTVar searchInformant $ MM.insertTake searchK ni d
98 modifyTVar searchResults $ \s -> foldr Set.insert s rs 127 flip fix rs $ \loop -> \case
128 r:rs' -> do
129 wanting <- searchResult r
130 if wanting then loop rs'
131 else searchCancel sch
132 [] -> return ()
99 133
100 134
101searchIsFinished :: ( Ord addr 135searchIsFinished :: ( Ord addr
102 , PSQKey nid 136 , PSQKey nid
103 , PSQKey ni 137 , PSQKey ni
104 ) => IterativeSearch nid addr ni r -> STM Bool 138 ) => SearchState nid addr ni r -> STM Bool
105searchIsFinished IterativeSearch{..} = do 139searchIsFinished SearchState{ ..} = do
106 q <- readTVar searchQueued 140 q <- readTVar searchQueued
107 cnt <- readTVar searchPendingCount 141 cnt <- readTVar searchPendingCount
108 informants <- readTVar searchInformant 142 informants <- readTVar searchInformant
@@ -112,8 +146,8 @@ searchIsFinished IterativeSearch{..} = do
112 && ( PSQ.prio (fromJust $ MM.findMax informants) 146 && ( PSQ.prio (fromJust $ MM.findMax informants)
113 <= PSQ.prio (fromJust $ MM.findMin q)))) 147 <= PSQ.prio (fromJust $ MM.findMin q))))
114 148
115searchCancel :: IterativeSearch nid addr ni r -> IO () 149searchCancel :: SearchState nid addr ni r -> STM ()
116searchCancel IterativeSearch{..} = atomically $ do 150searchCancel SearchState{..} = do
117 writeTVar searchPendingCount 0 151 writeTVar searchPendingCount 0
118 writeTVar searchQueued MM.empty 152 writeTVar searchQueued MM.empty
119 153
@@ -122,8 +156,10 @@ search ::
122 , Ord addr 156 , Ord addr
123 , PSQKey nid 157 , PSQKey nid
124 , PSQKey ni 158 , PSQKey ni
125 ) => IterativeSearch nid addr ni r -> IO () 159 ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO ()
126search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do 160search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> do
161 let ns = R.kclosest searchSpace searchK target buckets
162 s@SearchState{..} <- newSearch sch target ns
127 fix $ \again -> do 163 fix $ \again -> do
128 join $ atomically $ do 164 join $ atomically $ do
129 cnt <- readTVar $ searchPendingCount 165 cnt <- readTVar $ searchPendingCount
@@ -141,7 +177,7 @@ search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
141 do writeTVar searchQueued q 177 do writeTVar searchQueued q
142 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) 178 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
143 modifyTVar searchPendingCount succ 179 modifyTVar searchPendingCount succ
144 return $ withAsync g (sendQuery s (ni :-> d)) (const again) 180 return $ withAsync g (sendQuery sch target result s (ni :-> d)) (const again)
145 _ -> -- Otherwise, we are finished. 181 _ -> -- Otherwise, we are finished.
146 do check (cnt == 0) 182 do check (cnt == 0)
147 return $ return () 183 return $ return ()