diff options
Diffstat (limited to 'dht/src/Network')
-rw-r--r-- | dht/src/Network/BitTorrent/MainlineDHT.hs | 10 | ||||
-rw-r--r-- | dht/src/Network/Lossless.hs | 13 | ||||
-rw-r--r-- | dht/src/Network/SessionTransports.hs | 12 | ||||
-rw-r--r-- | dht/src/Network/Tox.hs | 9 | ||||
-rw-r--r-- | dht/src/Network/Tox/AggregateSession.hs | 5 | ||||
-rw-r--r-- | dht/src/Network/Tox/DHT/Transport.hs | 31 | ||||
-rw-r--r-- | dht/src/Network/Tox/Onion/Routes.hs | 8 | ||||
-rw-r--r-- | dht/src/Network/Tox/Session.hs | 9 | ||||
-rw-r--r-- | dht/src/Network/Tox/TCP.hs | 34 |
9 files changed, 72 insertions, 59 deletions
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index 705d7291..e0715d4a 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -431,11 +431,11 @@ showPacket f addr flow bs = L8.unpack $ L8.unlines es | |||
431 | -- Add detailed printouts for every packet. | 431 | -- Add detailed printouts for every packet. |
432 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString | 432 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString |
433 | addVerbosity tr = | 433 | addVerbosity tr = |
434 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do | 434 | tr { awaitMessage = do |
435 | (m,io) <- awaitMessage tr | ||
435 | case m of | 436 | case m of |
436 | Arrival addr msg -> dput XBitTorrent (showPacket id addr " --> " msg) | 437 | Arrival addr msg -> return (m, io >> dput XBitTorrent (showPacket id addr " --> " msg)) |
437 | _ -> return () | 438 | _ -> return (m, io) |
438 | kont m | ||
439 | , sendMessage = \addr msg -> do | 439 | , sendMessage = \addr msg -> do |
440 | dput XBitTorrent (showPacket id addr " <-- " msg) | 440 | dput XBitTorrent (showPacket id addr " <-- " msg) |
441 | sendMessage tr addr msg | 441 | sendMessage tr addr msg |
@@ -603,7 +603,7 @@ newClient swarms addr udp = do | |||
603 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which | 603 | -- recursive since 'updateRouting' does not invoke 'awaitMessage' which |
604 | -- which was modified by 'onInbound'. However, I'm going to avoid the | 604 | -- which was modified by 'onInbound'. However, I'm going to avoid the |
605 | -- mutual reference just to be safe. | 605 | -- mutual reference just to be safe. |
606 | outgoingClient = client { clientNet = net { awaitMessage = pure . ($ Terminated) } } | 606 | outgoingClient = client { clientNet = net { awaitMessage = pure (Terminated, return ()) } } |
607 | 607 | ||
608 | dispatch = DispatchMethods | 608 | dispatch = DispatchMethods |
609 | { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x | 609 | { classifyInbound = classify -- :: x -> MessageClass err meth tid addr x |
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs index 41203ca5..7ccceec1 100644 --- a/dht/src/Network/Lossless.hs +++ b/dht/src/Network/Lossless.hs | |||
@@ -61,7 +61,9 @@ lossless lbl isLossless encode saddr udp = do | |||
61 | rloop <- forkIO $ do | 61 | rloop <- forkIO $ do |
62 | -- This thread enqueues inbound packets or writes them to the oob | 62 | -- This thread enqueues inbound packets or writes them to the oob |
63 | -- channel. | 63 | -- channel. |
64 | fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do | 64 | fix $ \loop -> do |
65 | (m,io) <- atomically $ awaitMessage udp | ||
66 | io | ||
65 | m' <- case m of Terminated -> return Nothing | 67 | m' <- case m of Terminated -> return Nothing |
66 | ParseError e -> return $ Just (Left e) | 68 | ParseError e -> return $ Just (Left e) |
67 | Arrival a x -> Just . Right <$> isLossless x a | 69 | Arrival a x -> Just . Right <$> isLossless x a |
@@ -87,15 +89,14 @@ lossless lbl isLossless encode saddr udp = do | |||
87 | -- we will use this STM action stop it from waiting on the oob TChan. | 89 | -- we will use this STM action stop it from waiting on the oob TChan. |
88 | -- XXX: This shouldn't be neccessary and might be costly. | 90 | -- XXX: This shouldn't be neccessary and might be costly. |
89 | let tr = Transport | 91 | let tr = Transport |
90 | { awaitMessage = \kont -> | 92 | { awaitMessage = |
91 | orElse | 93 | orElse |
92 | (do x <- readTChan oob `orElse` join (readTVar term) | 94 | (do x <- readTChan oob `orElse` join (readTVar term) |
93 | return $ kont $! x) | 95 | return (x, return ())) |
94 | (do x <- PB.awaitReadyPacket pb | 96 | (do x <- PB.awaitReadyPacket pb |
95 | report <- pbReport "dequeued" pb | 97 | report <- pbReport "dequeued" pb |
96 | return $ do | 98 | return $ (,) (uncurry (flip Arrival) x) $ do |
97 | atomically $ writeTChan oob (ParseError report) | 99 | atomically $ writeTChan oob (ParseError report)) |
98 | kont $! uncurry (flip Arrival) x) | ||
99 | , sendMessage = \a' x' -> do | 100 | , sendMessage = \a' x' -> do |
100 | seqno <- atomically $ do | 101 | seqno <- atomically $ do |
101 | seqno <- PB.nextToSendSequenceNumber pb | 102 | seqno <- PB.nextToSendSequenceNumber pb |
diff --git a/dht/src/Network/SessionTransports.hs b/dht/src/Network/SessionTransports.hs index b6d02f36..68233cd4 100644 --- a/dht/src/Network/SessionTransports.hs +++ b/dht/src/Network/SessionTransports.hs | |||
@@ -1,5 +1,6 @@ | |||
1 | {-# LANGUAGE LambdaCase #-} | 1 | {-# LANGUAGE LambdaCase #-} |
2 | {-# LANGUAGE NamedFieldPuns #-} | 2 | {-# LANGUAGE NamedFieldPuns #-} |
3 | {-# LANGUAGE TupleSections #-} | ||
3 | module Network.SessionTransports | 4 | module Network.SessionTransports |
4 | ( Sessions | 5 | ( Sessions |
5 | , initSessions | 6 | , initSessions |
@@ -73,9 +74,9 @@ newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwr | |||
73 | return sid | 74 | return sid |
74 | forM msid $ \sid -> do | 75 | forM msid $ \sid -> do |
75 | let tr = Transport | 76 | let tr = Transport |
76 | { awaitMessage = \kont -> do | 77 | { awaitMessage = do |
77 | x <- takeTMVar mvar | 78 | x <- takeTMVar mvar |
78 | return $ kont $! maybe Terminated (uncurry $ flip Arrival) x | 79 | return $ (, return ()) $ maybe Terminated (uncurry $ flip Arrival) x |
79 | , sendMessage = \addr x -> do | 80 | , sendMessage = \addr x -> do |
80 | x' <- unwrap addr x | 81 | x' <- unwrap addr x |
81 | sessionsSendRaw saddr x' | 82 | sessionsSendRaw saddr x' |
@@ -92,8 +93,9 @@ newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwr | |||
92 | } | 93 | } |
93 | return (sid,tr) | 94 | return (sid,tr) |
94 | 95 | ||
95 | sessionHandler :: Sessions x -> (Multi.SessionAddress -> x -> IO (Maybe (x -> x))) | 96 | sessionHandler :: Sessions x -> Arrival err Multi.SessionAddress x |
96 | sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do | 97 | -> STM (Arrival err Multi.SessionAddress x, IO ()) |
98 | sessionHandler Sessions{sessionsByAddr} (Arrival addr0 x) = return $ (,) Discarded $ do | ||
97 | let addr = -- Canonical in case of 6-mapped-4 addresses. | 99 | let addr = -- Canonical in case of 6-mapped-4 addresses. |
98 | Multi.canonize addr0 | 100 | Multi.canonize addr0 |
99 | dispatch [] = return () | 101 | dispatch [] = return () |
@@ -101,4 +103,4 @@ sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do | |||
101 | when (not b) $ dispatch fs | 103 | when (not b) $ dispatch fs |
102 | fs <- atomically $ Map.lookup addr <$> readTVar sessionsByAddr | 104 | fs <- atomically $ Map.lookup addr <$> readTVar sessionsByAddr |
103 | mapM_ (dispatch . IntMap.elems) fs | 105 | mapM_ (dispatch . IntMap.elems) fs |
104 | return Nothing -- consume all packets. | 106 | sessionHandler _ m = return (m, return ()) |
diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs index 6b39d57a..3dd1d48e 100644 --- a/dht/src/Network/Tox.hs +++ b/dht/src/Network/Tox.hs | |||
@@ -245,14 +245,15 @@ isLocalHost _ = False | |||
245 | 245 | ||
246 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString | 246 | addVerbosity :: Transport err SockAddr ByteString -> Transport err SockAddr ByteString |
247 | addVerbosity tr = | 247 | addVerbosity tr = |
248 | tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do | 248 | tr { awaitMessage = do |
249 | (m,io) <- awaitMessage tr | ||
249 | case m of | 250 | case m of |
250 | Arrival addr msg -> do | 251 | Arrival addr msg -> return $ (,) m $ do |
252 | io | ||
251 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x82,0x8c,0x8d])) $ do | 253 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x82,0x8c,0x8d])) $ do |
252 | mapM_ (\x -> dput XMisc ( (show addr) ++ " --> " ++ x)) | 254 | mapM_ (\x -> dput XMisc ( (show addr) ++ " --> " ++ x)) |
253 | $ xxd 0 msg | 255 | $ xxd 0 msg |
254 | _ -> return () | 256 | _ -> return (m,io) |
255 | kont m | ||
256 | , sendMessage = \addr msg -> do | 257 | , sendMessage = \addr msg -> do |
257 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x8c,0x8d])) $ do | 258 | when (not (B.null msg || elem (B.head msg) [0,1,2,4,0x81,0x8c,0x8d])) $ do |
258 | mapM_ (\x -> dput XMisc ( (show addr) ++ " <-- " ++ x)) | 259 | mapM_ (\x -> dput XMisc ( (show addr) ++ " <-- " ++ x)) |
diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs index feb634f0..33b1fafb 100644 --- a/dht/src/Network/Tox/AggregateSession.hs +++ b/dht/src/Network/Tox/AggregateSession.hs | |||
@@ -196,7 +196,10 @@ forkSession c s setStatus = forkIO $ do | |||
196 | onPacket body loop (ParseError e) = inPrint e >> loop | 196 | onPacket body loop (ParseError e) = inPrint e >> loop |
197 | onPacket body loop (Arrival _ x) = body loop x | 197 | onPacket body loop (Arrival _ x) = body loop x |
198 | 198 | ||
199 | awaitPacket body = fix $ join . atomically . awaitMessage (sTransport s) . onPacket body | 199 | awaitPacket body = fix $ \loop -> do |
200 | (m,io) <- atomically $ awaitMessage (sTransport s) | ||
201 | io | ||
202 | onPacket body loop m | ||
200 | 203 | ||
201 | atomically $ setStatus $ InProgress AwaitingSessionPacket | 204 | atomically $ setStatus $ InProgress AwaitingSessionPacket |
202 | awaitPacket $ \_ online -> do | 205 | awaitPacket $ \_ online -> do |
diff --git a/dht/src/Network/Tox/DHT/Transport.hs b/dht/src/Network/Tox/DHT/Transport.hs index 5de92916..5f0deea8 100644 --- a/dht/src/Network/Tox/DHT/Transport.hs +++ b/dht/src/Network/Tox/DHT/Transport.hs | |||
@@ -103,7 +103,7 @@ parseDHTAddr :: (Eq saddr, Show ni) => | |||
103 | (saddr -> STM (Maybe ni)) | 103 | (saddr -> STM (Maybe ni)) |
104 | -> (NodeId -> saddr -> Either String ni) | 104 | -> (NodeId -> saddr -> Either String ni) |
105 | -> (ByteString, saddr) | 105 | -> (ByteString, saddr) |
106 | -> IO (Either (DHTMessage Encrypted8,ni) (ByteString,saddr)) | 106 | -> STM (Either (DHTMessage Encrypted8,ni) (ByteString,saddr)) |
107 | parseDHTAddr pendingCookies nodeInfo (msg,saddr) | 107 | parseDHTAddr pendingCookies nodeInfo (msg,saddr) |
108 | | Just (typ,bs) <- B.uncons msg | 108 | | Just (typ,bs) <- B.uncons msg |
109 | , let right = return $ Right (msg,saddr) | 109 | , let right = return $ Right (msg,saddr) |
@@ -115,9 +115,11 @@ parseDHTAddr pendingCookies nodeInfo (msg,saddr) | |||
115 | 0x04 -> left $ direct nodeInfo bs saddr DHTSendNodes | 115 | 0x04 -> left $ direct nodeInfo bs saddr DHTSendNodes |
116 | 0x18 -> left $ direct nodeInfo bs saddr DHTCookieRequest | 116 | 0x18 -> left $ direct nodeInfo bs saddr DHTCookieRequest |
117 | 0x19 -> do | 117 | 0x19 -> do |
118 | mni <- atomically $ pendingCookies saddr | 118 | mni <- pendingCookies saddr |
119 | let ni = fromMaybe (noReplyAddr nodeInfo saddr) mni | 119 | let ni = fromMaybe (noReplyAddr nodeInfo saddr) mni |
120 | dput XMan $ "Got encrypted cookie! mni="++show mni | 120 | runio :: IO () -> STM () |
121 | runio _ = return () -- TODO: run IO action | ||
122 | runio $ dput XMan $ "Got encrypted cookie! mni="++show mni | ||
121 | left $ fanGet bs getCookie (uncurry DHTCookie) (const $ ni) | 123 | left $ fanGet bs getCookie (uncurry DHTCookie) (const $ ni) |
122 | 0x20 -> left $ fanGet bs getDHTReqest (uncurry DHTDHTRequest) (asymNodeInfo nodeInfo saddr . snd) | 124 | 0x20 -> left $ fanGet bs getDHTReqest (uncurry DHTDHTRequest) (asymNodeInfo nodeInfo saddr . snd) |
123 | 0x21 -> left $ do | 125 | 0x21 -> left $ do |
@@ -409,13 +411,16 @@ forwardDHTRequests :: TransportCrypto -> (PublicKey -> IO (Maybe ni)) -> DHTTran | |||
409 | forwardDHTRequests crypto closeLookup dht = dht { awaitMessage = await' } | 411 | forwardDHTRequests crypto closeLookup dht = dht { awaitMessage = await' } |
410 | where | 412 | where |
411 | -- await' :: HandleHi ni a -> STM (IO a) | 413 | -- await' :: HandleHi ni a -> STM (IO a) |
412 | await' pass = awaitMessage dht $ \case | 414 | await' = do |
413 | Arrival src m@(DHTDHTRequest target payload) | target /= transportPublic crypto | 415 | (m, io) <- awaitMessage dht |
414 | -> do mni <- closeLookup target | 416 | return $ case m of |
415 | -- Forward the message if the target is in our close list. | 417 | Arrival src m@(DHTDHTRequest target payload) | target /= transportPublic crypto |
416 | forM_ mni $ \ni -> sendMessage dht ni m | 418 | -> (,) Discarded $ do |
417 | join $ atomically (await' pass) | 419 | io |
418 | m -> pass m | 420 | mni <- closeLookup target |
421 | -- Forward the message if the target is in our close list. | ||
422 | forM_ mni $ \ni -> sendMessage dht ni m | ||
423 | _ -> (m,io) | ||
419 | 424 | ||
420 | encrypt :: TransportCrypto -> (ni -> NodeId) -> DHTMessage ((,) Nonce8) -> ni -> IO (DHTMessage Encrypted8, ni) | 425 | encrypt :: TransportCrypto -> (ni -> NodeId) -> DHTMessage ((,) Nonce8) -> ni -> IO (DHTMessage Encrypted8, ni) |
421 | encrypt crypto nodeId msg ni = do | 426 | encrypt crypto nodeId msg ni = do |
@@ -432,7 +437,7 @@ encryptMessage crypto destKey n arg = do | |||
432 | secret <- lookupSharedSecret crypto (transportSecret crypto) destKey n | 437 | secret <- lookupSharedSecret crypto (transportSecret crypto) destKey n |
433 | return $ E8 $ ToxCrypto.encrypt secret plain | 438 | return $ E8 $ ToxCrypto.encrypt secret plain |
434 | 439 | ||
435 | decrypt :: TransportCrypto -> (ni -> NodeId) -> DHTMessage Encrypted8 -> ni -> IO (Either String (DHTMessage ((,) Nonce8), ni)) | 440 | decrypt :: TransportCrypto -> (ni -> NodeId) -> DHTMessage Encrypted8 -> ni -> STM (Either String (DHTMessage ((,) Nonce8), ni)) |
436 | decrypt crypto nodeId msg ni = do | 441 | decrypt crypto nodeId msg ni = do |
437 | let decipher n c = Composed $ decryptMessage crypto n . left ((,) $ id2key $ nodeId ni) $ c | 442 | let decipher n c = Composed $ decryptMessage crypto n . left ((,) $ id2key $ nodeId ni) $ c |
438 | msg' <- sequenceMessage $ transcode decipher msg | 443 | msg' <- sequenceMessage $ transcode decipher msg |
@@ -442,11 +447,11 @@ decryptMessage :: Serialize x => | |||
442 | TransportCrypto | 447 | TransportCrypto |
443 | -> Nonce24 | 448 | -> Nonce24 |
444 | -> Either (PublicKey, Encrypted8 x) (Asymm (Encrypted8 x)) | 449 | -> Either (PublicKey, Encrypted8 x) (Asymm (Encrypted8 x)) |
445 | -> IO ((Either String ∘ ((,) Nonce8)) x) | 450 | -> STM ((Either String ∘ ((,) Nonce8)) x) |
446 | decryptMessage crypto n arg = do | 451 | decryptMessage crypto n arg = do |
447 | let (remotekey,E8 e) = either id (senderKey &&& asymmData) arg | 452 | let (remotekey,E8 e) = either id (senderKey &&& asymmData) arg |
448 | plain8 = Composed . fmap swap . (>>= decodePlain) | 453 | plain8 = Composed . fmap swap . (>>= decodePlain) |
449 | secret <- lookupSharedSecret crypto (transportSecret crypto) remotekey n | 454 | secret <- lookupSharedSecretSTM crypto (transportSecret crypto) remotekey n |
450 | return $ plain8 $ ToxCrypto.decrypt secret e | 455 | return $ plain8 $ ToxCrypto.decrypt secret e |
451 | 456 | ||
452 | sequenceMessage :: Applicative m => DHTMessage (m ∘ f) -> m (DHTMessage f) | 457 | sequenceMessage :: Applicative m => DHTMessage (m ∘ f) -> m (DHTMessage f) |
diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs index 46ded48d..93e9bfcd 100644 --- a/dht/src/Network/Tox/Onion/Routes.hs +++ b/dht/src/Network/Tox/Onion/Routes.hs | |||
@@ -539,16 +539,16 @@ handleEvent getnodes or e@(BuildRoute (RouteId rid)) = do | |||
539 | Nothing -> routeLogger or $ "ONION Failed RouteId " ++ show rid | 539 | Nothing -> routeLogger or $ "ONION Failed RouteId " ++ show rid |
540 | 540 | ||
541 | 541 | ||
542 | lookupSender :: OnionRouter -> SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId)) | 542 | lookupSender :: OnionRouter -> SockAddr -> Nonce8 -> STM (Maybe (OnionDestination RouteId)) |
543 | lookupSender or = lookupSender' (pendingQueries or) (routeLog or) | 543 | lookupSender or saddr n8 = lookupSender' (pendingQueries or) (routeLog or) saddr n8 |
544 | 544 | ||
545 | lookupSender' :: TVar (Word64Map PendingQuery) | 545 | lookupSender' :: TVar (Word64Map PendingQuery) |
546 | -> TChan String | 546 | -> TChan String |
547 | -> SockAddr | 547 | -> SockAddr |
548 | -> Nonce8 | 548 | -> Nonce8 |
549 | -> IO (Maybe (OnionDestination RouteId)) | 549 | -> STM (Maybe (OnionDestination RouteId)) |
550 | lookupSender' pending log saddr (Nonce8 w8) = do | 550 | lookupSender' pending log saddr (Nonce8 w8) = do |
551 | result <- atomically $ do | 551 | result <- do |
552 | ks <- readTVar pending | 552 | ks <- readTVar pending |
553 | let r = W64.lookup w8 ks | 553 | let r = W64.lookup w8 ks |
554 | writeTChan log $ "ONION lookupSender " ++ unwords [show w8, "->", show r] | 554 | writeTChan log $ "ONION lookupSender " ++ unwords [show w8, "->", show r] |
diff --git a/dht/src/Network/Tox/Session.hs b/dht/src/Network/Tox/Session.hs index d34dfc7a..53d63287 100644 --- a/dht/src/Network/Tox/Session.hs +++ b/dht/src/Network/Tox/Session.hs | |||
@@ -106,14 +106,13 @@ sClose s = do | |||
106 | -- negotiated. It always returns Nothing which makes it convenient to use with | 106 | -- negotiated. It always returns Nothing which makes it convenient to use with |
107 | -- 'Network.QueryResponse.addHandler'. | 107 | -- 'Network.QueryResponse.addHandler'. |
108 | handshakeH :: SessionParams | 108 | handshakeH :: SessionParams |
109 | -> Multi.SessionAddress | 109 | -> Arrival err Multi.SessionAddress (Handshake Encrypted) |
110 | -> Handshake Encrypted | 110 | -> STM (Arrival err Multi.SessionAddress (Handshake Encrypted), IO ()) |
111 | -> IO (Maybe a) | 111 | handshakeH sp (Arrival saddr handshake) = return $ (,) Discarded $ do |
112 | handshakeH sp saddr handshake = do | ||
113 | decryptHandshake (spCrypto sp) handshake | 112 | decryptHandshake (spCrypto sp) handshake |
114 | >>= either (\err -> return ()) | 113 | >>= either (\err -> return ()) |
115 | (uncurry $ plainHandshakeH sp saddr) | 114 | (uncurry $ plainHandshakeH sp saddr) |
116 | return Nothing | 115 | handshakeH _ m = return (m, return ()) |
117 | 116 | ||
118 | 117 | ||
119 | plainHandshakeH :: SessionParams | 118 | plainHandshakeH :: SessionParams |
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs index 1da302b6..626d4714 100644 --- a/dht/src/Network/Tox/TCP.hs +++ b/dht/src/Network/Tox/TCP.hs | |||
@@ -160,22 +160,22 @@ tcpStream crypto mkst = StreamHandshake | |||
160 | , streamAddr = nodeAddr | 160 | , streamAddr = nodeAddr |
161 | } | 161 | } |
162 | 162 | ||
163 | newtype SessionData = SessionData (MVar (IntMap.IntMap NodeId)) | 163 | newtype SessionData = SessionData (TMVar (IntMap.IntMap NodeId)) |
164 | 164 | ||
165 | newSessionData :: NodeInfo -> IO SessionData | 165 | newSessionData :: NodeInfo -> IO SessionData |
166 | newSessionData _ = SessionData <$> newMVar IntMap.empty | 166 | newSessionData _ = atomically $ SessionData <$> newTMVar IntMap.empty |
167 | 167 | ||
168 | getRelayedRemote :: SessionData -> ConId -> IO NodeId | 168 | getRelayedRemote :: SessionData -> ConId -> STM NodeId |
169 | getRelayedRemote (SessionData keymapVar) (ConId i) = do | 169 | getRelayedRemote (SessionData keymapVar) (ConId i) = do |
170 | keymap <- takeMVar keymapVar | 170 | keymap <- takeTMVar keymapVar |
171 | let k = fromMaybe UDP.zeroID $ IntMap.lookup (fromIntegral i) keymap | 171 | let k = fromMaybe UDP.zeroID $ IntMap.lookup (fromIntegral i) keymap |
172 | putMVar keymapVar keymap | 172 | putTMVar keymapVar keymap |
173 | return k | 173 | return k |
174 | 174 | ||
175 | setRelayedRemote :: SessionData -> ConId -> NodeId -> IO () | 175 | setRelayedRemote :: SessionData -> ConId -> NodeId -> STM () |
176 | setRelayedRemote (SessionData keymapVar) (ConId conid) nid = do | 176 | setRelayedRemote (SessionData keymapVar) (ConId conid) nid = do |
177 | keymap <- takeMVar keymapVar | 177 | keymap <- takeTMVar keymapVar |
178 | putMVar keymapVar $ IntMap.insert (fromIntegral conid) nid keymap | 178 | putTMVar keymapVar $ IntMap.insert (fromIntegral conid) nid keymap |
179 | 179 | ||
180 | toxTCP :: TransportCrypto -> IO ( TCPCache (SessionProtocol (SessionData,RelayPacket) RelayPacket) | 180 | toxTCP :: TransportCrypto -> IO ( TCPCache (SessionProtocol (SessionData,RelayPacket) RelayPacket) |
181 | , TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket) ) | 181 | , TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket) ) |
@@ -367,7 +367,7 @@ type RelayCache = TCPCache (SessionProtocol (SessionData,RelayPacket) RelayPacke | |||
367 | newClient :: TransportCrypto | 367 | newClient :: TransportCrypto |
368 | -> ((Nonce8 -> QR.Result (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query | 368 | -> ((Nonce8 -> QR.Result (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query |
369 | -> (a -> Nonce8 -> RelayPacket -> IO void) -- ^ load mvar for relay query | 369 | -> (a -> Nonce8 -> RelayPacket -> IO void) -- ^ load mvar for relay query |
370 | -> (SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId))) -- ^ lookup sender of onion query | 370 | -> (SockAddr -> Nonce8 -> STM (Maybe (OnionDestination RouteId))) -- ^ lookup sender of onion query |
371 | -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) -- ^ lookup OnionRoute by id | 371 | -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) -- ^ lookup OnionRoute by id |
372 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) | 372 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) |
373 | , RelayCache | 373 | , RelayCache |
@@ -375,8 +375,9 @@ newClient :: TransportCrypto | |||
375 | , Transport String (OnionDestination RouteId) (OnionMessage Encrypted) ) | 375 | , Transport String (OnionDestination RouteId) (OnionMessage Encrypted) ) |
376 | , RelayClient ) | 376 | , RelayClient ) |
377 | newClient crypto store load lookupSender getRoute = do | 377 | newClient crypto store load lookupSender getRoute = do |
378 | let runio io = return () -- TODO: run IO action | ||
378 | (tcpcache,net0) <- toxTCP crypto | 379 | (tcpcache,net0) <- toxTCP crypto |
379 | (relaynet,net1) <- partitionRelay net0 | 380 | (relaynet,net1) <- partitionRelay runio net0 |
380 | (onionnet,net2) <- partitionOnion crypto lookupSender getRoute net1 | 381 | (onionnet,net2) <- partitionOnion crypto lookupSender getRoute net1 |
381 | let net3 = {- XXX: Client type forces this pointless layering. -} | 382 | let net3 = {- XXX: Client type forces this pointless layering. -} |
382 | layerTransport ((Right .) . (,) . (,) False . snd) (,) net2 | 383 | layerTransport ((Right .) . (,) . (,) False . snd) (,) net2 |
@@ -428,12 +429,13 @@ showViaRelay (ViaRelay mcon nid tcp) = | |||
428 | "TCP:" ++ maybe "(oob)" (\(ConId con) -> "(" ++ show con ++ ")") mcon | 429 | "TCP:" ++ maybe "(oob)" (\(ConId con) -> "(" ++ show con ++ ")") mcon |
429 | ++ show nid ++ "@@" ++ show (nodeAddr tcp) | 430 | ++ show nid ++ "@@" ++ show (nodeAddr tcp) |
430 | 431 | ||
431 | partitionRelay :: TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket) | 432 | partitionRelay :: (IO () -> STM ()) |
433 | -> TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket) | ||
432 | -> IO ( Transport err ViaRelay ByteString | 434 | -> IO ( Transport err ViaRelay ByteString |
433 | , TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket)) | 435 | , TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket)) |
434 | partitionRelay tr = partitionTransportM parse encode tr | 436 | partitionRelay runio tr = partitionTransportM parse encode tr |
435 | where | 437 | where |
436 | parse :: ((SessionData,RelayPacket), NodeInfo) -> IO (Either (ByteString, ViaRelay) ((SessionData,RelayPacket),NodeInfo)) | 438 | parse :: ((SessionData,RelayPacket), NodeInfo) -> STM (Either (ByteString, ViaRelay) ((SessionData,RelayPacket),NodeInfo)) |
437 | parse ((st,RelayData bs conid), ni) = do | 439 | parse ((st,RelayData bs conid), ni) = do |
438 | nid <- getRelayedRemote st conid | 440 | nid <- getRelayedRemote st conid |
439 | return $ Left (bs, ViaRelay (Just conid) nid ni) | 441 | return $ Left (bs, ViaRelay (Just conid) nid ni) |
@@ -463,7 +465,7 @@ partitionRelay tr = partitionTransportM parse encode tr | |||
463 | 465 | ||
464 | 466 | ||
465 | partitionOnion :: TransportCrypto | 467 | partitionOnion :: TransportCrypto |
466 | -> (SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId))) | 468 | -> (SockAddr -> Nonce8 -> STM (Maybe (OnionDestination RouteId))) |
467 | -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) | 469 | -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) |
468 | -> TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket) | 470 | -> TransportA err NodeInfo (SessionData,RelayPacket) (Bool,RelayPacket) |
469 | -> IO ( Transport err (OnionDestination RouteId) (OnionMessage Encrypted) | 471 | -> IO ( Transport err (OnionDestination RouteId) (OnionMessage Encrypted) |
@@ -471,8 +473,8 @@ partitionOnion :: TransportCrypto | |||
471 | partitionOnion crypto lookupSender getRoute tr = partitionTransportM parse encode tr | 473 | partitionOnion crypto lookupSender getRoute tr = partitionTransportM parse encode tr |
472 | where | 474 | where |
473 | parse :: ((SessionData,RelayPacket), NodeInfo) | 475 | parse :: ((SessionData,RelayPacket), NodeInfo) |
474 | -> IO (Either (OnionMessage Encrypted , OnionDestination RouteId) | 476 | -> STM (Either (OnionMessage Encrypted , OnionDestination RouteId) |
475 | ((SessionData,RelayPacket), NodeInfo)) | 477 | ((SessionData,RelayPacket), NodeInfo)) |
476 | parse pass@((_,OnionPacketResponse msg@(OnionAnnounceResponse n8 _ _)), nodeA) = do | 478 | parse pass@((_,OnionPacketResponse msg@(OnionAnnounceResponse n8 _ _)), nodeA) = do |
477 | m <- lookupSender (nodeAddr nodeA) n8 | 479 | m <- lookupSender (nodeAddr nodeA) n8 |
478 | case m of | 480 | case m of |