summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-01-17 03:42:32 -0500
committerJoe Crayne <joe@jerkface.net>2019-01-17 03:42:32 -0500
commit46b1ebb81838dc7ecf94533b25cd51e84bd0cf04 (patch)
treedd92015778558c13b4fba70c310c8ffd08e37820
parent6ebe91b686ca8bef893f9a3dd704e45c04124b8f (diff)
Use async queries for all UDP kademlia searches.
-rw-r--r--src/Network/BitTorrent/MainlineDHT.hs54
-rw-r--r--src/Network/Tox/DHT/Handlers.hs69
-rw-r--r--src/Network/Tox/Onion/Handlers.hs46
3 files changed, 146 insertions, 23 deletions
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs
index 83865c98..89851e88 100644
--- a/src/Network/BitTorrent/MainlineDHT.hs
+++ b/src/Network/BitTorrent/MainlineDHT.hs
@@ -563,7 +563,7 @@ newClient swarms addr = do
563 -- We defer initializing the refreshSearch and refreshPing until we 563 -- We defer initializing the refreshSearch and refreshPing until we
564 -- have a client to send queries with. 564 -- have a client to send queries with.
565 let nullPing = const $ return False 565 let nullPing = const $ return False
566 nullSearch = mainlineSearch $ \_ _ -> return Nothing 566 nullSearch = mainlineSearch $ Left $ \_ _ -> return Nothing
567 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount 567 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount
568 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing 568 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing
569 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount 569 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount
@@ -1035,13 +1035,36 @@ mainlineSend :: ( BEncode a
1035 -> NodeInfo 1035 -> NodeInfo
1036 -> IO (Maybe b) 1036 -> IO (Maybe b)
1037mainlineSend meth unwrap msg client nid addr = do 1037mainlineSend meth unwrap msg client nid addr = do
1038 reply <- sendQuery client serializer (msg nid) addr 1038 reply <- sendQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr
1039 -- sendQuery will return (Just (Left _)) on a parse error. We're going to 1039 -- sendQuery will return (Just (Left _)) on a parse error. We're going to
1040 -- blow it away with the join-either sequence. 1040 -- blow it away with the join-either sequence.
1041 -- TODO: Do something with parse errors. 1041 -- TODO: Do something with parse errors.
1042 return $ join $ either (const Nothing) Just <$> reply 1042 return $ join $ either (const Nothing) Just <$> reply
1043 where 1043
1044 serializer = MethodSerializer 1044mainlineAsync :: (BEncode a1, BEncode a2) =>
1045 Method
1046 -> (a2 -> a3)
1047 -> (t -> a1)
1048 -> Client String Method TransactionId NodeInfo (Message BValue)
1049 -> t
1050 -> NodeInfo
1051 -> (Maybe a3 -> IO ())
1052 -> IO ()
1053mainlineAsync meth unwrap msg client nid addr onresult = do
1054 asyncQuery client (mainlineSerializeer meth unwrap client) (msg nid) addr
1055 $ \reply ->
1056 -- sendQuery will return (Just (Left _)) on a parse error. We're going to
1057 -- blow it away with the join-either sequence.
1058 -- TODO: Do something with parse errors.
1059 onresult $ join $ either (const Nothing) Just <$> reply
1060
1061mainlineSerializeer :: (BEncode a2, BEncode a1) =>
1062 Method
1063 -> (a2 -> b)
1064 -> MainlineClient
1065 -> MethodSerializer
1066 TransactionId NodeInfo (Message BValue) Method a1 (Either Error b)
1067mainlineSerializeer meth unwrap client = MethodSerializer
1045 { methodTimeout = \_ ni -> return (ni, 5000000) 1068 { methodTimeout = \_ ni -> return (ni, 5000000)
1046 , method = meth 1069 , method = meth
1047 , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) 1070 , wrapQuery = encodeQueryPayload meth (isReadonlyClient client)
@@ -1060,30 +1083,45 @@ ping client addr =
1060getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) 1083getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ()))
1061getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) 1084getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both)
1062 1085
1086asyncGetNodes :: Client String Method TransactionId NodeInfo (Message BValue)
1087 -> NodeId
1088 -> NodeInfo
1089 -> (Maybe ([NodeInfo], [NodeInfo], Maybe ()) -> IO ())
1090 -> IO ()
1091asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both)
1092
1063unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) 1093unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ())
1064unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) 1094unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ())
1065 1095
1066getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Maybe Token)) 1096getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Maybe Token))
1067getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce 1097getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce
1068 1098
1099asyncGetPeers :: Client String Method TransactionId NodeInfo (Message BValue)
1100 -> NodeId
1101 -> NodeInfo
1102 -> (Maybe ([NodeInfo], [PeerAddr], Maybe Token) -> IO ())
1103 -> IO ()
1104asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce
1105
1069unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) 1106unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token)
1070unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) 1107unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok)
1071 1108
1072mainlineSearch :: (NodeId -> NodeInfo -> IO (Maybe ([NodeInfo], [r], Maybe tok))) 1109mainlineSearch :: Either (NodeId -> NodeInfo -> IO (Maybe ([NodeInfo], [r], Maybe tok)))
1110 (NodeId -> NodeInfo -> (Maybe ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO ())
1073 -> Search NodeId (IP, PortNumber) tok NodeInfo r 1111 -> Search NodeId (IP, PortNumber) tok NodeInfo r
1074mainlineSearch qry = Search 1112mainlineSearch qry = Search
1075 { searchSpace = mainlineSpace 1113 { searchSpace = mainlineSpace
1076 , searchNodeAddress = nodeIP &&& nodePort 1114 , searchNodeAddress = nodeIP &&& nodePort
1077 , searchQuery = Left qry 1115 , searchQuery = qry
1078 , searchAlpha = 8 1116 , searchAlpha = 8
1079 , searchK = 16 1117 , searchK = 16
1080 } 1118 }
1081 1119
1082nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo 1120nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo
1083nodeSearch client = mainlineSearch (getNodes client) 1121nodeSearch client = mainlineSearch (Right $ asyncGetNodes client)
1084 1122
1085peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr 1123peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr
1086peerSearch client = mainlineSearch (getPeers client) 1124peerSearch client = mainlineSearch (Right $ asyncGetPeers client)
1087 1125
1088-- | List of bootstrap nodes maintained by different bittorrent 1126-- | List of bootstrap nodes maintained by different bittorrent
1089-- software authors. 1127-- software authors.
diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs
index abd607c3..1eec93b9 100644
--- a/src/Network/Tox/DHT/Handlers.hs
+++ b/src/Network/Tox/DHT/Handlers.hs
@@ -400,20 +400,61 @@ unsendNodes _ = Nothing
400unwrapNodes :: SendNodes -> ( [NodeInfo], [NodeInfo], Maybe () ) 400unwrapNodes :: SendNodes -> ( [NodeInfo], [NodeInfo], Maybe () )
401unwrapNodes (SendNodes ns) = (ns,ns,Just ()) 401unwrapNodes (SendNodes ns) = (ns,ns,Just ())
402 402
403data SendableQuery x a b = SendableQuery
404 { sendableSerializer :: MethodSerializer TransactionId NodeInfo Message PacketKind a (Maybe x)
405 , sendableQuery :: NodeId -> a
406 , sendableResult :: Maybe (Maybe x) -> IO b
407 }
408
409sendQ :: SendableQuery x a b
410 -> QR.Client err PacketKind TransactionId NodeInfo Message
411 -> NodeId
412 -> NodeInfo
413 -> IO b
414sendQ s client nid addr = do
415 reply <- QR.sendQuery client (sendableSerializer s) (sendableQuery s nid) addr
416 sendableResult s reply
417
418asyncQ :: SendableQuery x a b
419 -> QR.Client err PacketKind TransactionId NodeInfo Message
420 -> NodeId
421 -> NodeInfo
422 -> (b -> IO ())
423 -> IO ()
424asyncQ s client nid addr go = do
425 QR.asyncQuery client (sendableSerializer s) (sendableQuery s nid) addr
426 $ sendableResult s >=> go
427
428getNodesSendable :: TVar (HashMap NodeId [NodeInfoCallback])
429 -> NodeInfo
430 -> SendableQuery SendNodes GetNodes (Maybe ([NodeInfo], [NodeInfo], Maybe ()))
431getNodesSendable cbvar addr = SendableQuery (serializer GetNodesType DHTGetNodes unsendNodes)
432 GetNodes
433 go
434 where
435 go reply = do
436 forM_ (join reply) $ \(SendNodes ns) ->
437 forM_ ns $ \n -> do
438 now <- getPOSIXTime
439 atomically $ do
440 mcbs <- HashMap.lookup (nodeId n) <$> readTVar cbvar
441 forM_ mcbs $ \cbs -> do
442 forM_ cbs $ \cb -> do
443 rumoredAddress cb now (nodeAddr addr) n
444 return $ fmap unwrapNodes $ join reply
445
403getNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) 446getNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ()))
404getNodes client cbvar nid addr = do 447getNodes client cbvar nid addr =
405 -- dput XMisc $ show addr ++ " <-- getnodes " ++ show nid 448 sendQ (getNodesSendable cbvar addr) client nid addr
406 reply <- QR.sendQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr 449
407 -- dput XMisc $ show addr ++ " -sendnodes-> " ++ show reply 450asyncGetNodes :: QR.Client err PacketKind TransactionId NodeInfo Message
408 forM_ (join reply) $ \(SendNodes ns) -> 451 -> TVar (HashMap NodeId [NodeInfoCallback])
409 forM_ ns $ \n -> do 452 -> NodeId
410 now <- getPOSIXTime 453 -> NodeInfo
411 atomically $ do 454 -> (Maybe ([NodeInfo], [NodeInfo], Maybe ()) -> IO ())
412 mcbs <- HashMap.lookup (nodeId n) <$> readTVar cbvar 455 -> IO ()
413 forM_ mcbs $ \cbs -> do 456asyncGetNodes client cbvar nid addr go =
414 forM_ cbs $ \cb -> do 457 asyncQ (getNodesSendable cbvar addr) client nid addr go
415 rumoredAddress cb now (nodeAddr addr) n
416 return $ fmap unwrapNodes $ join reply
417 458
418updateRouting :: Client -> Routing 459updateRouting :: Client -> Routing
419 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) 460 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ())
@@ -525,7 +566,7 @@ nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeI
525nodeSearch client cbvar = Search 566nodeSearch client cbvar = Search
526 { searchSpace = toxSpace 567 { searchSpace = toxSpace
527 , searchNodeAddress = nodeIP &&& nodePort 568 , searchNodeAddress = nodeIP &&& nodePort
528 , searchQuery = Left $ getNodes client cbvar 569 , searchQuery = Right $ asyncGetNodes client cbvar
529 , searchAlpha = 8 570 , searchAlpha = 8
530 , searchK = 16 571 , searchK = 16
531 572
diff --git a/src/Network/Tox/Onion/Handlers.hs b/src/Network/Tox/Onion/Handlers.hs
index edbbbb49..f44dd79c 100644
--- a/src/Network/Tox/Onion/Handlers.hs
+++ b/src/Network/Tox/Onion/Handlers.hs
@@ -218,7 +218,7 @@ toxidSearch :: (TransactionId -> OnionDestination r -> STM (OnionDestination r,
218toxidSearch getTimeout crypto client = Search 218toxidSearch getTimeout crypto client = Search
219 { searchSpace = toxSpace 219 { searchSpace = toxSpace
220 , searchNodeAddress = nodeIP &&& nodePort 220 , searchNodeAddress = nodeIP &&& nodePort
221 , searchQuery = Left $ getRendezvous getTimeout crypto client 221 , searchQuery = Right $ asyncGetRendezvous getTimeout crypto client
222 , searchAlpha = 3 222 , searchAlpha = 3
223 , searchK = 6 223 , searchK = 6
224 } 224 }
@@ -290,6 +290,30 @@ sendOnion getTimeout client req oaddr unwrap =
290 (return . Just . unwrap (onionNodeInfo oaddr)) 290 (return . Just . unwrap (onionNodeInfo oaddr))
291 $ join mb 291 $ join mb
292 292
293asyncOnion :: (TransactionId
294 -> OnionDestination r -> STM (OnionDestination r, Int))
295 -> QR.Client
296 err
297 PacketKind
298 TransactionId
299 (OnionDestination r)
300 (OnionMessage Identity)
301 -> AnnounceRequest
302 -> OnionDestination r
303 -> (NodeInfo -> AnnounceResponse -> a)
304 -> (Maybe a -> IO ())
305 -> IO ()
306asyncOnion getTimeout client req oaddr unwrap go =
307 -- Four tries and then we tap out.
308 flip fix 4 $ \loop n -> do
309 QR.asyncQuery client (announceSerializer getTimeout) req oaddr
310 $ \mb -> do
311 forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " sent response: " ++ show r
312 maybe (if n>0 then loop $! n - 1 else go Nothing)
313 (go . Just . unwrap (onionNodeInfo oaddr))
314 $ join mb
315
316
293-- | Lookup the secret counterpart for a given alias key. 317-- | Lookup the secret counterpart for a given alias key.
294getRendezvous :: (TransactionId -> OnionDestination r -> STM (OnionDestination r, Int)) 318getRendezvous :: (TransactionId -> OnionDestination r -> STM (OnionDestination r, Int))
295 -> TransportCrypto 319 -> TransportCrypto
@@ -308,6 +332,26 @@ getRendezvous getTimeout crypto client nid ni = do
308 oaddr 332 oaddr
309 (unwrapAnnounceResponse rkey) 333 (unwrapAnnounceResponse rkey)
310 334
335asyncGetRendezvous
336 :: (TransactionId -> OnionDestination r -> STM (OnionDestination r, Int))
337 -> TransportCrypto
338 -> Client r
339 -> NodeId
340 -> NodeInfo
341 -> (Maybe ([NodeInfo], [Rendezvous], Maybe Nonce32) -> IO ())
342 -> IO ()
343asyncGetRendezvous getTimeout crypto client nid ni go = do
344 asel <- atomically $ selectAlias crypto nid
345 let oaddr = OnionDestination asel ni Nothing
346 rkey = case asel of
347 SearchingAlias -> Nothing
348 _ -> Just $ key2id $ rendezvousPublic crypto
349 asyncOnion getTimeout client
350 (AnnounceRequest zeros32 nid $ fromMaybe zeroID rkey)
351 oaddr
352 (unwrapAnnounceResponse rkey)
353 go
354
311putRendezvous :: (TransactionId -> OnionDestination r -> STM (OnionDestination r, Int)) 355putRendezvous :: (TransactionId -> OnionDestination r -> STM (OnionDestination r, Int))
312 -> TransportCrypto 356 -> TransportCrypto
313 -> Client r 357 -> Client r