diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /dht/src/Network/Tox/DHT/Handlers.hs | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'dht/src/Network/Tox/DHT/Handlers.hs')
-rw-r--r-- | dht/src/Network/Tox/DHT/Handlers.hs | 573 |
1 files changed, 573 insertions, 0 deletions
diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs new file mode 100644 index 00000000..1eec93b9 --- /dev/null +++ b/dht/src/Network/Tox/DHT/Handlers.hs | |||
@@ -0,0 +1,573 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
3 | {-# LANGUAGE NamedFieldPuns #-} | ||
4 | {-# LANGUAGE PatternSynonyms #-} | ||
5 | {-# LANGUAGE TupleSections #-} | ||
6 | module Network.Tox.DHT.Handlers where | ||
7 | |||
8 | import Debug.Trace | ||
9 | import Network.Tox.DHT.Transport as DHTTransport | ||
10 | import Network.QueryResponse as QR hiding (Client) | ||
11 | import qualified Network.QueryResponse as QR (Client) | ||
12 | import Crypto.Tox | ||
13 | import Network.Kademlia.Search | ||
14 | import qualified Data.Wrapper.PSQInt as Int | ||
15 | import Network.Kademlia | ||
16 | import Network.Kademlia.Bootstrap | ||
17 | import Network.Address (WantIP (..), ipFamily, fromSockAddr, sockAddrPort) | ||
18 | import qualified Network.Kademlia.Routing as R | ||
19 | import Control.TriadCommittee | ||
20 | import System.Global6 | ||
21 | import DPut | ||
22 | import DebugTag | ||
23 | |||
24 | import qualified Data.ByteArray as BA | ||
25 | import qualified Data.ByteString.Char8 as C8 | ||
26 | import qualified Data.ByteString.Base16 as Base16 | ||
27 | import Control.Arrow | ||
28 | import Control.Monad | ||
29 | import Control.Concurrent.Lifted.Instrument | ||
30 | import Control.Concurrent.STM | ||
31 | import Data.Hashable | ||
32 | import Data.Ord | ||
33 | import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) | ||
34 | import Network.Socket | ||
35 | import qualified Data.HashMap.Strict as HashMap | ||
36 | ;import Data.HashMap.Strict (HashMap) | ||
37 | #if MIN_VERSION_iproute(1,7,4) | ||
38 | import Data.IP hiding (fromSockAddr) | ||
39 | #else | ||
40 | import Data.IP | ||
41 | #endif | ||
42 | import Data.Maybe | ||
43 | import Data.Serialize (Serialize) | ||
44 | import Data.Word | ||
45 | |||
46 | data TransactionId = TransactionId | ||
47 | { transactionKey :: Nonce8 -- ^ Used to lookup pending query. | ||
48 | , cryptoNonce :: Nonce24 -- ^ Used during the encryption layer. | ||
49 | } | ||
50 | deriving (Eq,Ord,Show) | ||
51 | |||
52 | newtype PacketKind = PacketKind Word8 | ||
53 | deriving (Eq, Ord, Serialize) | ||
54 | |||
55 | pattern OnionRequest0Type = PacketKind 128 -- 0x80 Onion Request 0 | ||
56 | pattern OnionRequest1Type = PacketKind 129 -- 0x81 Onion Request 1 | ||
57 | pattern OnionRequest2Type = PacketKind 130 -- 0x82 Onion Request 2 | ||
58 | pattern AnnounceType = PacketKind 131 -- 0x83 Announce Request | ||
59 | pattern AnnounceResponseType = PacketKind 132 -- 0x84 Announce Response | ||
60 | |||
61 | pattern DataRequestType = PacketKind 133 -- 0x85 Onion Data Request (data to route request packet) | ||
62 | pattern DataResponseType = PacketKind 134 -- 0x86 Onion Data Response (data to route response packet) | ||
63 | -- 0x8c Onion Response 3 | ||
64 | -- 0x8d Onion Response 2 | ||
65 | pattern OnionResponse3Type = PacketKind 140 -- 0x8c Onion Response 3 | ||
66 | pattern OnionResponse2Type = PacketKind 141 -- 0x8d Onion Response 2 | ||
67 | pattern OnionResponse1Type = PacketKind 142 -- 0x8e Onion Response 1 | ||
68 | -- 0xf0 Bootstrap Info | ||
69 | |||
70 | pattern DHTRequestType = PacketKind 32 -- 0x20 DHT Request | ||
71 | |||
72 | pattern CookieRequestType = PacketKind 0x18 | ||
73 | pattern CookieResponseType = PacketKind 0x19 | ||
74 | |||
75 | pattern PingType = PacketKind 0 -- 0x00 Ping Request | ||
76 | pattern PongType = PacketKind 1 -- 0x01 Ping Response | ||
77 | pattern GetNodesType = PacketKind 2 -- 0x02 Nodes Request | ||
78 | pattern SendNodesType = PacketKind 4 -- 0x04 Nodes Response | ||
79 | |||
80 | |||
81 | instance Show PacketKind where | ||
82 | showsPrec d PingType = mappend "PingType" | ||
83 | showsPrec d PongType = mappend "PongType" | ||
84 | showsPrec d GetNodesType = mappend "GetNodesType" | ||
85 | showsPrec d SendNodesType = mappend "SendNodesType" | ||
86 | showsPrec d DHTRequestType = mappend "DHTRequestType" | ||
87 | showsPrec d OnionRequest0Type = mappend "OnionRequest0Type" | ||
88 | showsPrec d OnionResponse1Type = mappend "OnionResponse1Type" | ||
89 | showsPrec d OnionResponse3Type = mappend "OnionResponse3Type" | ||
90 | showsPrec d AnnounceType = mappend "AnnounceType" | ||
91 | showsPrec d AnnounceResponseType = mappend "AnnounceResponseType" | ||
92 | showsPrec d DataRequestType = mappend "DataRequestType" | ||
93 | showsPrec d DataResponseType = mappend "DataResponseType" | ||
94 | showsPrec d CookieRequestType = mappend "CookieRequestType" | ||
95 | showsPrec d CookieResponseType = mappend "CookieResponseType" | ||
96 | showsPrec d (PacketKind x) = mappend "PacketKind " . showsPrec (d+1) x | ||
97 | |||
98 | msgType :: ( Serialize (f DHTRequest) | ||
99 | , Serialize (f (Cookie Encrypted)), Serialize (f CookieRequest) | ||
100 | , Serialize (f SendNodes), Serialize (f GetNodes) | ||
101 | , Serialize (f Pong), Serialize (f Ping) | ||
102 | ) => DHTMessage f -> PacketKind | ||
103 | msgType msg = PacketKind $ fst $ dhtMessageType msg | ||
104 | |||
105 | classify :: Client -> Message -> MessageClass String PacketKind TransactionId NodeInfo Message | ||
106 | classify client (DHTLanDiscovery {}) = IsUnsolicited (lanDiscoveryH client) | ||
107 | classify client msg = fromMaybe (IsUnknown "unknown") | ||
108 | $ mapMessage (\nonce24 (nonce8,_) -> go msg (TransactionId nonce8 nonce24)) msg | ||
109 | where | ||
110 | go (DHTPing {}) = IsQuery PingType | ||
111 | go (DHTGetNodes {}) = IsQuery GetNodesType | ||
112 | go (DHTPong {}) = IsResponse | ||
113 | go (DHTSendNodes {}) = IsResponse | ||
114 | go (DHTCookieRequest {}) = IsQuery CookieRequestType | ||
115 | go (DHTCookie {}) = IsResponse | ||
116 | go (DHTDHTRequest {}) = IsQuery DHTRequestType | ||
117 | |||
118 | data NodeInfoCallback = NodeInfoCallback | ||
119 | { interestingNodeId :: NodeId | ||
120 | , listenerId :: Int | ||
121 | , observedAddress :: POSIXTime -> NodeInfo -- Address and port for interestingNodeId | ||
122 | -> STM () | ||
123 | , rumoredAddress :: POSIXTime -> SockAddr -- source of information | ||
124 | -> NodeInfo -- Address and port for interestingNodeId | ||
125 | -> STM () | ||
126 | } | ||
127 | |||
128 | data Routing = Routing | ||
129 | { tentativeId :: NodeInfo | ||
130 | , committee4 :: TriadCommittee NodeId SockAddr | ||
131 | , committee6 :: TriadCommittee NodeId SockAddr | ||
132 | , refresher4 :: BucketRefresher NodeId NodeInfo | ||
133 | , refresher6 :: BucketRefresher NodeId NodeInfo | ||
134 | , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) | ||
135 | } | ||
136 | |||
137 | registerNodeCallback :: Routing -> NodeInfoCallback -> STM () | ||
138 | registerNodeCallback Routing{nodesOfInterest} cb = do | ||
139 | cbm <- readTVar nodesOfInterest | ||
140 | let ns = fromMaybe [] $ HashMap.lookup (interestingNodeId cb) cbm | ||
141 | bs = filter nonMatching ns | ||
142 | where nonMatching n = (listenerId n /= listenerId cb) | ||
143 | writeTVar nodesOfInterest $ HashMap.insert (interestingNodeId cb) | ||
144 | (cb : bs) | ||
145 | cbm | ||
146 | |||
147 | unregisterNodeCallback :: Int -> Routing -> NodeId -> STM () | ||
148 | unregisterNodeCallback callbackId Routing{nodesOfInterest} nid = do | ||
149 | cbm <- readTVar nodesOfInterest | ||
150 | let ns = fromMaybe [] $ HashMap.lookup nid cbm | ||
151 | bs = filter nonMatching ns | ||
152 | where nonMatching n = (listenerId n /= callbackId) | ||
153 | writeTVar nodesOfInterest | ||
154 | $ if null bs | ||
155 | then HashMap.delete nid cbm | ||
156 | else HashMap.insert nid bs cbm | ||
157 | |||
158 | |||
159 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
160 | sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue | ||
161 | |||
162 | sched6 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
163 | sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue | ||
164 | |||
165 | routing4 :: Routing -> TVar (R.BucketList NodeInfo) | ||
166 | routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
167 | |||
168 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) | ||
169 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
170 | |||
171 | newRouting :: SockAddr -> TransportCrypto | ||
172 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change | ||
173 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change | ||
174 | -> IO (Client -> Routing) | ||
175 | newRouting addr crypto update4 update6 = do | ||
176 | let tentative_ip4 = fromMaybe (IPv4 $ toEnum 0) (IPv4 <$> fromSockAddr addr) | ||
177 | tentative_ip6 = fromMaybe (IPv6 $ toEnum 0) (IPv6 <$> fromSockAddr addr) | ||
178 | tentative_info = NodeInfo | ||
179 | { nodeId = key2id $ transportPublic crypto | ||
180 | , nodeIP = fromMaybe (toEnum 0) (fromSockAddr addr) | ||
181 | , nodePort = fromMaybe 0 $ sockAddrPort addr | ||
182 | } | ||
183 | tentative_info4 = tentative_info { nodeIP = tentative_ip4 } | ||
184 | tentative_info6 <- | ||
185 | maybe (tentative_info { nodeIP = tentative_ip6 }) | ||
186 | (\ip6 -> tentative_info { nodeIP = IPv6 ip6 }) | ||
187 | <$> case addr of | ||
188 | SockAddrInet {} -> return Nothing | ||
189 | _ -> global6 | ||
190 | atomically $ do | ||
191 | -- We defer initializing the refreshSearch and refreshPing until we | ||
192 | -- have a client to send queries with. | ||
193 | let nullPing = const $ return False | ||
194 | nullSearch = Search | ||
195 | { searchSpace = toxSpace | ||
196 | , searchNodeAddress = nodeIP &&& nodePort | ||
197 | , searchQuery = Left $ \_ _ -> return Nothing | ||
198 | , searchAlpha = 1 | ||
199 | , searchK = 2 | ||
200 | } | ||
201 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount | ||
202 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount | ||
203 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing | ||
204 | refresher6 <- newBucketRefresher tbl6 nullSearch nullPing | ||
205 | committee4 <- newTriadCommittee (update4 tbl4) -- updateIPVote tbl4 addr4 | ||
206 | committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6 | ||
207 | cbvar <- newTVar HashMap.empty | ||
208 | return $ \client -> | ||
209 | -- Now we have a client, so tell the BucketRefresher how to search and ping. | ||
210 | let updIO r = updateRefresherIO (nodeSearch client cbvar) (ping client) r | ||
211 | in Routing { tentativeId = tentative_info | ||
212 | , committee4 = committee4 | ||
213 | , committee6 = committee6 | ||
214 | , refresher4 = updIO refresher4 | ||
215 | , refresher6 = updIO refresher6 | ||
216 | , nodesOfInterest = cbvar | ||
217 | } | ||
218 | |||
219 | |||
220 | -- TODO: This should cover more cases | ||
221 | isLocal :: IP -> Bool | ||
222 | isLocal (IPv6 ip6) = (ip6 == toEnum 0) | ||
223 | isLocal (IPv4 ip4) = (ip4 == toEnum 0) | ||
224 | |||
225 | isGlobal :: IP -> Bool | ||
226 | isGlobal = not . isLocal | ||
227 | |||
228 | prefer4or6 :: NodeInfo -> Maybe WantIP -> WantIP | ||
229 | prefer4or6 addr iptyp = fromMaybe (ipFamily $ nodeIP addr) iptyp | ||
230 | |||
231 | toxSpace :: R.KademliaSpace NodeId NodeInfo | ||
232 | toxSpace = R.KademliaSpace | ||
233 | { R.kademliaLocation = nodeId | ||
234 | , R.kademliaTestBit = testNodeIdBit | ||
235 | , R.kademliaXor = xorNodeId | ||
236 | , R.kademliaSample = sampleNodeId | ||
237 | } | ||
238 | |||
239 | |||
240 | pingH :: NodeInfo -> Ping -> IO Pong | ||
241 | pingH _ Ping = return Pong | ||
242 | |||
243 | getNodesH :: Routing -> NodeInfo -> GetNodes -> IO SendNodes | ||
244 | getNodesH routing addr (GetNodes nid) = do | ||
245 | let preferred = prefer4or6 addr Nothing | ||
246 | |||
247 | (append4,append6) <- atomically $ do | ||
248 | ni4 <- R.thisNode <$> readTVar (routing4 routing) | ||
249 | ni6 <- R.thisNode <$> readTVar (routing6 routing) | ||
250 | return $ case ipFamily (nodeIP addr) of | ||
251 | Want_IP4 | isGlobal (nodeIP ni6) -> (id, (++ [ni6])) | ||
252 | Want_IP6 | isGlobal (nodeIP ni4) -> ((++ [ni4]), id) | ||
253 | _ -> (id, id) | ||
254 | ks <- go append4 $ routing4 routing | ||
255 | ks6 <- go append6 $ routing6 routing | ||
256 | let (ns1,ns2) = case preferred of Want_IP6 -> (ks6,ks) | ||
257 | Want_IP4 -> (ks,ks6) | ||
258 | Want_Both -> error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ | ||
259 | return $ SendNodes | ||
260 | $ if null ns2 then ns1 | ||
261 | else take 4 (take 3 ns1 ++ ns2) | ||
262 | where | ||
263 | go f var = f . R.kclosest toxSpace k nid <$> atomically (readTVar var) | ||
264 | |||
265 | k = 4 | ||
266 | |||
267 | createCookie :: TransportCrypto -> NodeInfo -> PublicKey -> IO (Cookie Encrypted) | ||
268 | createCookie crypto ni remoteUserKey = do | ||
269 | (n24,sym) <- atomically $ do | ||
270 | n24 <- transportNewNonce crypto | ||
271 | sym <- transportSymmetric crypto | ||
272 | return (n24,sym) | ||
273 | timestamp <- round . (* 1000000) <$> getPOSIXTime | ||
274 | let dta = encodePlain $ CookieData | ||
275 | { cookieTime = timestamp | ||
276 | , longTermKey = remoteUserKey | ||
277 | , dhtKey = id2key $ nodeId ni -- transportPublic crypto | ||
278 | } | ||
279 | edta = encryptSymmetric sym n24 dta | ||
280 | return $ Cookie n24 edta | ||
281 | |||
282 | createCookieSTM :: POSIXTime -> TransportCrypto -> NodeInfo -> PublicKey -> STM (Cookie Encrypted) | ||
283 | createCookieSTM now crypto ni remoteUserKey = do | ||
284 | let dmsg msg = trace msg (return ()) | ||
285 | (n24,sym) <- do | ||
286 | n24 <- transportNewNonce crypto | ||
287 | sym <- transportSymmetric crypto | ||
288 | return (n24,sym) | ||
289 | let timestamp = round (now * 1000000) | ||
290 | let dta = encodePlain $ CookieData | ||
291 | { cookieTime = timestamp | ||
292 | , longTermKey = remoteUserKey | ||
293 | , dhtKey = id2key $ nodeId ni -- transportPublic crypto | ||
294 | } | ||
295 | edta = encryptSymmetric sym n24 dta | ||
296 | return $ Cookie n24 edta | ||
297 | |||
298 | cookieRequestH :: TransportCrypto -> NodeInfo -> CookieRequest -> IO (Cookie Encrypted) | ||
299 | cookieRequestH crypto ni (CookieRequest remoteUserKey) = do | ||
300 | dput XNetCrypto $ unlines | ||
301 | [ show (nodeAddr ni) ++ " --> request cookie: remoteUserKey=" ++ show (key2id remoteUserKey) | ||
302 | , show (nodeAddr ni) ++ " --> sender=" ++ show (nodeId ni) ] | ||
303 | x <- createCookie crypto ni remoteUserKey | ||
304 | dput XNetCrypto $ show (nodeAddr ni) ++ " <-- cookie " ++ show (key2id remoteUserKey) | ||
305 | return x | ||
306 | |||
307 | lanDiscoveryH :: Client -> NodeInfo -> NodeInfo -> IO (Maybe (Message -> Message)) | ||
308 | lanDiscoveryH client _ ni = do | ||
309 | dput XLan $ show (nodeAddr ni) ++ " --> LanAnnounce " ++ show (nodeId ni) | ||
310 | forkIO $ do | ||
311 | myThreadId >>= flip labelThread "lan-discover-ping" | ||
312 | ping client ni | ||
313 | return () | ||
314 | return Nothing | ||
315 | |||
316 | type Message = DHTMessage ((,) Nonce8) | ||
317 | |||
318 | type Client = QR.Client String PacketKind TransactionId NodeInfo Message | ||
319 | |||
320 | |||
321 | wrapAsymm :: TransactionId -> NodeInfo -> NodeInfo -> (Nonce8 -> dta) -> Asymm dta | ||
322 | wrapAsymm (TransactionId n8 n24) src dst dta = Asymm | ||
323 | { senderKey = id2key $ nodeId src | ||
324 | , asymmNonce = n24 | ||
325 | , asymmData = dta n8 | ||
326 | } | ||
327 | |||
328 | serializer :: PacketKind | ||
329 | -> (Asymm (Nonce8,ping) -> Message) | ||
330 | -> (Message -> Maybe (Asymm (Nonce8,pong))) | ||
331 | -> MethodSerializer TransactionId NodeInfo Message PacketKind ping (Maybe pong) | ||
332 | serializer pktkind mkping mkpong = MethodSerializer | ||
333 | { methodTimeout = \tid addr -> return (addr, 5000000) | ||
334 | , method = pktkind | ||
335 | -- wrapQuery :: tid -> addr -> addr -> qry -> x | ||
336 | , wrapQuery = \tid src dst ping -> mkping $ wrapAsymm tid src dst (, ping) | ||
337 | -- unwrapResponse :: x -> b | ||
338 | , unwrapResponse = fmap (snd . asymmData) . mkpong | ||
339 | } | ||
340 | |||
341 | |||
342 | unpong :: Message -> Maybe (Asymm (Nonce8,Pong)) | ||
343 | unpong (DHTPong asymm) = Just asymm | ||
344 | unpong _ = Nothing | ||
345 | |||
346 | ping :: Client -> NodeInfo -> IO Bool | ||
347 | ping client addr = do | ||
348 | dput XPing $ show addr ++ " <-- ping" | ||
349 | reply <- QR.sendQuery client (serializer PingType DHTPing unpong) Ping addr | ||
350 | dput XPing $ show addr ++ " -pong-> " ++ show reply | ||
351 | maybe (return False) (\Pong -> return True) $ join reply | ||
352 | |||
353 | |||
354 | saveCookieKey :: TVar [(SockAddr, (Int, PublicKey))] -> SockAddr -> PublicKey -> STM () | ||
355 | saveCookieKey var saddr pk = do | ||
356 | cookiekeys <- readTVar var | ||
357 | case break (\(stored,_) -> stored == saddr) cookiekeys of | ||
358 | (xs,[]) -> writeTVar var $ (saddr, (1 ,pk)) : xs | ||
359 | (xs,(_,(c,stored)):ys) | stored == pk -> writeTVar var $ (saddr, (c+1,pk)) : xs ++ ys | ||
360 | _ -> retry -- Wait for requests to this address | ||
361 | -- under a different key to time out | ||
362 | -- before we try this key. | ||
363 | |||
364 | loseCookieKey :: TVar [(SockAddr, (Int, PublicKey))] -> SockAddr -> PublicKey -> STM () | ||
365 | loseCookieKey var saddr pk = do | ||
366 | cookiekeys <- readTVar var | ||
367 | case break (\(stored,_) -> stored == saddr) cookiekeys of | ||
368 | (xs,(_,(1,stored)):ys) | stored == pk -> writeTVar var $ xs ++ ys | ||
369 | (xs,(_,(c,stored)):ys) | stored == pk -> writeTVar var $ (saddr, (c-1,pk)) : xs ++ ys | ||
370 | _ -> return () -- unreachable? | ||
371 | |||
372 | |||
373 | cookieRequest :: TransportCrypto -> Client -> PublicKey -> NodeInfo -> IO (Maybe (Cookie Encrypted)) | ||
374 | cookieRequest crypto client localUserKey addr = do | ||
375 | let sockAddr = nodeAddr addr | ||
376 | nid = id2key $ nodeId addr | ||
377 | cookieSerializer | ||
378 | = MethodSerializer | ||
379 | { methodTimeout = \tid addr -> return (addr, 5000000) | ||
380 | , method = CookieRequestType | ||
381 | , wrapQuery = \tid src dst cr -> DHTCookieRequest $ wrapAsymm tid src dst (, cr) | ||
382 | , unwrapResponse = fmap snd . unCookie | ||
383 | } | ||
384 | cookieRequest = CookieRequest localUserKey | ||
385 | atomically $ saveCookieKey (pendingCookies crypto) sockAddr nid | ||
386 | dput XNetCrypto $ show addr ++ " <-- cookieRequest" | ||
387 | reply <- QR.sendQuery client cookieSerializer cookieRequest addr | ||
388 | atomically $ loseCookieKey (pendingCookies crypto) sockAddr nid | ||
389 | dput XNetCrypto $ show addr ++ " -cookieResponse-> " ++ show reply | ||
390 | return $ join reply | ||
391 | |||
392 | unCookie :: DHTMessage t -> Maybe (t (Cookie Encrypted)) | ||
393 | unCookie (DHTCookie n24 fcookie) = Just fcookie | ||
394 | unCookie _ = Nothing | ||
395 | |||
396 | unsendNodes :: Message -> Maybe (Asymm (Nonce8,SendNodes)) | ||
397 | unsendNodes (DHTSendNodes asymm) = Just asymm | ||
398 | unsendNodes _ = Nothing | ||
399 | |||
400 | unwrapNodes :: SendNodes -> ( [NodeInfo], [NodeInfo], Maybe () ) | ||
401 | unwrapNodes (SendNodes ns) = (ns,ns,Just ()) | ||
402 | |||
403 | data SendableQuery x a b = SendableQuery | ||
404 | { sendableSerializer :: MethodSerializer TransactionId NodeInfo Message PacketKind a (Maybe x) | ||
405 | , sendableQuery :: NodeId -> a | ||
406 | , sendableResult :: Maybe (Maybe x) -> IO b | ||
407 | } | ||
408 | |||
409 | sendQ :: SendableQuery x a b | ||
410 | -> QR.Client err PacketKind TransactionId NodeInfo Message | ||
411 | -> NodeId | ||
412 | -> NodeInfo | ||
413 | -> IO b | ||
414 | sendQ s client nid addr = do | ||
415 | reply <- QR.sendQuery client (sendableSerializer s) (sendableQuery s nid) addr | ||
416 | sendableResult s reply | ||
417 | |||
418 | asyncQ :: SendableQuery x a b | ||
419 | -> QR.Client err PacketKind TransactionId NodeInfo Message | ||
420 | -> NodeId | ||
421 | -> NodeInfo | ||
422 | -> (b -> IO ()) | ||
423 | -> IO () | ||
424 | asyncQ s client nid addr go = do | ||
425 | QR.asyncQuery client (sendableSerializer s) (sendableQuery s nid) addr | ||
426 | $ sendableResult s >=> go | ||
427 | |||
428 | getNodesSendable :: TVar (HashMap NodeId [NodeInfoCallback]) | ||
429 | -> NodeInfo | ||
430 | -> SendableQuery SendNodes GetNodes (Maybe ([NodeInfo], [NodeInfo], Maybe ())) | ||
431 | getNodesSendable cbvar addr = SendableQuery (serializer GetNodesType DHTGetNodes unsendNodes) | ||
432 | GetNodes | ||
433 | go | ||
434 | where | ||
435 | go reply = do | ||
436 | forM_ (join reply) $ \(SendNodes ns) -> | ||
437 | forM_ ns $ \n -> do | ||
438 | now <- getPOSIXTime | ||
439 | atomically $ do | ||
440 | mcbs <- HashMap.lookup (nodeId n) <$> readTVar cbvar | ||
441 | forM_ mcbs $ \cbs -> do | ||
442 | forM_ cbs $ \cb -> do | ||
443 | rumoredAddress cb now (nodeAddr addr) n | ||
444 | return $ fmap unwrapNodes $ join reply | ||
445 | |||
446 | getNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) | ||
447 | getNodes client cbvar nid addr = | ||
448 | sendQ (getNodesSendable cbvar addr) client nid addr | ||
449 | |||
450 | asyncGetNodes :: QR.Client err PacketKind TransactionId NodeInfo Message | ||
451 | -> TVar (HashMap NodeId [NodeInfoCallback]) | ||
452 | -> NodeId | ||
453 | -> NodeInfo | ||
454 | -> (Maybe ([NodeInfo], [NodeInfo], Maybe ()) -> IO ()) | ||
455 | -> IO () | ||
456 | asyncGetNodes client cbvar nid addr go = | ||
457 | asyncQ (getNodesSendable cbvar addr) client nid addr go | ||
458 | |||
459 | updateRouting :: Client -> Routing | ||
460 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) | ||
461 | -> NodeInfo | ||
462 | -> Message | ||
463 | -> IO () | ||
464 | updateRouting client routing orouter naddr msg | ||
465 | | PacketKind 0x21 <- msgType msg = -- dput XLan "(tox)updateRouting: ignoring lan discovery" -- ignore lan discovery | ||
466 | -- Ignore lan announcements until they reply to our ping. | ||
467 | -- We do this because the lan announce is not authenticated. | ||
468 | return () | ||
469 | | otherwise = do | ||
470 | now <- getPOSIXTime | ||
471 | atomically $ do | ||
472 | m <- HashMap.lookup (nodeId naddr) <$> readTVar (nodesOfInterest routing) | ||
473 | forM_ m $ mapM_ $ \NodeInfoCallback{interestingNodeId,observedAddress} -> do | ||
474 | when (interestingNodeId == nodeId naddr) | ||
475 | $ observedAddress now naddr | ||
476 | case prefer4or6 naddr Nothing of | ||
477 | Want_IP4 -> updateTable client naddr orouter (committee4 routing) (refresher4 routing) | ||
478 | Want_IP6 -> updateTable client naddr orouter (committee6 routing) (refresher6 routing) | ||
479 | Want_Both -> do dput XMisc "BUG:unreachable" | ||
480 | error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ | ||
481 | |||
482 | updateTable :: Client -> NodeInfo | ||
483 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) | ||
484 | -> TriadCommittee NodeId SockAddr | ||
485 | -> BucketRefresher NodeId NodeInfo | ||
486 | -> IO () | ||
487 | updateTable client naddr orouter committee refresher = do | ||
488 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) | ||
489 | -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) | ||
490 | when (self /= naddr) $ do | ||
491 | -- TODO: IP address vote? | ||
492 | insertNode (toxKademlia client committee orouter refresher) naddr | ||
493 | |||
494 | toxKademlia :: Client | ||
495 | -> TriadCommittee NodeId SockAddr | ||
496 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) | ||
497 | -> BucketRefresher NodeId NodeInfo | ||
498 | -> Kademlia NodeId NodeInfo | ||
499 | toxKademlia client committee orouter refresher | ||
500 | = Kademlia quietInsertions | ||
501 | toxSpace | ||
502 | (vanillaIO (refreshBuckets refresher) $ ping client) | ||
503 | { tblTransition = \tr -> do | ||
504 | io1 <- transitionCommittee committee tr | ||
505 | io2 <- touchBucket refresher tr -- toxSpace (15*60) var sched tr | ||
506 | -- hookBucketList toxSpace (refreshBuckets refresher) orouter tr | ||
507 | orouter (refreshBuckets refresher) tr | ||
508 | return $ do | ||
509 | io1 >> io2 | ||
510 | {- | ||
511 | dput XMisc $ unwords | ||
512 | [ show (transitionedTo tr) | ||
513 | , show (transitioningNode tr) | ||
514 | ] | ||
515 | -} | ||
516 | return () | ||
517 | } | ||
518 | |||
519 | transitionCommittee :: TriadCommittee NodeId SockAddr -> RoutingTransition NodeInfo -> STM (IO ()) | ||
520 | transitionCommittee committee (RoutingTransition ni Stranger) = do | ||
521 | delVote committee (nodeId ni) | ||
522 | return $ do | ||
523 | -- dput XMisc $ "delVote "++show (nodeId ni) | ||
524 | return () | ||
525 | transitionCommittee committee _ = return $ return () | ||
526 | |||
527 | type Handler = MethodHandler String TransactionId NodeInfo Message | ||
528 | |||
529 | isPing :: (f Ping -> Ping) -> DHTMessage f -> Either String Ping | ||
530 | isPing unpack (DHTPing a) = Right $ unpack $ asymmData a | ||
531 | isPing _ _ = Left "Bad ping" | ||
532 | |||
533 | mkPong :: TransactionId -> NodeInfo -> NodeInfo -> Pong -> DHTMessage ((,) Nonce8) | ||
534 | mkPong tid src dst pong = DHTPong $ wrapAsymm tid src dst (, pong) | ||
535 | |||
536 | isGetNodes :: (f GetNodes -> GetNodes) -> DHTMessage f -> Either String GetNodes | ||
537 | isGetNodes unpack (DHTGetNodes a) = Right $ unpack $ asymmData a | ||
538 | isGetNodes _ _ = Left "Bad GetNodes" | ||
539 | |||
540 | mkSendNodes :: TransactionId -> NodeInfo -> NodeInfo -> SendNodes -> DHTMessage ((,) Nonce8) | ||
541 | mkSendNodes tid src dst sendnodes = DHTSendNodes $ wrapAsymm tid src dst (, sendnodes) | ||
542 | |||
543 | isCookieRequest :: (f CookieRequest -> CookieRequest) -> DHTMessage f -> Either String CookieRequest | ||
544 | isCookieRequest unpack (DHTCookieRequest a) = Right $ unpack $ asymmData a | ||
545 | isCookieRequest _ _ = Left "Bad cookie request" | ||
546 | |||
547 | mkCookie :: TransactionId -> NodeInfo -> NodeInfo -> Cookie Encrypted -> DHTMessage ((,) Nonce8) | ||
548 | mkCookie (TransactionId n8 n24) src dst cookie = DHTCookie n24 (n8,cookie) | ||
549 | |||
550 | isDHTRequest :: (f DHTRequest -> DHTRequest) -> DHTMessage f -> Either String DHTRequest | ||
551 | isDHTRequest unpack (DHTDHTRequest pubkey a) = Right $ unpack $ asymmData a | ||
552 | isDHTRequest _ _ = Left "Bad dht relay request" | ||
553 | |||
554 | dhtRequestH :: NodeInfo -> DHTRequest -> IO () | ||
555 | dhtRequestH ni req = do | ||
556 | dput XMisc $ "Unhandled DHT Request: " ++ show req | ||
557 | |||
558 | handlers :: TransportCrypto -> Routing -> PacketKind -> Maybe Handler | ||
559 | handlers _ routing PingType = Just $ MethodHandler (isPing snd) mkPong $ pingH | ||
560 | handlers _ routing GetNodesType = Just $ MethodHandler (isGetNodes snd) mkSendNodes $ getNodesH routing | ||
561 | handlers crypto _ CookieRequestType = Just $ MethodHandler (isCookieRequest snd) mkCookie $ cookieRequestH crypto | ||
562 | handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH | ||
563 | handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ | ||
564 | |||
565 | nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo | ||
566 | nodeSearch client cbvar = Search | ||
567 | { searchSpace = toxSpace | ||
568 | , searchNodeAddress = nodeIP &&& nodePort | ||
569 | , searchQuery = Right $ asyncGetNodes client cbvar | ||
570 | , searchAlpha = 8 | ||
571 | , searchK = 16 | ||
572 | |||
573 | } | ||