summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 21:27:50 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-07 13:24:59 -0500
commitc7fb8cfe16f821e4e148d1855a18cb81255743bc (patch)
treec035afc9ff870ea3bfc5b1dc7c4254ad0c0bf4b3
parent5ea2de4e858cc89282561922bae257b6f9041d2e (diff)
Async search.
-rw-r--r--dht/Announcer/Tox.hs28
-rw-r--r--dht/TCPProber.hs28
-rw-r--r--dht/examples/dhtd.hs16
-rw-r--r--dht/src/Network/BitTorrent/MainlineDHT.hs76
-rw-r--r--dht/src/Network/Tox.hs2
-rw-r--r--dht/src/Network/Tox/DHT/Handlers.hs58
-rw-r--r--dht/src/Network/Tox/Onion/Handlers.hs49
-rw-r--r--dht/src/Network/Tox/Onion/Routes.hs83
-rw-r--r--dht/src/Network/Tox/TCP.hs49
-rw-r--r--kad/src/Network/Kademlia/Search.hs35
-rw-r--r--kad/tests/searchCancel.hs2
-rw-r--r--server/src/Network/QueryResponse.hs10
12 files changed, 320 insertions, 116 deletions
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs
index e2459e0e..00eb219b 100644
--- a/dht/Announcer/Tox.hs
+++ b/dht/Announcer/Tox.hs
@@ -27,22 +27,23 @@ import Data.Time.Clock.POSIX
27announceK :: Int 27announceK :: Int
28announceK = 8 28announceK = 8
29 29
30data AnnounceState = forall nid addr tok ni r. AnnounceState 30data AnnounceState = forall nid addr tok ni r qk. AnnounceState
31 { aState :: SearchState nid addr tok ni r 31 { aState :: SearchState nid addr tok ni r qk
32 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) 32 , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime))
33 } 33 }
34 34
35-- | This type specifies an item that can be announced on appropriate nodes in 35-- | This type specifies an item that can be announced on appropriate nodes in
36-- a Kademlia network. 36-- a Kademlia network.
37data AnnounceMethod r = forall nid ni sr addr tok a. 37data AnnounceMethod r = forall nid ni sr addr tok a qk.
38 ( Show nid 38 ( Show nid
39 , Hashable nid 39 , Hashable nid
40 , Hashable ni 40 , Hashable ni
41 , Ord addr 41 , Ord addr
42 , Ord nid 42 , Ord nid
43 , Ord ni 43 , Ord ni
44 , Ord qk
44 ) => AnnounceMethod 45 ) => AnnounceMethod
45 { aSearch :: Search nid addr tok ni sr 46 { aSearch :: Search nid addr tok ni sr qk
46 -- ^ This is the Kademlia search to run repeatedly to find the 47 -- ^ This is the Kademlia search to run repeatedly to find the
47 -- nearby nodes. A new search is started whenever one is not 48 -- nearby nodes. A new search is started whenever one is not
48 -- already in progress at announce time. Repeated searches are 49 -- already in progress at announce time. Repeated searches are
@@ -72,15 +73,16 @@ data AnnounceMethod r = forall nid ni sr addr tok a.
72 } 73 }
73 74
74-- | This type specifies a Kademlia search and an action to perform upon the result. 75-- | This type specifies a Kademlia search and an action to perform upon the result.
75data SearchMethod r = forall nid ni sr addr tok a. 76data SearchMethod r = forall nid ni sr addr tok a qk.
76 ( Show nid 77 ( Show nid
77 , Hashable nid 78 , Hashable nid
78 , Hashable ni 79 , Hashable ni
79 , Ord addr 80 , Ord addr
80 , Ord nid 81 , Ord nid
81 , Ord ni 82 , Ord ni
83 , Ord qk
82 ) => SearchMethod 84 ) => SearchMethod
83 { sSearch :: Search nid addr tok ni sr 85 { sSearch :: Search nid addr tok ni sr qk
84 -- ^ This is the Kademlia search to run repeatedly to find the 86 -- ^ This is the Kademlia search to run repeatedly to find the
85 -- nearby nodes. A new search is started whenever one is not 87 -- nearby nodes. A new search is started whenever one is not
86 -- already in progress at announce time. Repeated searches are 88 -- already in progress at announce time. Repeated searches are
@@ -155,8 +157,6 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar
155 publishToNodes is 157 publishToNodes is
156 onResult sr = return True 158 onResult sr = return True
157 searchAgain = do 159 searchAgain = do
158 -- Canceling a pending search here seems to make announcements more reliable.
159 searchCancel st
160 return $ void $ do 160 return $ void $ do
161 t <- fork search 161 t <- fork search
162 labelThread t ("scheduleAnnounce.sch." ++ show aTarget) 162 labelThread t ("scheduleAnnounce.sch." ++ show aTarget)
@@ -164,7 +164,10 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar
164 got <- tryTakeMVar mutex 164 got <- tryTakeMVar mutex
165 case got of 165 case got of
166 Just () -> do 166 Just () -> do
167 atomically $ reset aNearestNodes aSearch aTarget st 167 me <- myThreadId
168 labelThread me "scheduleAnnounce.reset"
169 reset aNearestNodes aSearch aTarget st
170 labelThread me "scheduleAnnounce.searchLoop"
168 searchLoop aSearch aTarget onResult st 171 searchLoop aSearch aTarget onResult st
169 -- Announce to any nodes we haven't already announced to. 172 -- Announce to any nodes we haven't already announced to.
170 is <- atomically $ do 173 is <- atomically $ do
@@ -202,8 +205,6 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge
202 return () 205 return ()
203 return True -- True to keep searching. 206 return True -- True to keep searching.
204 searchAgain = do 207 searchAgain = do
205 -- Canceling a pending search here seems to make announcements more reliable.
206 searchCancel st
207 return $ void $ do 208 return $ void $ do
208 t <- fork search 209 t <- fork search
209 labelThread t ("scheduleSearch.sch." ++ show sTarget) 210 labelThread t ("scheduleSearch.sch." ++ show sTarget)
@@ -211,7 +212,10 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge
211 got <- tryTakeMVar mutex 212 got <- tryTakeMVar mutex
212 case got of 213 case got of
213 Just () -> do 214 Just () -> do
214 atomically $ reset sNearestNodes sSearch sTarget st 215 me <- myThreadId
216 labelThread me "scheduleSearch.reset"
217 reset sNearestNodes sSearch sTarget st
218 labelThread me "scheduleSearch.searchLoop"
215 searchLoop sSearch sTarget onResult st 219 searchLoop sSearch sTarget onResult st
216 putMVar mutex () 220 putMVar mutex ()
217 Nothing -> do 221 Nothing -> do
diff --git a/dht/TCPProber.hs b/dht/TCPProber.hs
index 17b68f64..ccdbd8d1 100644
--- a/dht/TCPProber.hs
+++ b/dht/TCPProber.hs
@@ -176,11 +176,35 @@ getNodes prober tcp seeking dst = do
176 return $ Success ts 176 return $ Success ts
177 _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r 177 _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r
178 178
179nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo 179asyncGetNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo
180 -> (Nonce8 -> Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()) -> IO ())
181 -> IO Nonce8
182asyncGetNodes prober tcp seeking dst withResponse = do
183 TCP.asyncUDPNodes tcp seeking (TCP.udpNodeInfo dst) $ \qid r -> do
184 dput XTCP $ "Got via TCP nodes: " ++ show r
185 let tcps (ns,_,mb) = (ns',ns',mb)
186 where ns' = do
187 n <- ns
188 [ TCP.NodeInfo n 0 ]
189 r' <- case r of
190 Success (ns,gw) -> do
191 let ts = tcps ns
192 if TCP.nodeId gw == TCP.nodeId dst
193 then return $ Success ts
194 else do
195 enqueueProbe prober (TCP.udpNodeInfo dst)
196 return $ Success ts
197 return $ Success ts
198 _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r
199 withResponse qid r'
200
201
202nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo Nonce8
180nodeSearch prober tcp = Search 203nodeSearch prober tcp = Search
181 { searchSpace = TCP.tcpSpace 204 { searchSpace = TCP.tcpSpace
182 , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort 205 , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort
183 , searchQuery = getNodes prober tcp 206 , searchQuery = asyncGetNodes prober tcp
207 , searchQueryCancel = cancelQuery (TCP.tcpClient tcp)
184 , searchAlpha = 8 208 , searchAlpha = 8
185 , searchK = 16 209 , searchK = 16
186 } 210 }
diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs
index 6b057af9..3078831d 100644
--- a/dht/examples/dhtd.hs
+++ b/dht/examples/dhtd.hs
@@ -811,8 +811,14 @@ clientSession s@Session{..} sock cnum h = do
811 where 811 where
812 go | null destination = fmap Right . qhandler self 812 go | null destination = fmap Right . qhandler self
813 | otherwise = case readEither destination of 813 | otherwise = case readEither destination of
814 Right ni -> fmap (maybe (Left "Timeout.") Right . resultToMaybe) 814 Right ni -> \nid -> do
815 . flip (searchQuery qsearch) ni -- TODO report canceled 815 v <- newEmptyMVar
816 _ <- searchQuery qsearch nid ni $ \_ r -> putMVar v r
817 r <- takeMVar v
818 return $ case r of
819 Success x -> Right x
820 Canceled -> Left "Canceled."
821 TimedOut -> Left "Timeout."
816 Left e -> const $ return $ Left ("Bad destination: "++e) 822 Left e -> const $ return $ Left ("Bad destination: "++e)
817 maybe (hPutClient h ("Unsupported method: "++method)) 823 maybe (hPutClient h ("Unsupported method: "++method))
818 goQuery 824 goQuery
@@ -938,14 +944,14 @@ clientSession s@Session{..} sock cnum h = do
938 , Typeable ptok 944 , Typeable ptok
939 , Typeable sni 945 , Typeable sni
940 , Typeable pni ) 946 , Typeable pni )
941 => Search nid addr stok sni sr 947 => Search nid addr stok sni sr qk
942 -> (pr -> ptok -> Maybe pni -> IO (Maybe pubr)) 948 -> (pr -> ptok -> Maybe pni -> IO (Maybe pubr))
943 -> Maybe (stok :~: ptok, sni :~: pni) 949 -> Maybe (stok :~: ptok, sni :~: pni)
944 matchingResult _ _ = liftA2 (,) eqT eqT 950 matchingResult _ _ = liftA2 (,) eqT eqT
945 matchingResult2 :: 951 matchingResult2 ::
946 ( Typeable sr 952 ( Typeable sr
947 , Typeable pr ) 953 , Typeable pr )
948 => Search nid addr stok sni sr 954 => Search nid addr stok sni sr qk
949 -> (PublicKey -> pdta -> pr -> IO ()) 955 -> (PublicKey -> pdta -> pr -> IO ())
950 -> (pdta -> nid) 956 -> (pdta -> nid)
951 -> Maybe (pr :~: sr) 957 -> Maybe (pr :~: sr)
@@ -1913,7 +1919,7 @@ main = do
1913 btSaved <- loadNodes netname -- :: IO [Mainline.NodeInfo] 1919 btSaved <- loadNodes netname -- :: IO [Mainline.NodeInfo]
1914 putStrLn $ "Loaded "++show (length btSaved)++" nodes for "++netname++"." 1920 putStrLn $ "Loaded "++show (length btSaved)++" nodes for "++netname++"."
1915 fallbackNodes <- getBootstrapNodes 1921 fallbackNodes <- getBootstrapNodes
1916 let isNodesSearch :: ni :~: r -> Search nid addr tok ni r -> Search nid addr tok ni ni 1922 let isNodesSearch :: ni :~: r -> Search nid addr tok ni r qk -> Search nid addr tok ni ni qk
1917 isNodesSearch Refl sch = sch 1923 isNodesSearch Refl sch = sch
1918 ping = maybe (const $ return False) 1924 ping = maybe (const $ return False)
1919 (\DHTPing{pingQuery} -> fmap (maybe False (const True)) . pingQuery []) 1925 (\DHTPing{pingQuery} -> fmap (maybe False (const True)) . pingQuery [])
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs
index 8532b492..d3904c40 100644
--- a/dht/src/Network/BitTorrent/MainlineDHT.hs
+++ b/dht/src/Network/BitTorrent/MainlineDHT.hs
@@ -512,8 +512,8 @@ data Routing = Routing
512 { tentativeId :: NodeInfo 512 { tentativeId :: NodeInfo
513 , committee4 :: TriadCommittee NodeId SockAddr 513 , committee4 :: TriadCommittee NodeId SockAddr
514 , committee6 :: TriadCommittee NodeId SockAddr 514 , committee6 :: TriadCommittee NodeId SockAddr
515 , refresher4 :: BucketRefresher NodeId NodeInfo 515 , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId
516 , refresher6 :: BucketRefresher NodeId NodeInfo 516 , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId
517 } 517 }
518 518
519sched4 :: Routing -> TVar (Int.PSQ POSIXTime) 519sched4 :: Routing -> TVar (Int.PSQ POSIXTime)
@@ -569,7 +569,6 @@ newClient swarms addr = do
569 -- We defer initializing the refreshSearch and refreshPing until we 569 -- We defer initializing the refreshSearch and refreshPing until we
570 -- have a client to send queries with. 570 -- have a client to send queries with.
571 let nullPing = const $ return False 571 let nullPing = const $ return False
572 nullSearch = mainlineSearch $ \_ _ -> return Canceled
573 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount 572 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount
574 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing 573 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing
575 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount 574 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount
@@ -730,7 +729,7 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError
730 729
731mainlineKademlia :: MainlineClient 730mainlineKademlia :: MainlineClient
732 -> TriadCommittee NodeId SockAddr 731 -> TriadCommittee NodeId SockAddr
733 -> BucketRefresher NodeId NodeInfo 732 -> BucketRefresher NodeId NodeInfo TransactionId
734 -> Kademlia NodeId NodeInfo 733 -> Kademlia NodeId NodeInfo
735mainlineKademlia client committee refresher 734mainlineKademlia client committee refresher
736 = Kademlia quietInsertions 735 = Kademlia quietInsertions
@@ -1037,6 +1036,35 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do
1037isReadonlyClient :: MainlineClient -> Bool 1036isReadonlyClient :: MainlineClient -> Bool
1038isReadonlyClient client = False -- TODO 1037isReadonlyClient client = False -- TODO
1039 1038
1039mainlineAsync :: ( BEncode xqry
1040 , BEncode xrsp
1041 ) => Method
1042 -> (xrsp -> rsp)
1043 -> (qry -> xqry)
1044 -> MainlineClient
1045 -> qry
1046 -> NodeInfo
1047 -> (TransactionId -> QR.Result rsp -> IO ())
1048 -> IO TransactionId
1049mainlineAsync meth unwrap msg client nid addr withResult = do
1050 asyncQuery client serializer (msg nid) addr $ \qid reply -> do
1051 withResult qid $ case reply of
1052 Success (Right x) -> Success x
1053 Success (Left e) -> Canceled -- TODO: Do something with parse errors.
1054 Canceled -> Canceled
1055 TimedOut -> TimedOut
1056 where
1057 serializer = MethodSerializer
1058 { methodTimeout = \ni -> return (ni, 5000000)
1059 , method = meth
1060 , wrapQuery = encodeQueryPayload meth (isReadonlyClient client)
1061 , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack)
1062 (Right . unwrap)
1063 . BE.fromBEncode)
1064 . rspPayload
1065 }
1066
1067
1040mainlineSend :: ( BEncode xqry 1068mainlineSend :: ( BEncode xqry
1041 , BEncode xrsp 1069 , BEncode xrsp
1042 ) => Method 1070 ) => Method
@@ -1073,30 +1101,54 @@ ping client addr =
1073getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) 1101getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ()))
1074getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) 1102getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both)
1075 1103
1104asyncGetNodes :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ())
1105 -> IO TransactionId
1106asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both)
1107
1076unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) 1108unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ())
1077unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) 1109unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ())
1078 1110
1079getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) 1111getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token))
1080getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce 1112getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce
1081 1113
1114asyncGetPeers :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[PeerAddr],Maybe Token) -> IO ())
1115 -> IO TransactionId
1116asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce
1117
1082unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) 1118unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token)
1083unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) 1119unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok)
1084 1120
1085mainlineSearch :: (NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo], [r], Maybe tok))) 1121nullTransactionId :: TransactionId
1086 -> Search NodeId (IP, PortNumber) tok NodeInfo r 1122nullTransactionId = TransactionId B.empty
1087mainlineSearch qry = Search 1123
1124nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId
1125nullSearch = Search
1126 { searchSpace = mainlineSpace
1127 , searchNodeAddress = nodeIP &&& nodePort
1128 , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId
1129 , searchQueryCancel = \_ _ -> return ()
1130 , searchAlpha = 8
1131 , searchK = 16
1132 }
1133
1134mainlineSearch :: MainlineClient
1135 -> (MainlineClient -> NodeId -> NodeInfo
1136 -> (TransactionId -> QR.Result ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO TransactionId)
1137 -> Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId
1138mainlineSearch client qry = Search
1088 { searchSpace = mainlineSpace 1139 { searchSpace = mainlineSpace
1089 , searchNodeAddress = nodeIP &&& nodePort 1140 , searchNodeAddress = nodeIP &&& nodePort
1090 , searchQuery = qry 1141 , searchQuery = qry client
1142 , searchQueryCancel = cancelQuery client
1091 , searchAlpha = 8 1143 , searchAlpha = 8
1092 , searchK = 16 1144 , searchK = 16
1093 } 1145 }
1094 1146
1095nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo 1147nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo TransactionId
1096nodeSearch client = mainlineSearch (getNodes client) 1148nodeSearch client = mainlineSearch client asyncGetNodes
1097 1149
1098peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr 1150peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr TransactionId
1099peerSearch client = mainlineSearch (getPeers client) 1151peerSearch client = mainlineSearch client asyncGetPeers
1100 1152
1101-- | List of bootstrap nodes maintained by different bittorrent 1153-- | List of bootstrap nodes maintained by different bittorrent
1102-- software authors. 1154-- software authors.
diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs
index f9f35ea4..4aed1c43 100644
--- a/dht/src/Network/Tox.hs
+++ b/dht/src/Network/Tox.hs
@@ -480,6 +480,6 @@ announceToLan sock nid = do
480 saferSendTo sock bs broadcast 480 saferSendTo sock bs broadcast
481 481
482 482
483toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous 483toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous DHT.TransactionId
484toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) 484toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox)
485 485
diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs
index d132da88..f4563a3b 100644
--- a/dht/src/Network/Tox/DHT/Handlers.hs
+++ b/dht/src/Network/Tox/DHT/Handlers.hs
@@ -133,8 +133,8 @@ data Routing = Routing
133 { tentativeId :: NodeInfo 133 { tentativeId :: NodeInfo
134 , committee4 :: TriadCommittee NodeId SockAddr 134 , committee4 :: TriadCommittee NodeId SockAddr
135 , committee6 :: TriadCommittee NodeId SockAddr 135 , committee6 :: TriadCommittee NodeId SockAddr
136 , refresher4 :: BucketRefresher NodeId NodeInfo 136 , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId
137 , refresher6 :: BucketRefresher NodeId NodeInfo 137 , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId
138 , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) 138 , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback])
139 } 139 }
140 140
@@ -172,6 +172,20 @@ routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBu
172routing6 :: Routing -> TVar (R.BucketList NodeInfo) 172routing6 :: Routing -> TVar (R.BucketList NodeInfo)
173routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets 173routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets
174 174
175nullTransactionId :: TransactionId
176nullTransactionId = TransactionId (Nonce8 0) (Nonce24 zeros24)
177
178nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId
179nullSearch = Search
180 { searchSpace = toxSpace
181 , searchNodeAddress = nodeIP &&& nodePort
182 , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId
183 , searchQueryCancel = \_ _ -> return ()
184 , searchAlpha = 1
185 , searchK = 2
186 }
187
188
175newRouting :: SockAddr -> TransportCrypto 189newRouting :: SockAddr -> TransportCrypto
176 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change 190 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change
177 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change 191 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change
@@ -195,13 +209,6 @@ newRouting addr crypto update4 update6 = do
195 -- We defer initializing the refreshSearch and refreshPing until we 209 -- We defer initializing the refreshSearch and refreshPing until we
196 -- have a client to send queries with. 210 -- have a client to send queries with.
197 let nullPing = const $ return False 211 let nullPing = const $ return False
198 nullSearch = Search
199 { searchSpace = toxSpace
200 , searchNodeAddress = nodeIP &&& nodePort
201 , searchQuery = \_ _ -> return Canceled
202 , searchAlpha = 1
203 , searchK = 2
204 }
205 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount 212 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount
206 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount 213 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount
207 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing 214 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing
@@ -432,6 +439,30 @@ getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> N
432 -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) 439 -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ()))
433getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) 440getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr)
434 441
442asyncGetNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> Multi.NodeInfo
443 -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ())
444 -> IO TransactionId
445asyncGetNodes client cbvar nid addr withResult = do
446 QR.asyncQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr $
447 \qid reply -> do
448 forM_ (join $ resultToMaybe reply) $ \(SendNodes ns) ->
449 forM_ ns $ \n -> do
450 now <- getPOSIXTime
451 atomically $ do
452 mcbs <- HashMap.lookup (nodeId . udpNodeInfo $ n) <$> readTVar cbvar
453 forM_ mcbs $ \cbs -> do
454 forM_ cbs $ \cb -> do
455 rumoredAddress cb now addr (udpNodeInfo n)
456 withResult qid $ case reply of
457 Success x -> maybe Canceled (Success . unwrapNodes) x
458 _ -> fmap (error "Network.Tox.DHT.Handlers.getNodes: the impossible happened!") reply
459
460asyncGetNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo
461 -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ())
462 -> IO TransactionId
463asyncGetNodesUDP client cbvar nid addr go = asyncGetNodes client cbvar nid (Multi.UDP ==> addr) go
464
465
435updateRouting :: Client -> Routing 466updateRouting :: Client -> Routing
436 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) 467 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ())
437 -> Multi.NodeInfo 468 -> Multi.NodeInfo
@@ -462,7 +493,7 @@ updateTable client routing orouter naddr = do
462 Want_Both -> do dput XMisc "BUG:unreachable" 493 Want_Both -> do dput XMisc "BUG:unreachable"
463 error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ 494 error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__
464 where 495 where
465 go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo -> IO () 496 go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo TransactionId -> IO ()
466 go committee refresher = do 497 go committee refresher = do
467 self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) 498 self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher)
468 -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) 499 -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr)
@@ -473,7 +504,7 @@ updateTable client routing orouter naddr = do
473toxKademlia :: Client 504toxKademlia :: Client
474 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) 505 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ())
475 -> TriadCommittee NodeId SockAddr 506 -> TriadCommittee NodeId SockAddr
476 -> BucketRefresher NodeId NodeInfo 507 -> BucketRefresher NodeId NodeInfo TransactionId
477 -> Kademlia NodeId NodeInfo 508 -> Kademlia NodeId NodeInfo
478toxKademlia client orouter committee refresher 509toxKademlia client orouter committee refresher
479 = Kademlia quietInsertions 510 = Kademlia quietInsertions
@@ -541,11 +572,12 @@ handlers crypto _ CookieRequestType = Just $ MethodHandler (isCookieReques
541handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH 572handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH
542handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ 573handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ
543 574
544nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo 575nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo TransactionId
545nodeSearch client cbvar = Search 576nodeSearch client cbvar = Search
546 { searchSpace = toxSpace 577 { searchSpace = toxSpace
547 , searchNodeAddress = nodeIP &&& nodePort 578 , searchNodeAddress = nodeIP &&& nodePort
548 , searchQuery = getNodesUDP client cbvar 579 , searchQuery = asyncGetNodesUDP client cbvar
580 , searchQueryCancel = cancelQuery client
549 , searchAlpha = 8 581 , searchAlpha = 8
550 , searchK = 16 582 , searchK = 16
551 } 583 }
diff --git a/dht/src/Network/Tox/Onion/Handlers.hs b/dht/src/Network/Tox/Onion/Handlers.hs
index 015c758c..45795312 100644
--- a/dht/src/Network/Tox/Onion/Handlers.hs
+++ b/dht/src/Network/Tox/Onion/Handlers.hs
@@ -218,13 +218,14 @@ handlers net _ _ keydb _ = Just $ NoReply Right $ dataToRouteH keydb net
218toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int)) 218toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int))
219 -> TransportCrypto 219 -> TransportCrypto
220 -> Client r 220 -> Client r
221 -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous 221 -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous TransactionId
222toxidSearch getTimeout crypto client = Search 222toxidSearch getTimeout crypto client = Search
223 { searchSpace = toxSpace 223 { searchSpace = toxSpace
224 , searchNodeAddress = nodeIP &&& nodePort 224 , searchNodeAddress = nodeIP &&& nodePort
225 , searchQuery = getRendezvous getTimeout crypto client 225 , searchQuery = asyncGetRendezvous getTimeout crypto client
226 , searchAlpha = 3 226 , searchQueryCancel = cancelQuery client
227 , searchK = 6 227 , searchAlpha = 3
228 , searchK = 6
228 } 229 }
229 230
230announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int)) 231announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int))
@@ -289,6 +290,25 @@ sendOnion getTimeout client req oaddr unwrap =
289 Canceled -> return Canceled 290 Canceled -> return Canceled
290 TimedOut -> re 291 TimedOut -> re
291 292
293asyncOnion :: (OnionDestination r -> STM (OnionDestination r, Int))
294 -> Client r
295 -> AnnounceRequest
296 -> OnionDestination r
297 -> (NodeInfo -> AnnounceResponse -> t)
298 -> (TransactionId -> QR.Result t -> IO ())
299 -> IO TransactionId
300asyncOnion getTimeout client req oaddr unwrap withResult = do
301 -- TODO: Restore "Four tries and then we tap out" behavior.
302 qid <- QR.asyncQuery client (announceSerializer getTimeout) req oaddr $ \k mb -> do
303 forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " async sent response: " ++ show r
304 withResult k $ case mb of
305 Success x -> maybe (TimedOut)
306 (Success . unwrap (onionNodeInfo oaddr))
307 (x :: Maybe AnnounceResponse)
308 Canceled -> Canceled
309 TimedOut -> TimedOut
310 return qid
311
292 312
293-- | Lookup the secret counterpart for a given alias key. 313-- | Lookup the secret counterpart for a given alias key.
294getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) 314getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int))
@@ -308,6 +328,27 @@ getRendezvous getTimeout crypto client nid ni = do
308 oaddr 328 oaddr
309 (unwrapAnnounceResponse rkey) 329 (unwrapAnnounceResponse rkey)
310 330
331asyncGetRendezvous ::
332 (OnionDestination r -> STM (OnionDestination r, Int))
333 -> TransportCrypto
334 -> Client r
335 -> NodeId
336 -> NodeInfo
337 -> (TransactionId -> Result ([NodeInfo],[Rendezvous],Maybe Nonce32) -> IO ())
338 -> IO TransactionId
339asyncGetRendezvous getTimeout crypto client nid ni withResult = do
340 asel <- atomically $ selectAlias crypto nid
341 let oaddr = OnionDestination asel ni Nothing
342 rkey = case asel of
343 SearchingAlias -> Nothing
344 _ -> Just $ key2id $ rendezvousPublic crypto
345 asyncOnion getTimeout client
346 (AnnounceRequest zeros32 nid $ fromMaybe zeroID rkey)
347 oaddr
348 (unwrapAnnounceResponse rkey)
349 withResult
350
351
311putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) 352putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int))
312 -> TransportCrypto 353 -> TransportCrypto
313 -> Client r 354 -> Client r
diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs
index 9ce4e316..2f13a513 100644
--- a/dht/src/Network/Tox/Onion/Routes.hs
+++ b/dht/src/Network/Tox/Onion/Routes.hs
@@ -88,7 +88,7 @@ data OnionRouter = OnionRouter
88 , tcpProber :: TCP.TCPProber 88 , tcpProber :: TCP.TCPProber
89 , tcpProberThread :: ThreadId 89 , tcpProberThread :: ThreadId
90 -- | Kademlia table of TCP relays. 90 -- | Kademlia table of TCP relays.
91 , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo 91 , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo Nonce8
92 , tcpRelayPinger :: RelayPinger 92 , tcpRelayPinger :: RelayPinger
93 -- | Debug prints are written to this channel which is then flushed to 93 -- | Debug prints are written to this channel which is then flushed to
94 -- 'routeLogger'. 94 -- 'routeLogger'.
@@ -601,44 +601,49 @@ hookQueries or t8 tmethods = TransactionMethods
601 modifyTVar' (pendingQueries or) (W64.insert w8 pq) 601 modifyTVar' (pendingQueries or) (W64.insert w8 pq)
602 writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] 602 writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ]
603 return (tid,d') 603 return (tid,d')
604 , dispatchResponse = \tid x d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) 604 , dispatchResponse = \tid rx d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ())
605 let Nonce8 w8 = t8 tid 605 case rx of
606 mb <- W64.lookup w8 <$> readTVar (pendingQueries or) 606 Success x -> do
607 modifyTVar' (pendingQueries or) (W64.delete w8) 607 let Nonce8 w8 = t8 tid
608 forM_ mb $ \pq -> do 608 mb <- W64.lookup w8 <$> readTVar (pendingQueries or)
609 let od = pendingDestination pq 609 modifyTVar' (pendingQueries or) (W64.delete w8)
610 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) 610 forM_ mb $ \pq -> do
611 $ onionRouteSpec od 611 let od = pendingDestination pq
612 modifyArray (routeMap or) (fmap gotResponse) rid 612 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od)))
613 writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) 613 $ onionRouteSpec od
614 dispatchResponse tmethods tid x d 614 modifyArray (routeMap or) (fmap gotResponse) rid
615 , dispatchCancel = \tid d -> {-# SCC "hookQ.dispatchCancel" #-} do -- :: tid -> d -> STM d 615 writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8)
616 let Nonce8 w8 = t8 tid 616 dispatchResponse tmethods tid rx d
617 mb <- W64.lookup w8 <$> readTVar (pendingQueries or) 617 _ -> do -- Timed out or canceled...
618 modifyTVar' (pendingQueries or) (W64.delete w8) 618 let Nonce8 w8 = t8 tid
619 forM_ mb $ \pq -> do 619 mb <- W64.lookup w8 <$> readTVar (pendingQueries or)
620 let od = pendingDestination pq 620 modifyTVar' (pendingQueries or) (W64.delete w8)
621 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) 621 forM_ mb $ \pq -> do
622 $ onionRouteSpec od 622 let od = pendingDestination pq
623 mrr <- readArray (routeMap or) rid 623 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od)))
624 forM_ mrr $ \rr -> do 624 $ onionRouteSpec od
625 when (routeVersion rr == pendingVersion pq) $ do 625 mrr <- readArray (routeMap or) rid
626 let expireRoute = modifyArray (pendingRoutes or) expire rid 626 forM_ mrr $ \rr -> do
627 expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) 627 when (routeVersion rr == pendingVersion pq) $ do
628 | otherwise = ver 628 let expireRoute = modifyArray (pendingRoutes or) expire rid
629 modifyArray (routeMap or) (fmap gotTimeout) rid 629 expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq)
630 case rr of 630 | otherwise = ver
631 RouteRecord{ responseCount = 0 631 case rx of
632 , timeoutCount = c 632 TimedOut -> do
633 , routeVersion = v } | c >= 5 -> expireRoute 633 modifyArray (routeMap or) (fmap gotTimeout) rid
634 RouteRecord{ responseCount = 1 634 case rr of
635 , timeoutCount = c 635 RouteRecord{ responseCount = 0
636 , routeVersion = v } | c >= 10 -> expireRoute 636 , timeoutCount = c
637 RouteRecord{ timeoutCount = c 637 , routeVersion = v } | c >= 5 -> expireRoute
638 , routeVersion = v } | c >= 20 -> expireRoute 638 RouteRecord{ responseCount = 1
639 _ -> return () 639 , timeoutCount = c
640 writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) 640 , routeVersion = v } | c >= 10 -> expireRoute
641 dispatchCancel tmethods tid d 641 RouteRecord{ timeoutCount = c
642 , routeVersion = v } | c >= 20 -> expireRoute
643 _ -> return ()
644 _ -> return () -- Don't penalize route for canceled queries.
645 writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8)
646 dispatchResponse tmethods tid rx d
642 } 647 }
643 648
644 649
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs
index 932b4ab3..a37c0310 100644
--- a/dht/src/Network/Tox/TCP.hs
+++ b/dht/src/Network/Tox/TCP.hs
@@ -223,11 +223,53 @@ getTCPNodes tcp seeking dst = do
223getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) 223getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()))
224getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst 224getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst
225 225
226
226getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) 227getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo))
227getUDPNodes' tcp seeking dst0 = do 228getUDPNodes' tcp seeking dst0 = do
229 goGetUDPNodes tcp seeking dst0 (return Canceled) $ \meth gateway dst -> do
230 r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway
231 forM r $ \response -> do
232 let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response
233 return ( (ns,ns, const () <$> mb), gateway )
234
235-- Failure case, currently not treated as special.
236-- The current searchQuery type demands a valid Nonce8 is returned
237-- even if we were unable to send a query.
238fixmeNonce :: Nonce8
239fixmeNonce = Nonce8 0
240
241asyncUDPNodes :: TCPClient err Nonce8
242 -> NodeId
243 -> UDP.NodeInfo
244 -> (Nonce8
245 -> QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)
246 -> IO ())
247 -> IO Nonce8
248asyncUDPNodes tcp seeking dst0 withResult =
249 goGetUDPNodes tcp seeking dst0 (return fixmeNonce) $ \meth gateway dst -> do
250 asyncQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway $
251 \qid response -> do
252 let wut response =
253 let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response
254 in ( (ns,ns, const () <$> mb), gateway )
255 withResult qid $ fmap wut response
256
257type Meth x = MethodSerializer
258 Nonce8
259 x -- NodeInfo
260 (Bool, RelayPacket)
261 PacketNumber
262 AnnounceRequest
263 (Either String AnnounceResponse)
264
265goGetUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo
266 -> IO a
267 -> (Meth x -> NodeInfo -> UDP.NodeInfo -> IO a)
268 -> IO a
269goGetUDPNodes tcp seeking dst0 fail go = do
228 mgateway <- atomically $ tcpGetGateway tcp dst0 270 mgateway <- atomically $ tcpGetGateway tcp dst0
229 case mgateway of 271 case mgateway of
230 Nothing -> return Canceled 272 Nothing -> fail
231 Just gateway -> do 273 Just gateway -> do
232 (b,c,n24) <- atomically $ do 274 (b,c,n24) <- atomically $ do
233 b <- transportNewKey (tcpCrypto tcp) 275 b <- transportNewKey (tcpCrypto tcp)
@@ -267,10 +309,7 @@ getUDPNodes' tcp seeking dst0 = do
267 -> decrypt (wrap0 n24') r >>= decodePlain 309 -> decrypt (wrap0 n24') r >>= decodePlain
268 x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x 310 x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x
269 } 311 }
270 r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway 312 go meth gateway dst
271 forM r $ \response -> do
272 let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response
273 return ( (ns,ns, const () <$> mb), gateway )
274 313
275 314
276handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) 315handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x))
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
index e30c40da..19d0df69 100644
--- a/kad/src/Network/Kademlia/Search.hs
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -44,7 +44,7 @@ data Search nid addr tok ni r qk = Search
44 { searchSpace :: KademliaSpace nid ni 44 { searchSpace :: KademliaSpace nid ni
45 , searchNodeAddress :: ni -> addr 45 , searchNodeAddress :: ni -> addr
46 , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk 46 , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk
47 , searchQueryCancel :: qk -> STM () 47 , searchQueryCancel :: (IO () -> STM ()) -> qk -> STM ()
48 , searchAlpha :: Int -- α = 8 48 , searchAlpha :: Int -- α = 8
49 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on 49 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
50 -- how fast the queries are. For Tox's much slower onion-routed queries, we 50 -- how fast the queries are. For Tox's much slower onion-routed queries, we
@@ -118,19 +118,21 @@ reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
118 -> Search nid addr1 tok1 ni r1 qk 118 -> Search nid addr1 tok1 ni r1 qk
119 -> nid 119 -> nid
120 -> SearchState nid addr tok ni r qk 120 -> SearchState nid addr tok ni r qk
121 -> STM (SearchState nid addr tok ni r qk) 121 -> IO (SearchState nid addr tok ni r qk)
122reset nearestNodes qsearch target st = do 122reset nearestNodes qsearch target st = do
123 pc <- readTVar (searchPendingCount st) 123 atomically $ searchCancel st
124 check (pc == 0) 124 atomically $ do
125 searchIsFinished st >>= check -- Wait for a search to finish before resetting. 125 pc <- readTVar (searchPendingCount st)
126 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) 126 check (pc == 0)
127 <$> nearestNodes target 127 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
128 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) 128 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
129 writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes 129 <$> nearestNodes target
130 writeTVar (searchInformant st) MM.empty 130 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
131 writeTVar (searchVisited st) Set.empty 131 writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes
132 writeTVar (searchPendingCount st) 0 132 writeTVar (searchInformant st) MM.empty
133 return st 133 writeTVar (searchVisited st) Set.empty
134 writeTVar (searchPendingCount st) 0
135 return st
134 136
135grokQuery :: forall addr nid tok ni r qk. 137grokQuery :: forall addr nid tok ni r qk.
136 ( Ord addr 138 ( Ord addr
@@ -233,8 +235,9 @@ searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni,
233 -> IO () 235 -> IO ()
234searchLoop sch@Search{..} target result s@SearchState{..} = do 236searchLoop sch@Search{..} target result s@SearchState{..} = do
235 myThreadId >>= flip labelThread ("search."++show target) 237 myThreadId >>= flip labelThread ("search."++show target)
236 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do 238 iochan <- atomically newTChan
237 join $ atomically $ do 239 fix $ \again -> do
240 join $ atomically $ orElse (fmap (>> again) $ readTChan iochan) $ do
238 cnt <- readTVar $ searchPendingCount 241 cnt <- readTVar $ searchPendingCount
239 check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. 242 check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time.
240 informants <- readTVar searchInformant 243 informants <- readTVar searchInformant
@@ -256,6 +259,6 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
256 return $ do 259 return $ do
257 qk <- searchQuery target ni $ 260 qk <- searchQuery target ni $
258 \qk reply -> grokQuery sch target result s (ni :-> d) qk reply 261 \qk reply -> grokQuery sch target result s (ni :-> d) qk reply
259 atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk) 262 atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel (writeTChan iochan) qk)
260 again 263 again
261 _ -> searchIsFinished s >>= check >> return (return ()) 264 _ -> searchIsFinished s >>= check >> return (return ())
diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs
index 33860a2f..e8aa33c7 100644
--- a/kad/tests/searchCancel.hs
+++ b/kad/tests/searchCancel.hs
@@ -52,7 +52,7 @@ sch mbv var = Search
52 let qk = maybe 0 (\(ns,_,_) -> head ns) r 52 let qk = maybe 0 (\(ns,_,_) -> head ns) r
53 f qk $ maybe TimedOut Success r 53 f qk $ maybe TimedOut Success r
54 return qk 54 return qk
55 , searchQueryCancel = \_ -> return () 55 , searchQueryCancel = \_ _ -> return ()
56 , searchAlpha = 4 56 , searchAlpha = 4
57 , searchK = 8 57 , searchK = 8
58 } 58 }
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs
index 4f14ea3c..94eb4796 100644
--- a/server/src/Network/QueryResponse.hs
+++ b/server/src/Network/QueryResponse.hs
@@ -332,9 +332,6 @@ data TransactionMethods d qid addr x = TransactionMethods
332 -- will write the packet to the correct 'MVar' thus completing the 332 -- will write the packet to the correct 'MVar' thus completing the
333 -- dispatch. 333 -- dispatch.
334 , dispatchResponse :: qid -> Result x -> d -> STM (d, IO ()) 334 , dispatchResponse :: qid -> Result x -> d -> STM (d, IO ())
335 -- | When a timeout interval elapses, this method is called to remove the
336 -- transaction from the table.
337 , dispatchCancel :: qid -> d -> STM d
338 } 335 }
339 336
340-- | A set of methods necessary for dispatching incoming packets. 337-- | A set of methods necessary for dispatching incoming packets.
@@ -429,7 +426,9 @@ asyncQuery c@(Client net d err pending whoami _) meth q addr0 withResponse = do
429 tm_key <- registerTimeout tm expiry $ do 426 tm_key <- registerTimeout tm expiry $ do
430 atomically $ do 427 atomically $ do
431 tbl <- readTVar pending 428 tbl <- readTVar pending
432 v <- dispatchCancel (tableMethods d) qid tbl 429 -- Below, we discard the returned IO action since we will call
430 -- withResponse directly later.
431 (v,_) <- dispatchResponse (tableMethods d) qid TimedOut tbl
433 writeTVar pending v 432 writeTVar pending v
434 m <- takeMVar keyvar 433 m <- takeMVar keyvar
435 forM_ m $ \_ -> withResponse qid TimedOut 434 forM_ m $ \_ -> withResponse qid TimedOut
@@ -505,8 +504,7 @@ transactionMethods' ::
505 -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. 504 -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
506 -> TransactionMethods (g,t a) qid addr x 505 -> TransactionMethods (g,t a) qid addr x
507transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods 506transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
508 { dispatchCancel = \tid (g,t) -> return (g, delete tid t) 507 { dispatchRegister = \nowPlusExpiry v a (g,t) -> do
509 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do
510 let (tid,g') = generate g 508 let (tid,g') = generate g
511 let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t 509 let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t
512 return ( tid, (g',t') ) 510 return ( tid, (g',t') )