diff options
author | joe <joe@jerkface.net> | 2017-07-27 00:09:36 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-07-27 00:09:36 -0400 |
commit | 0e20eb6683761362ee282e3188fccdab46b02ee4 (patch) | |
tree | 05043c1b75ba331ffd7d645b544badcecad6657c /src/Network | |
parent | aee5037c333abc77174d4867b75b1ef068fbaf1b (diff) |
peer search.
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 36 |
1 files changed, 21 insertions, 15 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index 8c441cb0..b263e339 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -21,7 +21,7 @@ import qualified Data.Set as Set | |||
21 | import System.IO | 21 | import System.IO |
22 | 22 | ||
23 | import qualified Data.MinMaxPSQ as MM | 23 | import qualified Data.MinMaxPSQ as MM |
24 | ;import Data.MinMaxPSQ (MinMaxPSQ) | 24 | ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') |
25 | import qualified Data.Wrapper.PSQ as PSQ | 25 | import qualified Data.Wrapper.PSQ as PSQ |
26 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) | 26 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) |
27 | import Network.Address hiding (NodeId) | 27 | import Network.Address hiding (NodeId) |
@@ -34,13 +34,13 @@ import Control.Concurrent.Lifted | |||
34 | import GHC.Conc (labelThread) | 34 | import GHC.Conc (labelThread) |
35 | #endif | 35 | #endif |
36 | 36 | ||
37 | data Search nid addr ni r = Search | 37 | data Search nid addr tok ni r = Search |
38 | { searchSpace :: KademliaSpace nid ni | 38 | { searchSpace :: KademliaSpace nid ni |
39 | , searchNodeAddress :: ni -> addr | 39 | , searchNodeAddress :: ni -> addr |
40 | , searchQuery :: nid -> ni -> IO ([ni], [r]) | 40 | , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok)) |
41 | } | 41 | } |
42 | 42 | ||
43 | data SearchState nid addr ni r = SearchState | 43 | data SearchState nid addr tok ni r = SearchState |
44 | {- | 44 | {- |
45 | { searchParams :: Search nid addr ni r | 45 | { searchParams :: Search nid addr ni r |
46 | 46 | ||
@@ -60,7 +60,7 @@ data SearchState nid addr ni r = SearchState | |||
60 | -- | Nodes scheduled to be queried. | 60 | -- | Nodes scheduled to be queried. |
61 | , searchQueued :: TVar (MinMaxPSQ ni nid) | 61 | , searchQueued :: TVar (MinMaxPSQ ni nid) |
62 | -- | The nearest K nodes that issued a reply. | 62 | -- | The nearest K nodes that issued a reply. |
63 | , searchInformant :: TVar (MinMaxPSQ ni nid) | 63 | , searchInformant :: TVar (MinMaxPSQ' ni nid tok) |
64 | -- | This tracks already-queried addresses so we avoid bothering them | 64 | -- | This tracks already-queried addresses so we avoid bothering them |
65 | -- again. XXX: We could probably keep only the pending queries in this | 65 | -- again. XXX: We could probably keep only the pending queries in this |
66 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | 66 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha |
@@ -79,10 +79,10 @@ newSearch :: ( Ord addr | |||
79 | -> (r -> STM Bool) -- receives search results. | 79 | -> (r -> STM Bool) -- receives search results. |
80 | -> nid -- target of search | 80 | -> nid -- target of search |
81 | -} | 81 | -} |
82 | Search nid addr ni r | 82 | Search nid addr tok ni r |
83 | -> nid | 83 | -> nid |
84 | -> [ni] -- Initial nodes to query. | 84 | -> [ni] -- Initial nodes to query. |
85 | -> IO (SearchState nid addr ni r) | 85 | -> IO (SearchState nid addr tok ni r) |
86 | newSearch (Search space nAddr qry) target ns = atomically $ do | 86 | newSearch (Search space nAddr qry) target ns = atomically $ do |
87 | c <- newTVar 0 | 87 | c <- newTVar 0 |
88 | q <- newTVar $ MM.fromList | 88 | q <- newTVar $ MM.fromList |
@@ -99,25 +99,31 @@ searchAlpha = 8 | |||
99 | searchK :: Int | 99 | searchK :: Int |
100 | searchK = 8 | 100 | searchK = 8 |
101 | 101 | ||
102 | sendQuery :: forall addr nid ni r. | 102 | sendQuery :: forall addr nid tok ni r. |
103 | ( Ord addr | 103 | ( Ord addr |
104 | , Ord r | 104 | , Ord r |
105 | , PSQKey nid | 105 | , PSQKey nid |
106 | , PSQKey ni | 106 | , PSQKey ni |
107 | , Show nid | 107 | , Show nid |
108 | ) => | 108 | ) => |
109 | Search nid addr ni r | 109 | Search nid addr tok ni r |
110 | -> nid | 110 | -> nid |
111 | -> (r -> STM Bool) | 111 | -> (r -> STM Bool) |
112 | -> SearchState nid addr ni r | 112 | -> SearchState nid addr tok ni r |
113 | -> Binding ni nid | 113 | -> Binding ni nid |
114 | -> IO () | 114 | -> IO () |
115 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do | 115 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do |
116 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | 116 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) |
117 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | 117 | -- TODO: Should we really be catching ThreadKilled ? |
118 | reply <- handle (\(SomeException e) -> return Nothing) | ||
118 | (searchQuery searchTarget ni) | 119 | (searchQuery searchTarget ni) |
120 | -- (ns,rs) | ||
121 | let tok = error "TODO: token" | ||
119 | atomically $ do | 122 | atomically $ do |
120 | modifyTVar searchPendingCount pred | 123 | modifyTVar searchPendingCount pred |
124 | maybe (return ()) go reply | ||
125 | where | ||
126 | go (ns,rs,tok) = do | ||
121 | vs <- readTVar searchVisited | 127 | vs <- readTVar searchVisited |
122 | -- We only queue a node if it is not yet visited | 128 | -- We only queue a node if it is not yet visited |
123 | let insertFoundNode :: ni | 129 | let insertFoundNode :: ni |
@@ -130,7 +136,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
130 | $ kademliaLocation searchSpace n ) | 136 | $ kademliaLocation searchSpace n ) |
131 | q | 137 | q |
132 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns | 138 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns |
133 | modifyTVar searchInformant $ MM.insertTake searchK ni d | 139 | modifyTVar searchInformant $ MM.insertTake' searchK ni tok d |
134 | flip fix rs $ \loop -> \case | 140 | flip fix rs $ \loop -> \case |
135 | r:rs' -> do | 141 | r:rs' -> do |
136 | wanting <- searchResult r | 142 | wanting <- searchResult r |
@@ -142,7 +148,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
142 | searchIsFinished :: ( Ord addr | 148 | searchIsFinished :: ( Ord addr |
143 | , PSQKey nid | 149 | , PSQKey nid |
144 | , PSQKey ni | 150 | , PSQKey ni |
145 | ) => SearchState nid addr ni r -> STM Bool | 151 | ) => SearchState nid addr tok ni r -> STM Bool |
146 | searchIsFinished SearchState{ ..} = do | 152 | searchIsFinished SearchState{ ..} = do |
147 | q <- readTVar searchQueued | 153 | q <- readTVar searchQueued |
148 | cnt <- readTVar searchPendingCount | 154 | cnt <- readTVar searchPendingCount |
@@ -153,7 +159,7 @@ searchIsFinished SearchState{ ..} = do | |||
153 | && ( PSQ.prio (fromJust $ MM.findMax informants) | 159 | && ( PSQ.prio (fromJust $ MM.findMax informants) |
154 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 160 | <= PSQ.prio (fromJust $ MM.findMin q)))) |
155 | 161 | ||
156 | searchCancel :: SearchState nid addr ni r -> STM () | 162 | searchCancel :: SearchState nid addr tok ni r -> STM () |
157 | searchCancel SearchState{..} = do | 163 | searchCancel SearchState{..} = do |
158 | writeTVar searchPendingCount 0 | 164 | writeTVar searchPendingCount 0 |
159 | writeTVar searchQueued MM.empty | 165 | writeTVar searchQueued MM.empty |
@@ -164,7 +170,7 @@ search :: | |||
164 | , PSQKey nid | 170 | , PSQKey nid |
165 | , PSQKey ni | 171 | , PSQKey ni |
166 | , Show nid | 172 | , Show nid |
167 | ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r) | 173 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) |
168 | search sch@Search{..} buckets target result = do | 174 | search sch@Search{..} buckets target result = do |
169 | let ns = R.kclosest searchSpace searchK target buckets | 175 | let ns = R.kclosest searchSpace searchK target buckets |
170 | st <- newSearch sch target ns | 176 | st <- newSearch sch target ns |