From 0e20eb6683761362ee282e3188fccdab46b02ee4 Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 27 Jul 2017 00:09:36 -0400 Subject: peer search. --- Kademlia.hs | 6 ++-- Mainline.hs | 65 ++++++++++++++++++------------------ src/Network/BitTorrent/DHT/Search.hs | 36 +++++++++++--------- 3 files changed, 57 insertions(+), 50 deletions(-) diff --git a/Kademlia.hs b/Kademlia.hs index 57afb7fc..531b533b 100644 --- a/Kademlia.hs +++ b/Kademlia.hs @@ -266,9 +266,9 @@ forkPollForRefresh interval psq refresh = do seconds -> threadDelay ( seconds * 1000000 ) again -refreshBucket :: forall nid ni addr. +refreshBucket :: forall nid tok ni addr. ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) => - Search nid addr ni ni -> TVar (BucketList ni) -> nid -> Int -> IO Int + Search nid addr tok ni ni -> TVar (BucketList ni) -> nid -> Int -> IO Int refreshBucket sch var nid n = do tbl <- atomically (readTVar var) let count = bktCount tbl @@ -320,7 +320,7 @@ bootstrap :: , Ord nid , Traversable t1 , Traversable t - ) => Search nid addr ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () + ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () bootstrap sch var ping ns ns0 = do gotPing <- atomically $ newTVar False diff --git a/Mainline.hs b/Mainline.hs index 30e18a09..7c54a096 100644 --- a/Mainline.hs +++ b/Mainline.hs @@ -945,30 +945,38 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do isReadonlyClient client = False -- TODO -ping :: MainlineClient -> NodeInfo -> IO Bool -ping client addr = fromMaybe False <$> sendQuery client serializer Ping addr +mainlineSend meth unwrap msg client nid addr = do + reply <- sendQuery client serializer (msg nid) addr + -- sendQuery will return (Just (Left _)) on a parse error. We're going to + -- blow it away with the join-either sequence. + -- TODO: Do something with parse errors. + return $ join $ either (const Nothing) Just <$> reply where serializer = MethodSerializer { methodTimeout = 5 - , method = Method "ping" - , wrapQuery = encodeQueryPayload (Method "ping") (isReadonlyClient client) - , unwrapResponse = const True + , method = meth + , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) + , unwrapResponse = (>>= either (Left . Error GenericError . Char8.pack) + (Right . unwrap) + . BE.fromBEncode) + . rspPayload } --- searchQuery :: ni -> IO ([ni], [r]) -getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO ([NodeInfo],[NodeInfo]) -getNodes client nid addr = - fromMaybe ([],[]) <$> sendQuery client serializer (FindNode nid (Just Want_Both)) addr +ping :: MainlineClient -> NodeInfo -> IO Bool +ping client addr = + fromMaybe False + <$> mainlineSend (Method "ping") (\Ping -> True) (const Ping) client () addr + +-- searchQuery :: ni -> IO (Maybe [ni], [r], tok)) +getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],())) +getNodes = mainlineSend (Method "find_node") unwrap $ flip FindNode (Just Want_Both) where - serializer = MethodSerializer - { methodTimeout = 5 - , method = Method "find_node" - , wrapQuery = encodeQueryPayload (Method "find_node") (isReadonlyClient client) - , unwrapResponse = \case - R { rspPayload = Right bval } | Right (NodeFound ns4 ns6) <- BE.fromBEncode bval - -> (ns4++ns6, ns4++ns6) - _ -> ([],[]) - } + unwrap (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6,()) + +getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Token)) +getPeers = mainlineSend (Method "get_peers") unwrap $ flip GetPeers (Just Want_Both) . coerce + where + unwrap (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, tok) data TriadSlot = SlotA | SlotB | SlotC deriving (Eq,Ord,Enum,Show,Read) @@ -1048,23 +1056,16 @@ delVote triad voter = do writeTVar (triadSlot slot triad) Nothing triadCountVotes prior triad -nodeSearch client = Search - { searchSpace = mainlineSpace +mainlineSearch qry = Search + { searchSpace = mainlineSpace , searchNodeAddress = nodeIP &&& nodePort - , searchQuery = \nid ni -> do - hPutStrLn stderr $ "findNodes "++show nid++" --> "++show ni - handle (\(SomeException e) -> do - hPutStrLn stderr $ "got "++show e - -- threadDelay 1000000 - return ([],[])) - $ do - (xs,y) <- getNodes client nid ni - forM_ xs $ \x -> do - hPutStrLn stderr $ "got "++show x - -- threadDelay 1000000 - return (xs,y) + , searchQuery = qry } +nodeSearch client = mainlineSearch (getNodes client) + +peerSearch client = mainlineSearch (getPeers client) + -- | List of bootstrap nodes maintained by different bittorrent -- software authors. bootstrapNodes :: WantIP -> IO [NodeInfo] diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index 8c441cb0..b263e339 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -21,7 +21,7 @@ import qualified Data.Set as Set import System.IO import qualified Data.MinMaxPSQ as MM - ;import Data.MinMaxPSQ (MinMaxPSQ) + ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) import Network.Address hiding (NodeId) @@ -34,13 +34,13 @@ import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif -data Search nid addr ni r = Search +data Search nid addr tok ni r = Search { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr - , searchQuery :: nid -> ni -> IO ([ni], [r]) + , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok)) } -data SearchState nid addr ni r = SearchState +data SearchState nid addr tok ni r = SearchState {- { searchParams :: Search nid addr ni r @@ -60,7 +60,7 @@ data SearchState nid addr ni r = SearchState -- | Nodes scheduled to be queried. , searchQueued :: TVar (MinMaxPSQ ni nid) -- | The nearest K nodes that issued a reply. - , searchInformant :: TVar (MinMaxPSQ ni nid) + , searchInformant :: TVar (MinMaxPSQ' ni nid tok) -- | This tracks already-queried addresses so we avoid bothering them -- again. XXX: We could probably keep only the pending queries in this -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha @@ -79,10 +79,10 @@ newSearch :: ( Ord addr -> (r -> STM Bool) -- receives search results. -> nid -- target of search -} - Search nid addr ni r + Search nid addr tok ni r -> nid -> [ni] -- Initial nodes to query. - -> IO (SearchState nid addr ni r) + -> IO (SearchState nid addr tok ni r) newSearch (Search space nAddr qry) target ns = atomically $ do c <- newTVar 0 q <- newTVar $ MM.fromList @@ -99,25 +99,31 @@ searchAlpha = 8 searchK :: Int searchK = 8 -sendQuery :: forall addr nid ni r. +sendQuery :: forall addr nid tok ni r. ( Ord addr , Ord r , PSQKey nid , PSQKey ni , Show nid ) => - Search nid addr ni r + Search nid addr tok ni r -> nid -> (r -> STM Bool) - -> SearchState nid addr ni r + -> SearchState nid addr tok ni r -> Binding ni nid -> IO () sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) - (ns,rs) <- handle (\(SomeException e) -> return ([],[])) + -- TODO: Should we really be catching ThreadKilled ? + reply <- handle (\(SomeException e) -> return Nothing) (searchQuery searchTarget ni) + -- (ns,rs) + let tok = error "TODO: token" atomically $ do modifyTVar searchPendingCount pred + maybe (return ()) go reply + where + go (ns,rs,tok) = do vs <- readTVar searchVisited -- We only queue a node if it is not yet visited let insertFoundNode :: ni @@ -130,7 +136,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = $ kademliaLocation searchSpace n ) q modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns - modifyTVar searchInformant $ MM.insertTake searchK ni d + modifyTVar searchInformant $ MM.insertTake' searchK ni tok d flip fix rs $ \loop -> \case r:rs' -> do wanting <- searchResult r @@ -142,7 +148,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = searchIsFinished :: ( Ord addr , PSQKey nid , PSQKey ni - ) => SearchState nid addr ni r -> STM Bool + ) => SearchState nid addr tok ni r -> STM Bool searchIsFinished SearchState{ ..} = do q <- readTVar searchQueued cnt <- readTVar searchPendingCount @@ -153,7 +159,7 @@ searchIsFinished SearchState{ ..} = do && ( PSQ.prio (fromJust $ MM.findMax informants) <= PSQ.prio (fromJust $ MM.findMin q)))) -searchCancel :: SearchState nid addr ni r -> STM () +searchCancel :: SearchState nid addr tok ni r -> STM () searchCancel SearchState{..} = do writeTVar searchPendingCount 0 writeTVar searchQueued MM.empty @@ -164,7 +170,7 @@ search :: , PSQKey nid , PSQKey ni , Show nid - ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r) + ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) search sch@Search{..} buckets target result = do let ns = R.kclosest searchSpace searchK target buckets st <- newSearch sch target ns -- cgit v1.2.3