From 45a1baad8bf90a07654b6ade1a9ed2e5a2d5c92b Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 28 Jul 2017 03:11:02 -0400 Subject: rewrite: search feature. --- examples/dhtd.hs | 171 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 157 insertions(+), 14 deletions(-) (limited to 'examples') diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 6477fac4..33fd30c9 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -20,9 +20,13 @@ import Control.DeepSeq import Control.Exception import Control.Monad import Data.Char +import Data.Hashable import Data.List -import qualified Data.Map as Map +import qualified Data.Map as Map +import Data.Maybe +import qualified Data.Set as Set import Data.Time.Clock +import GHC.Conc (threadStatus,ThreadStatus(..)) import GHC.Stats import Network.Socket import System.Environment @@ -53,6 +57,7 @@ import Control.Concurrent.Async.Pool import System.IO.Error import qualified Data.Serialize as S import Network.BitTorrent.DHT.ContactInfo as Peers +import qualified Data.MinMaxPSQ as MM showReport :: [(String,String)] -> String showReport kvs = do @@ -72,23 +77,36 @@ hPutClient h s = hPutStr h ('.' : marshalForClient s) hPutClientChunk :: Handle -> String -> IO () hPutClientChunk h s = hPutStr h (' ' : marshalForClient s) -data DHTQuery nid ni = forall addr r tok. DHTQuery +data DHTQuery nid ni = forall addr r tok. Ord addr => DHTQuery { qsearch :: Search nid addr tok ni r , qhandler :: ni -> nid -> IO ([ni], [r], tok) , qshowR :: r -> String , qshowTok :: tok -> Maybe String } +data DHTSearch nid ni = forall addr tok r. DHTSearch + { searchThread :: ThreadId + , searchState :: SearchState nid addr tok ni r + , searchShowTok :: tok -> Maybe String + , searchResults :: TVar (Set.Set String) + } + data DHT = forall nid ni. ( Show ni , Read ni , ToJSON ni , FromJSON ni + , Ord ni + , Hashable ni + , Show nid + , Ord nid + , Hashable nid ) => DHT - { dhtBuckets :: TVar (BucketList ni) - , dhtPing :: ni -> IO Bool - , dhtQuery :: Map.Map String (DHTQuery nid ni) - , dhtParseId :: String -> Either String nid + { dhtBuckets :: TVar (BucketList ni) + , dhtPing :: ni -> IO Bool + , dhtQuery :: Map.Map String (DHTQuery nid ni) + , dhtParseId :: String -> Either String nid + , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni)) } nodesFileName :: String -> String @@ -135,26 +153,102 @@ reportTable bkts = map (show *** show . fst) $ R.toList $ bkts -reportResult :: Show ni => +reportResult :: String -> (r -> String) -> (tok -> Maybe String) + -> (ni -> String) -> Handle -> Either String ([ni],[r],tok) -> IO () -reportResult meth showR showTok h (Left e) = hPutClient h e -reportResult meth showR showTok h (Right (ns,rs,tok)) = do +reportResult meth showR showTok showN h (Left e) = hPutClient h e +reportResult meth showR showTok showN h (Right (ns,rs,tok)) = do hPutClient h $ showReport report where report = intercalate [("","")] [ tok_r , node_r , result_r ] tok_r = maybe [] (pure . ("token:",)) $ showTok tok - node_r = map ( ("n",) . show ) ns + node_r = map ( ("n",) . showN ) ns result_r | (meth=="node") = [] | otherwise = map ( (take 1 meth,) . showR ) rs +-- example: +-- * 10 peer 141d6c6ee2810f46d28bbe8373d4f454a4122535 +-- - 1 peer 141d6c6ee2810f46d28bbe8373d4f454a4122535 +-- 22 node 141d6c6ee2810f46d28bbe8373d4f454a4122535 +-- +-- key: '*' in progress +-- '-' stopped +-- ' ' finished +showSearches :: ( Show nid + , Ord nid + , Hashable nid + , Ord ni + , Hashable ni + ) => Map.Map (String,nid) (DHTSearch nid ni) -> IO String +showSearches searches = do + tups <- forM (Map.toList searches) $ \((meth,nid),DHTSearch{..}) -> do + (is'fin, cnt) <- atomically $ + (,) <$> searchIsFinished searchState + <*> (Set.size <$> readTVar searchResults) + tstat <- threadStatus searchThread + let stat = case tstat of + _ | is'fin -> ' ' + ThreadFinished -> '-' + ThreadDied -> '-' + _ -> '*' + return (stat,show cnt,meth,show nid) + let cnt'width = maximum $ map (\(_,cnt,_,_)->length cnt) tups + mth'width = maximum $ map (\(_,_,mth,_)->length mth) tups + return $ do -- List monad. + (stat,cnt,meth,nid) <- tups + printf " %c %-*s %-*s %s\n" stat cnt'width cnt mth'width meth nid + +forkSearch :: + ( Ord nid + , Hashable nid + , Ord ni + , Hashable ni + , Show nid + ) => + String + -> nid + -> DHTQuery nid ni + -> TVar (Map.Map (String,nid) (DHTSearch nid ni)) + -> TVar (BucketList ni) + -> ThreadId + -> TVar (Maybe (IO ())) + -> STM () +forkSearch method nid DHTQuery{qsearch,qshowTok,qshowR} dhtSearches dhtBuckets tid kvar = do + ns <- R.kclosest (searchSpace qsearch) searchK nid <$> readTVar dhtBuckets + st <- newSearch qsearch nid ns + results <- newTVar Set.empty + let storeResult r = modifyTVar' results (Set.insert (qshowR r)) + >> return True + new = DHTSearch + { searchThread = tid + , searchState = st + , searchShowTok = qshowTok + , searchResults = results + } + modifyTVar' dhtSearches $ Map.insert (method,nid) new + writeTVar kvar $ Just $ searchLoop qsearch nid storeResult st + +reportSearchResults meth h DHTSearch{searchShowTok,searchState,searchResults} = do + (ns,rs) <- atomically $ do + mm <- readTVar $ searchInformant searchState + rset <- readTVar searchResults + let ns = map (\(MM.Binding ni tok _) -> (ni,tok)) + $ MM.toList mm + rs = Set.toList rset + return (ns,rs) + let n'width = succ $ maximum $ map (length . show . fst) ns + showN (n,tok) = take n'width (show n ++ repeat ' ') ++ (fromMaybe "" $ searchShowTok tok) + ns' = map showN ns + reportResult meth id (const Nothing) id h (Right (ns',rs,())) + data Session = Session { netname :: String , dhts :: Map.Map String DHT @@ -189,6 +283,7 @@ clientSession s@Session{..} sock cnum h = do tm <- getCurrentTime let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts hPutClient h $ showReport r +#endif ("mem", s) -> cmd0 $ do case s of "gc" -> do hPutClient h "Performing garbage collection..." @@ -221,7 +316,6 @@ clientSession s@Session{..} sock cnum h = do else hPutClient h "Run with +RTS -T to obtain live memory-usage information." _ -> hPutClient h "error." -#endif ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts -> cmd0 $ do bkts <- atomically $ readTVar dhtBuckets @@ -252,7 +346,7 @@ clientSession s@Session{..} sock cnum h = do (goTarget qry) $ dhtParseId nidstr goTarget DHTQuery{..} nid = - go nid >>= reportResult method qshowR qshowTok h + go nid >>= reportResult method qshowR qshowTok show h where go | null destination = fmap Right . qhandler self | otherwise = case readEither destination of @@ -263,6 +357,49 @@ clientSession s@Session{..} sock cnum h = do goQuery $ Map.lookup method dhtQuery + ("s", s) | Just dht@DHT{..} <- Map.lookup netname dhts + -> cmd0 $ do + let (method,xs) = break isSpace s + (nidstr,ys) = break isSpace $ dropWhile isSpace xs + presentSearches = hPutClient h + =<< showSearches + =<< atomically (readTVar dhtSearches) + goTarget qry nid = do + kvar <- atomically $ newTVar Nothing + -- Forking a thread, but it may ubruptly quit if the following + -- STM action decides not to add a new search. This is so that + -- I can store the ThreadId into new DHTSearch structure. + tid <- fork $ join $ atomically (readTVar kvar >>= maybe retry return) + join $ atomically $ do + schs <- readTVar dhtSearches + case Map.lookup (method,nid) schs of + Nothing -> do forkSearch method nid qry dhtSearches dhtBuckets tid kvar + return $ presentSearches + Just sch -> do writeTVar kvar (Just $ return ()) + return $ reportSearchResults method h sch + goQuery qry = either (hPutClient h . ("Bad search target: "++)) + (goTarget qry) + $ dhtParseId nidstr + if null method then presentSearches + else maybe (hPutClient h ("Unsupported method: "++method)) + goQuery + $ Map.lookup method dhtQuery + + ("x", s) | Just DHT{..} <- Map.lookup netname dhts + -> cmd0 $ do + let (method,xs) = break isSpace s + (nidstr,ys) = break isSpace $ dropWhile isSpace xs + go nid = join $ atomically $ do + schs <- readTVar dhtSearches + case Map.lookup (method,nid) schs of + Nothing -> return $ hPutClient h "No match." + Just DHTSearch{searchThread} -> do + modifyTVar' dhtSearches (Map.delete (method,nid)) + return $ do + killThread searchThread + hPutClient h "Removed search." + either (hPutClient h . ("Bad search target: "++)) go $ dhtParseId nidstr + ("save", _) | Just dht <- Map.lookup netname dhts -> cmd0 $ do saveNodes netname dht @@ -298,8 +435,11 @@ clientSession s@Session{..} sock cnum h = do readExternals :: [TVar (BucketList Mainline.NodeInfo)] -> IO [SockAddr] readExternals vars = do as <- atomically $ mapM (fmap (Mainline.nodeAddr . selfNode) . readTVar) vars + let unspecified (SockAddrInet _ 0) = True + unspecified (SockAddrInet6 _ _ (0,0,0,0) _) = True + unspecified _ = False -- TODO: Filter to only global addresses? - return as + return $ filter (not . unspecified) as defaultPort = "6881" @@ -324,6 +464,8 @@ main = do tox <- return $ error "TODO: Tox.newClient" quitTox <- return $ return () -- TODO: forkListener tox + mainlineSearches <- atomically $ newTVar Map.empty + let mainlineDHT bkts = DHT { dhtBuckets = bkts btR , dhtPing = Mainline.ping bt @@ -342,7 +484,8 @@ main = do show (Just . show)) ] - , dhtParseId = readEither :: String -> Either String Mainline.NodeId + , dhtParseId = readEither :: String -> Either String Mainline.NodeId + , dhtSearches = mainlineSearches } dhts = Map.fromList [ ("bt4", mainlineDHT Mainline.routing4) -- cgit v1.2.3