import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import GHC.Event
import Network.Kademlia
import Network.Kademlia.Bootstrap
import Network.Kademlia.CommonAPI
import Network.Kademlia.Persistence
import Network.Kademlia.Routing
import Network.Kademlia.Search
import Network.QueryResponse as QR
import qualified Data.MinMaxPSQ as MM
import qualified Data.Set as Set

-- | Fabricate the reply to one simulated query.
--
-- Atomically reads the shared counter @n@ from @var@, advances it by 4 and
-- returns the four consecutive integers starting at @n@ as both the node list
-- and the result list, together with a unit token.
--
-- When the counter read is above 490 and a search state has been published
-- in @mbv@, a one-line report (@cnt=…@ followed by 'showSearchState') is
-- printed.  The report text is computed inside the same transaction as the
-- counter update; the printing itself happens after the transaction commits.
makeSchResults :: TVar (Maybe (SearchState Int Int () Int Int Int)) -> TVar Int -> IO ([Int],[Int],Maybe ())
makeSchResults mbv var = do
    (r, logAction) <- atomically $ do
        n <- readTVar var
        writeTVar var $! n + 4
        let ns = take 4 [n ..]
        -- STM cannot perform IO, so build the IO action here and run it
        -- once the transaction has committed.
        logAction <-
            if n > 490
                then do
                    ms <- readTVar mbv
                    case ms of
                        Just s -> do
                            report <- showSearchState s
                            return $ putStrLn $ "cnt=" ++ show n ++ " " ++ report
                        Nothing -> return (return ())
                else return (return ())
        return ((ns, ns, Just ()), logAction) -- Maybe ([ni], [r], Maybe tok)
    logAction
    return r

-- | A toy 'KademliaSpace' over plain 'Int's: a node is its own location, the
-- "xor" metric is the absolute difference, and sampling returns the given
-- location unchanged.
kad :: KademliaSpace Int Int
kad = KademliaSpace
    { kademliaLocation = id
      -- NOTE(review): the arguments are flipped relative to the lambda's
      -- parameter order, i.e. this tests bit @x@ of @i@.  The field's declared
      -- type is not visible from this file -- confirm this is intended.
    , kademliaTestBit  = \x i -> testBit i x
    , kademliaXor      = \x y -> abs (x - y)
    , kademliaSample   = \_ x _ -> pure x
    }

-- | Build the simulated 'Search' used by this test.
--
-- Each query atomically takes the current counter value from @var@ (bumping
-- the counter by 1) as its query key, then registers a timer callback that
-- prints a trace line, fabricates a reply with 'makeSchResults' and hands it
-- to the search's result callback as a 'Success'.  Cancelling a query is a
-- no-op.
sch :: TVar (Maybe (SearchState Int Int () Int Int Int)) -> TVar Int -> Search Int Int () Int Int Int
sch mbv var = Search
    { searchSpace       = kad
    , searchNodeAddress = id
      -- searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk
    , searchQuery       = \_ _ f -> do
        val <- atomically $ do
            n <- readTVar var
            writeTVar var $! n + 1
            return n
        tm <- getSystemTimerManager
        -- The timeout key is not needed: 'searchQueryCancel' below never
        -- unregisters the timer.
        _ <- registerTimeout tm replyDelayMicros $ do
            putStrLn $ "makeSchResults " ++ show val
            r <- makeSchResults mbv var
            f val (Success r)
        return val
    , searchQueryCancel = \_ _ -> return ()
    , searchAlpha       = 4
    , searchK           = 8
    }
  where
    -- Delay passed to 'registerTimeout' before the fabricated reply is
    -- delivered.
    replyDelayMicros :: Int
    replyDelayMicros = 200000

-- | Result predicate handed to 'searchLoop': yields 'True' for results
-- below 50000.
onResult :: Int -> STM Bool
onResult k = return (k < 50000)

-- | Render a one-line summary of a search state: the pending count and the
-- sizes of the queued, informant and visited collections.  A missing queue
-- is reported as size 0.
showSearchState :: SearchState Int Int () Int Int Int -> STM String
showSearchState st = do
    pc   <- readTVar (searchPendingCount st)
    q    <- readTVar (searchQueued st)
    inf  <- readTVar (searchInformant st)
    vset <- readTVar (searchVisited st)
    return $ unwords
        [ "pending=" ++ show pc
        , "|q|=" ++ show (maybe 0 MM.size q)
        , "|inf|=" ++ show (MM.size inf)
        , "|visited|=" ++ show (Set.size vset)
        ]

-- | Test driver.
--
-- Starts a search over the simulated network in a background thread, waits
-- until either the search loop finishes or the shared counter reaches 500,
-- prints the search state, calls 'searchCancel', and then waits for the
-- search loop to finish.  A watcher thread prints a line every time the
-- counter changes after the cancel.
main :: IO ()
main = do
    var     <- newTVarIO 5
    fin     <- newTVarIO False
    mbstvar <- newTVarIO Nothing
    s <- atomically $ newSearch (sch mbstvar var) maxBound [1..4]
    -- Publish the state so 'makeSchResults' can include it in its reports.
    atomically $ writeTVar mbstvar (Just s)
    _ <- forkIO $ do
        searchLoop (sch mbstvar var) maxBound onResult s
        atomically $ writeTVar fin True
    putStrLn "Waiting on counter."
    (done, n) <- atomically $ do
        done <- readTVar fin
        n    <- readTVar var
        -- Block until the search is finished or the counter has reached 500.
        when (not done && n < 500) retry
        return (done, n)
    putStrLn $ "(done,n) = " ++ show (done, n)
    -- Snapshot the state and cancel within a single transaction.
    report <- atomically $ do
        report <- showSearchState s
        searchCancel s
        return report
    putStrLn report
    putStrLn "Canceled. Awaiting fin. The program should quit shortly without much output after this."
    -- Watcher: print the counter and search state now and again each time
    -- the counter changes.
    _ <- forkIO $ forever $ do
        (seen, rpt) <- atomically $ (,) <$> readTVar var <*> showSearchState s
        putStrLn $ "query after cancel! " ++ show seen ++ " " ++ rpt
        atomically $ readTVar var >>= \now -> when (seen == now) retry
    atomically $ check =<< readTVar fin