summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-07-27 00:09:36 -0400
committerjoe <joe@jerkface.net>2017-07-27 00:09:36 -0400
commit0e20eb6683761362ee282e3188fccdab46b02ee4 (patch)
tree05043c1b75ba331ffd7d645b544badcecad6657c /src/Network
parentaee5037c333abc77174d4867b75b1ef068fbaf1b (diff)
peer search.
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs36
1 files changed, 21 insertions, 15 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
index 8c441cb0..b263e339 100644
--- a/src/Network/BitTorrent/DHT/Search.hs
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -21,7 +21,7 @@ import qualified Data.Set as Set
21import System.IO 21import System.IO
22 22
23import qualified Data.MinMaxPSQ as MM 23import qualified Data.MinMaxPSQ as MM
24 ;import Data.MinMaxPSQ (MinMaxPSQ) 24 ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
25import qualified Data.Wrapper.PSQ as PSQ 25import qualified Data.Wrapper.PSQ as PSQ
26 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) 26 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey)
27import Network.Address hiding (NodeId) 27import Network.Address hiding (NodeId)
@@ -34,13 +34,13 @@ import Control.Concurrent.Lifted
34import GHC.Conc (labelThread) 34import GHC.Conc (labelThread)
35#endif 35#endif
36 36
37data Search nid addr ni r = Search 37data Search nid addr tok ni r = Search
38 { searchSpace :: KademliaSpace nid ni 38 { searchSpace :: KademliaSpace nid ni
39 , searchNodeAddress :: ni -> addr 39 , searchNodeAddress :: ni -> addr
40 , searchQuery :: nid -> ni -> IO ([ni], [r]) 40 , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok))
41 } 41 }
42 42
43data SearchState nid addr ni r = SearchState 43data SearchState nid addr tok ni r = SearchState
44 {- 44 {-
45 { searchParams :: Search nid addr ni r 45 { searchParams :: Search nid addr ni r
46 46
@@ -60,7 +60,7 @@ data SearchState nid addr ni r = SearchState
60 -- | Nodes scheduled to be queried. 60 -- | Nodes scheduled to be queried.
61 , searchQueued :: TVar (MinMaxPSQ ni nid) 61 , searchQueued :: TVar (MinMaxPSQ ni nid)
62 -- | The nearest K nodes that issued a reply. 62 -- | The nearest K nodes that issued a reply.
63 , searchInformant :: TVar (MinMaxPSQ ni nid) 63 , searchInformant :: TVar (MinMaxPSQ' ni nid tok)
64 -- | This tracks already-queried addresses so we avoid bothering them 64 -- | This tracks already-queried addresses so we avoid bothering them
65 -- again. XXX: We could probably keep only the pending queries in this 65 -- again. XXX: We could probably keep only the pending queries in this
66 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha 66 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
@@ -79,10 +79,10 @@ newSearch :: ( Ord addr
79 -> (r -> STM Bool) -- receives search results. 79 -> (r -> STM Bool) -- receives search results.
80 -> nid -- target of search 80 -> nid -- target of search
81 -} 81 -}
82 Search nid addr ni r 82 Search nid addr tok ni r
83 -> nid 83 -> nid
84 -> [ni] -- Initial nodes to query. 84 -> [ni] -- Initial nodes to query.
85 -> IO (SearchState nid addr ni r) 85 -> IO (SearchState nid addr tok ni r)
86newSearch (Search space nAddr qry) target ns = atomically $ do 86newSearch (Search space nAddr qry) target ns = atomically $ do
87 c <- newTVar 0 87 c <- newTVar 0
88 q <- newTVar $ MM.fromList 88 q <- newTVar $ MM.fromList
@@ -99,25 +99,31 @@ searchAlpha = 8
99searchK :: Int 99searchK :: Int
100searchK = 8 100searchK = 8
101 101
102sendQuery :: forall addr nid ni r. 102sendQuery :: forall addr nid tok ni r.
103 ( Ord addr 103 ( Ord addr
104 , Ord r 104 , Ord r
105 , PSQKey nid 105 , PSQKey nid
106 , PSQKey ni 106 , PSQKey ni
107 , Show nid 107 , Show nid
108 ) => 108 ) =>
109 Search nid addr ni r 109 Search nid addr tok ni r
110 -> nid 110 -> nid
111 -> (r -> STM Bool) 111 -> (r -> STM Bool)
112 -> SearchState nid addr ni r 112 -> SearchState nid addr tok ni r
113 -> Binding ni nid 113 -> Binding ni nid
114 -> IO () 114 -> IO ()
115sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do 115sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
116 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) 116 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
117 (ns,rs) <- handle (\(SomeException e) -> return ([],[])) 117 -- TODO: Should we really be catching ThreadKilled ?
118 reply <- handle (\(SomeException e) -> return Nothing)
118 (searchQuery searchTarget ni) 119 (searchQuery searchTarget ni)
120 -- (ns,rs)
121 let tok = error "TODO: token"
119 atomically $ do 122 atomically $ do
120 modifyTVar searchPendingCount pred 123 modifyTVar searchPendingCount pred
124 maybe (return ()) go reply
125 where
126 go (ns,rs,tok) = do
121 vs <- readTVar searchVisited 127 vs <- readTVar searchVisited
122 -- We only queue a node if it is not yet visited 128 -- We only queue a node if it is not yet visited
123 let insertFoundNode :: ni 129 let insertFoundNode :: ni
@@ -130,7 +136,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
130 $ kademliaLocation searchSpace n ) 136 $ kademliaLocation searchSpace n )
131 q 137 q
132 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns 138 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
133 modifyTVar searchInformant $ MM.insertTake searchK ni d 139 modifyTVar searchInformant $ MM.insertTake' searchK ni tok d
134 flip fix rs $ \loop -> \case 140 flip fix rs $ \loop -> \case
135 r:rs' -> do 141 r:rs' -> do
136 wanting <- searchResult r 142 wanting <- searchResult r
@@ -142,7 +148,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
142searchIsFinished :: ( Ord addr 148searchIsFinished :: ( Ord addr
143 , PSQKey nid 149 , PSQKey nid
144 , PSQKey ni 150 , PSQKey ni
145 ) => SearchState nid addr ni r -> STM Bool 151 ) => SearchState nid addr tok ni r -> STM Bool
146searchIsFinished SearchState{ ..} = do 152searchIsFinished SearchState{ ..} = do
147 q <- readTVar searchQueued 153 q <- readTVar searchQueued
148 cnt <- readTVar searchPendingCount 154 cnt <- readTVar searchPendingCount
@@ -153,7 +159,7 @@ searchIsFinished SearchState{ ..} = do
153 && ( PSQ.prio (fromJust $ MM.findMax informants) 159 && ( PSQ.prio (fromJust $ MM.findMax informants)
154 <= PSQ.prio (fromJust $ MM.findMin q)))) 160 <= PSQ.prio (fromJust $ MM.findMin q))))
155 161
156searchCancel :: SearchState nid addr ni r -> STM () 162searchCancel :: SearchState nid addr tok ni r -> STM ()
157searchCancel SearchState{..} = do 163searchCancel SearchState{..} = do
158 writeTVar searchPendingCount 0 164 writeTVar searchPendingCount 0
159 writeTVar searchQueued MM.empty 165 writeTVar searchQueued MM.empty
@@ -164,7 +170,7 @@ search ::
164 , PSQKey nid 170 , PSQKey nid
165 , PSQKey ni 171 , PSQKey ni
166 , Show nid 172 , Show nid
167 ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r) 173 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
168search sch@Search{..} buckets target result = do 174search sch@Search{..} buckets target result = do
169 let ns = R.kclosest searchSpace searchK target buckets 175 let ns = R.kclosest searchSpace searchK target buckets
170 st <- newSearch sch target ns 176 st <- newSearch sch target ns