summaryrefslogtreecommitdiff
path: root/examples/dhtd.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-07-28 03:11:02 -0400
committerjoe <joe@jerkface.net>2017-07-28 03:11:02 -0400
commit45a1baad8bf90a07654b6ade1a9ed2e5a2d5c92b (patch)
treeb3a3eab385fb8621c2fa5dcaca0639f8fac77ecf /examples/dhtd.hs
parent81b49153a856d497a562bc1bb7867d319a26a830 (diff)
rewrite: search feature.
Diffstat (limited to 'examples/dhtd.hs')
-rw-r--r--examples/dhtd.hs171
1 files changed, 157 insertions, 14 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 6477fac4..33fd30c9 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -20,9 +20,13 @@ import Control.DeepSeq
20import Control.Exception 20import Control.Exception
21import Control.Monad 21import Control.Monad
22import Data.Char 22import Data.Char
23import Data.Hashable
23import Data.List 24import Data.List
24import qualified Data.Map as Map 25import qualified Data.Map as Map
26import Data.Maybe
27import qualified Data.Set as Set
25import Data.Time.Clock 28import Data.Time.Clock
29import GHC.Conc (threadStatus,ThreadStatus(..))
26import GHC.Stats 30import GHC.Stats
27import Network.Socket 31import Network.Socket
28import System.Environment 32import System.Environment
@@ -53,6 +57,7 @@ import Control.Concurrent.Async.Pool
53import System.IO.Error 57import System.IO.Error
54import qualified Data.Serialize as S 58import qualified Data.Serialize as S
55import Network.BitTorrent.DHT.ContactInfo as Peers 59import Network.BitTorrent.DHT.ContactInfo as Peers
60import qualified Data.MinMaxPSQ as MM
56 61
57showReport :: [(String,String)] -> String 62showReport :: [(String,String)] -> String
58showReport kvs = do 63showReport kvs = do
@@ -72,23 +77,36 @@ hPutClient h s = hPutStr h ('.' : marshalForClient s)
72hPutClientChunk :: Handle -> String -> IO () 77hPutClientChunk :: Handle -> String -> IO ()
73hPutClientChunk h s = hPutStr h (' ' : marshalForClient s) 78hPutClientChunk h s = hPutStr h (' ' : marshalForClient s)
74 79
75data DHTQuery nid ni = forall addr r tok. DHTQuery 80data DHTQuery nid ni = forall addr r tok. Ord addr => DHTQuery
76 { qsearch :: Search nid addr tok ni r 81 { qsearch :: Search nid addr tok ni r
77 , qhandler :: ni -> nid -> IO ([ni], [r], tok) 82 , qhandler :: ni -> nid -> IO ([ni], [r], tok)
78 , qshowR :: r -> String 83 , qshowR :: r -> String
79 , qshowTok :: tok -> Maybe String 84 , qshowTok :: tok -> Maybe String
80 } 85 }
81 86
87data DHTSearch nid ni = forall addr tok r. DHTSearch
88 { searchThread :: ThreadId
89 , searchState :: SearchState nid addr tok ni r
90 , searchShowTok :: tok -> Maybe String
91 , searchResults :: TVar (Set.Set String)
92 }
93
82data DHT = forall nid ni. ( Show ni 94data DHT = forall nid ni. ( Show ni
83 , Read ni 95 , Read ni
84 , ToJSON ni 96 , ToJSON ni
85 , FromJSON ni 97 , FromJSON ni
98 , Ord ni
99 , Hashable ni
100 , Show nid
101 , Ord nid
102 , Hashable nid
86 ) => 103 ) =>
87 DHT 104 DHT
88 { dhtBuckets :: TVar (BucketList ni) 105 { dhtBuckets :: TVar (BucketList ni)
89 , dhtPing :: ni -> IO Bool 106 , dhtPing :: ni -> IO Bool
90 , dhtQuery :: Map.Map String (DHTQuery nid ni) 107 , dhtQuery :: Map.Map String (DHTQuery nid ni)
91 , dhtParseId :: String -> Either String nid 108 , dhtParseId :: String -> Either String nid
109 , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni))
92 } 110 }
93 111
94nodesFileName :: String -> String 112nodesFileName :: String -> String
@@ -135,26 +153,102 @@ reportTable bkts = map (show *** show . fst)
135 $ R.toList 153 $ R.toList
136 $ bkts 154 $ bkts
137 155
138reportResult :: Show ni => 156reportResult ::
139 String 157 String
140 -> (r -> String) 158 -> (r -> String)
141 -> (tok -> Maybe String) 159 -> (tok -> Maybe String)
160 -> (ni -> String)
142 -> Handle 161 -> Handle
143 -> Either String ([ni],[r],tok) 162 -> Either String ([ni],[r],tok)
144 -> IO () 163 -> IO ()
145reportResult meth showR showTok h (Left e) = hPutClient h e 164reportResult meth showR showTok showN h (Left e) = hPutClient h e
146reportResult meth showR showTok h (Right (ns,rs,tok)) = do 165reportResult meth showR showTok showN h (Right (ns,rs,tok)) = do
147 hPutClient h $ showReport report 166 hPutClient h $ showReport report
148 where 167 where
149 report = intercalate [("","")] [ tok_r , node_r , result_r ] 168 report = intercalate [("","")] [ tok_r , node_r , result_r ]
150 169
151 tok_r = maybe [] (pure . ("token:",)) $ showTok tok 170 tok_r = maybe [] (pure . ("token:",)) $ showTok tok
152 171
153 node_r = map ( ("n",) . show ) ns 172 node_r = map ( ("n",) . showN ) ns
154 173
155 result_r | (meth=="node") = [] 174 result_r | (meth=="node") = []
156 | otherwise = map ( (take 1 meth,) . showR ) rs 175 | otherwise = map ( (take 1 meth,) . showR ) rs
157 176
177-- example:
178-- * 10 peer 141d6c6ee2810f46d28bbe8373d4f454a4122535
179-- - 1 peer 141d6c6ee2810f46d28bbe8373d4f454a4122535
180-- 22 node 141d6c6ee2810f46d28bbe8373d4f454a4122535
181--
182-- key: '*' in progress
183-- '-' stopped
184-- ' ' finished
185showSearches :: ( Show nid
186 , Ord nid
187 , Hashable nid
188 , Ord ni
189 , Hashable ni
190 ) => Map.Map (String,nid) (DHTSearch nid ni) -> IO String
191showSearches searches = do
192 tups <- forM (Map.toList searches) $ \((meth,nid),DHTSearch{..}) -> do
193 (is'fin, cnt) <- atomically $
194 (,) <$> searchIsFinished searchState
195 <*> (Set.size <$> readTVar searchResults)
196 tstat <- threadStatus searchThread
197 let stat = case tstat of
198 _ | is'fin -> ' '
199 ThreadFinished -> '-'
200 ThreadDied -> '-'
201 _ -> '*'
202 return (stat,show cnt,meth,show nid)
203 let cnt'width = maximum $ map (\(_,cnt,_,_)->length cnt) tups
204 mth'width = maximum $ map (\(_,_,mth,_)->length mth) tups
205 return $ do -- List monad.
206 (stat,cnt,meth,nid) <- tups
207 printf " %c %-*s %-*s %s\n" stat cnt'width cnt mth'width meth nid
208
209forkSearch ::
210 ( Ord nid
211 , Hashable nid
212 , Ord ni
213 , Hashable ni
214 , Show nid
215 ) =>
216 String
217 -> nid
218 -> DHTQuery nid ni
219 -> TVar (Map.Map (String,nid) (DHTSearch nid ni))
220 -> TVar (BucketList ni)
221 -> ThreadId
222 -> TVar (Maybe (IO ()))
223 -> STM ()
224forkSearch method nid DHTQuery{qsearch,qshowTok,qshowR} dhtSearches dhtBuckets tid kvar = do
225 ns <- R.kclosest (searchSpace qsearch) searchK nid <$> readTVar dhtBuckets
226 st <- newSearch qsearch nid ns
227 results <- newTVar Set.empty
228 let storeResult r = modifyTVar' results (Set.insert (qshowR r))
229 >> return True
230 new = DHTSearch
231 { searchThread = tid
232 , searchState = st
233 , searchShowTok = qshowTok
234 , searchResults = results
235 }
236 modifyTVar' dhtSearches $ Map.insert (method,nid) new
237 writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st
238
239reportSearchResults meth h DHTSearch{searchShowTok,searchState,searchResults} = do
240 (ns,rs) <- atomically $ do
241 mm <- readTVar $ searchInformant searchState
242 rset <- readTVar searchResults
243 let ns = map (\(MM.Binding ni tok _) -> (ni,tok))
244 $ MM.toList mm
245 rs = Set.toList rset
246 return (ns,rs)
247 let n'width = succ $ maximum $ map (length . show . fst) ns
248 showN (n,tok) = take n'width (show n ++ repeat ' ') ++ (fromMaybe "" $ searchShowTok tok)
249 ns' = map showN ns
250 reportResult meth id (const Nothing) id h (Right (ns',rs,()))
251
158data Session = Session 252data Session = Session
159 { netname :: String 253 { netname :: String
160 , dhts :: Map.Map String DHT 254 , dhts :: Map.Map String DHT
@@ -189,6 +283,7 @@ clientSession s@Session{..} sock cnum h = do
189 tm <- getCurrentTime 283 tm <- getCurrentTime
190 let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts 284 let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts
191 hPutClient h $ showReport r 285 hPutClient h $ showReport r
286#endif
192 ("mem", s) -> cmd0 $ do 287 ("mem", s) -> cmd0 $ do
193 case s of 288 case s of
194 "gc" -> do hPutClient h "Performing garbage collection..." 289 "gc" -> do hPutClient h "Performing garbage collection..."
@@ -221,7 +316,6 @@ clientSession s@Session{..} sock cnum h = do
221 else hPutClient h "Run with +RTS -T to obtain live memory-usage information." 316 else hPutClient h "Run with +RTS -T to obtain live memory-usage information."
222 _ -> hPutClient h "error." 317 _ -> hPutClient h "error."
223 318
224#endif
225 ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts 319 ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts
226 -> cmd0 $ do 320 -> cmd0 $ do
227 bkts <- atomically $ readTVar dhtBuckets 321 bkts <- atomically $ readTVar dhtBuckets
@@ -252,7 +346,7 @@ clientSession s@Session{..} sock cnum h = do
252 (goTarget qry) 346 (goTarget qry)
253 $ dhtParseId nidstr 347 $ dhtParseId nidstr
254 goTarget DHTQuery{..} nid = 348 goTarget DHTQuery{..} nid =
255 go nid >>= reportResult method qshowR qshowTok h 349 go nid >>= reportResult method qshowR qshowTok show h
256 where 350 where
257 go | null destination = fmap Right . qhandler self 351 go | null destination = fmap Right . qhandler self
258 | otherwise = case readEither destination of 352 | otherwise = case readEither destination of
@@ -263,6 +357,49 @@ clientSession s@Session{..} sock cnum h = do
263 goQuery 357 goQuery
264 $ Map.lookup method dhtQuery 358 $ Map.lookup method dhtQuery
265 359
360 ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts
361 -> cmd0 $ do
362 let (method,xs) = break isSpace s
363 (nidstr,ys) = break isSpace $ dropWhile isSpace xs
364 presentSearches = hPutClient h
365 =<< showSearches
366 =<< atomically (readTVar dhtSearches)
367 goTarget qry nid = do
368 kvar <- atomically $ newTVar Nothing
369 -- Forking a thread, but it may ubruptly quit if the following
370 -- STM action decides not to add a new search. This is so that
371 -- I can store the ThreadId into new DHTSearch structure.
372 tid <- fork $ join $ atomically (readTVar kvar >>= maybe retry return)
373 join $ atomically $ do
374 schs <- readTVar dhtSearches
375 case Map.lookup (method,nid) schs of
376 Nothing -> do forkSearch method nid qry dhtSearches dhtBuckets tid kvar
377 return $ presentSearches
378 Just sch -> do writeTVar kvar (Just $ return ())
379 return $ reportSearchResults method h sch
380 goQuery qry = either (hPutClient h . ("Bad search target: "++))
381 (goTarget qry)
382 $ dhtParseId nidstr
383 if null method then presentSearches
384 else maybe (hPutClient h ("Unsupported method: "++method))
385 goQuery
386 $ Map.lookup method dhtQuery
387
388 ("x", s) | Just DHT{..} <- Map.lookup netname dhts
389 -> cmd0 $ do
390 let (method,xs) = break isSpace s
391 (nidstr,ys) = break isSpace $ dropWhile isSpace xs
392 go nid = join $ atomically $ do
393 schs <- readTVar dhtSearches
394 case Map.lookup (method,nid) schs of
395 Nothing -> return $ hPutClient h "No match."
396 Just DHTSearch{searchThread} -> do
397 modifyTVar' dhtSearches (Map.delete (method,nid))
398 return $ do
399 killThread searchThread
400 hPutClient h "Removed search."
401 either (hPutClient h . ("Bad search target: "++)) go $ dhtParseId nidstr
402
266 ("save", _) | Just dht <- Map.lookup netname dhts 403 ("save", _) | Just dht <- Map.lookup netname dhts
267 -> cmd0 $ do 404 -> cmd0 $ do
268 saveNodes netname dht 405 saveNodes netname dht
@@ -298,8 +435,11 @@ clientSession s@Session{..} sock cnum h = do
298readExternals :: [TVar (BucketList Mainline.NodeInfo)] -> IO [SockAddr] 435readExternals :: [TVar (BucketList Mainline.NodeInfo)] -> IO [SockAddr]
299readExternals vars = do 436readExternals vars = do
300 as <- atomically $ mapM (fmap (Mainline.nodeAddr . selfNode) . readTVar) vars 437 as <- atomically $ mapM (fmap (Mainline.nodeAddr . selfNode) . readTVar) vars
438 let unspecified (SockAddrInet _ 0) = True
439 unspecified (SockAddrInet6 _ _ (0,0,0,0) _) = True
440 unspecified _ = False
301 -- TODO: Filter to only global addresses? 441 -- TODO: Filter to only global addresses?
302 return as 442 return $ filter (not . unspecified) as
303 443
304defaultPort = "6881" 444defaultPort = "6881"
305 445
@@ -324,6 +464,8 @@ main = do
324 tox <- return $ error "TODO: Tox.newClient" 464 tox <- return $ error "TODO: Tox.newClient"
325 quitTox <- return $ return () -- TODO: forkListener tox 465 quitTox <- return $ return () -- TODO: forkListener tox
326 466
467 mainlineSearches <- atomically $ newTVar Map.empty
468
327 let mainlineDHT bkts = DHT 469 let mainlineDHT bkts = DHT
328 { dhtBuckets = bkts btR 470 { dhtBuckets = bkts btR
329 , dhtPing = Mainline.ping bt 471 , dhtPing = Mainline.ping bt
@@ -342,7 +484,8 @@ main = do
342 show 484 show
343 (Just . show)) 485 (Just . show))
344 ] 486 ]
345 , dhtParseId = readEither :: String -> Either String Mainline.NodeId 487 , dhtParseId = readEither :: String -> Either String Mainline.NodeId
488 , dhtSearches = mainlineSearches
346 } 489 }
347 dhts = Map.fromList 490 dhts = Map.fromList
348 [ ("bt4", mainlineDHT Mainline.routing4) 491 [ ("bt4", mainlineDHT Mainline.routing4)