diff options
author | Joe Crayne <joe@jerkface.net> | 2018-09-04 23:27:07 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-09-07 13:18:56 -0400 |
commit | 4aeaf247a25fbe80598ce54e4142a707ec5b9951 (patch) | |
tree | 7e592aecd269b5d71ae7b2aee343b23156bdfbe0 | |
parent | b597d554c2b970c775b2954709b397b0ddf6870d (diff) |
Tox Alternative sessions: Outgoing seqno/ack reversed.
-rw-r--r-- | src/Network/Lossless.hs | 30 | ||||
-rw-r--r-- | src/Network/SessionTransports.hs | 33 | ||||
-rw-r--r-- | src/Network/Tox/Session.hs | 36 |
3 files changed, 68 insertions, 31 deletions
diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs index 45241b6d..bdbeb3a2 100644 --- a/src/Network/Lossless.hs +++ b/src/Network/Lossless.hs | |||
@@ -10,6 +10,7 @@ import Data.Function | |||
10 | import Data.Word | 10 | import Data.Word |
11 | 11 | ||
12 | import Data.PacketBuffer as PB | 12 | import Data.PacketBuffer as PB |
13 | import DPut | ||
13 | import Network.QueryResponse | 14 | import Network.QueryResponse |
14 | 15 | ||
15 | #ifdef THREAD_DEBUG | 16 | #ifdef THREAD_DEBUG |
@@ -24,11 +25,12 @@ data SequenceInfo = SequenceInfo | |||
24 | } | 25 | } |
25 | deriving (Eq,Ord,Show) | 26 | deriving (Eq,Ord,Show) |
26 | 27 | ||
27 | lossless :: (x -> addr -> IO (PacketInboundEvent (x',addr'))) | 28 | lossless :: Show addr => |
29 | (x -> addr -> IO (PacketInboundEvent (x',addr'))) | ||
28 | -> (SequenceInfo -> x' -> addr' -> IO y) | 30 | -> (SequenceInfo -> x' -> addr' -> IO y) |
29 | -> addr | 31 | -> addr |
30 | -> TransportA err addr x y | 32 | -> TransportA String addr x y |
31 | -> IO ( Transport err addr' x' | 33 | -> IO ( Transport String addr' x' |
32 | , [Word32] -> IO () | 34 | , [Word32] -> IO () |
33 | , IO ([Word32],Word32) | 35 | , IO ([Word32],Word32) |
34 | ) | 36 | ) |
@@ -40,6 +42,7 @@ lossless isLossless encode saddr udp = do | |||
40 | rloop <- forkIO $ fix $ \loop -> do | 42 | rloop <- forkIO $ fix $ \loop -> do |
41 | -- This thread enqueues inbound packets or writes them to the oob | 43 | -- This thread enqueues inbound packets or writes them to the oob |
42 | -- channel. | 44 | -- channel. |
45 | myThreadId >>= flip labelThread ("lossless."++show saddr) | ||
43 | awaitMessage udp $ \m -> do | 46 | awaitMessage udp $ \m -> do |
44 | forM_ m $ \raw -> do | 47 | forM_ m $ \raw -> do |
45 | m' <- mapM (uncurry isLossless) raw | 48 | m' <- mapM (uncurry isLossless) raw |
@@ -53,7 +56,9 @@ lossless isLossless encode saddr udp = do | |||
53 | PB.grokInboundPacket pb event | 56 | PB.grokInboundPacket pb event |
54 | case event of | 57 | case event of |
55 | PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event) | 58 | PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event) |
56 | _ -> return () | 59 | _ -> do |
60 | report <- pbReport "enqueued" pb | ||
61 | writeTChan oob (Left report) | ||
57 | loop | 62 | loop |
58 | let tr = Transport | 63 | let tr = Transport |
59 | { awaitMessage = \kont -> do | 64 | { awaitMessage = \kont -> do |
@@ -61,14 +66,22 @@ lossless isLossless encode saddr udp = do | |||
61 | (do x <- readTChan oob | 66 | (do x <- readTChan oob |
62 | return $ kont $! Just x) | 67 | return $ kont $! Just x) |
63 | (do x <- PB.awaitReadyPacket pb | 68 | (do x <- PB.awaitReadyPacket pb |
64 | return $ kont $! Just (Right x)) | 69 | report <- pbReport "dequeued" pb |
70 | return $ do | ||
71 | dput XNetCrypto report | ||
72 | kont $! Just (Right x)) | ||
65 | , sendMessage = \a' x' -> do | 73 | , sendMessage = \a' x' -> do |
66 | seqno <- atomically $ do | 74 | seqno <- atomically $ do |
67 | seqno <- PB.nextToSendSequenceNumber pb | 75 | seqno <- PB.nextToSendSequenceNumber pb |
68 | ack <- PB.expectingSequenceNumber pb | 76 | ack <- PB.expectingSequenceNumber pb |
69 | return $ SequenceInfo seqno ack | 77 | return $ SequenceInfo seqno ack |
70 | x <- encode seqno x' a' | 78 | x <- encode seqno x' a' |
71 | atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | 79 | (isfull,nn) <- atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) |
80 | when isfull $ do | ||
81 | dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) | ||
82 | atomically $ do | ||
83 | (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | ||
84 | when isfull retry | ||
72 | sendMessage udp saddr x | 85 | sendMessage udp saddr x |
73 | , closeTransport = do | 86 | , closeTransport = do |
74 | killThread rloop | 87 | killThread rloop |
@@ -76,5 +89,8 @@ lossless isLossless encode saddr udp = do | |||
76 | } | 89 | } |
77 | resend ns = do | 90 | resend ns = do |
78 | xs <- atomically $ retrieveForResend pb ns | 91 | xs <- atomically $ retrieveForResend pb ns |
79 | mapM_ (sendMessage udp saddr . snd) xs | 92 | dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." |
93 | forM_ xs $ \x -> do | ||
94 | dput XNetCrypto $ shows saddr $ " <-- Resending packet." | ||
95 | sendMessage udp saddr . snd $ x | ||
80 | return (tr, resend, atomically $ PB.packetNumbersToRequest pb) | 96 | return (tr, resend, atomically $ PB.packetNumbersToRequest pb) |
diff --git a/src/Network/SessionTransports.hs b/src/Network/SessionTransports.hs index 17763e4e..e9daf6c1 100644 --- a/src/Network/SessionTransports.hs +++ b/src/Network/SessionTransports.hs | |||
@@ -49,7 +49,7 @@ newSession :: Sessions raw | |||
49 | -> (addr -> y -> IO raw) | 49 | -> (addr -> y -> IO raw) |
50 | -> (SockAddr -> raw -> IO (Maybe (x, addr))) | 50 | -> (SockAddr -> raw -> IO (Maybe (x, addr))) |
51 | -> SockAddr | 51 | -> SockAddr |
52 | -> IO (Maybe (TransportA err addr x y)) | 52 | -> IO (Maybe (Int,TransportA err addr x y)) |
53 | newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwrap wrap addr0 = do | 53 | newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwrap wrap addr0 = do |
54 | mvar <- newEmptyMVar | 54 | mvar <- newEmptyMVar |
55 | let saddr = -- Canonical in case of 6-mapped-4 addresses. | 55 | let saddr = -- Canonical in case of 6-mapped-4 addresses. |
@@ -69,21 +69,22 @@ newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwr | |||
69 | $ IntMap.singleton sid handlePacket | 69 | $ IntMap.singleton sid handlePacket |
70 | return sid | 70 | return sid |
71 | forM msid $ \sid -> do | 71 | forM msid $ \sid -> do |
72 | return Transport | 72 | let tr = Transport |
73 | { awaitMessage = \kont -> do | 73 | { awaitMessage = \kont -> do |
74 | x <- takeMVar mvar | 74 | x <- takeMVar mvar |
75 | kont $! Right <$> x | 75 | kont $! Right <$> x |
76 | , sendMessage = \addr x -> do | 76 | , sendMessage = \addr x -> do |
77 | x' <- unwrap addr x | 77 | x' <- unwrap addr x |
78 | sessionsSendRaw saddr x' | 78 | sessionsSendRaw saddr x' |
79 | , closeTransport = do | 79 | , closeTransport = do |
80 | tryTakeMVar mvar | 80 | tryTakeMVar mvar |
81 | putMVar mvar Nothing | 81 | putMVar mvar Nothing |
82 | atomically $ do | 82 | atomically $ do |
83 | modifyTVar' sessionIds $ S.delete sid | 83 | modifyTVar' sessionIds $ S.delete sid |
84 | modifyTVar' sessionsById $ IntMap.delete sid | 84 | modifyTVar' sessionsById $ IntMap.delete sid |
85 | modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr | 85 | modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr |
86 | } | 86 | } |
87 | return (sid,tr) | ||
87 | 88 | ||
88 | sessionHandler :: Sessions x -> (SockAddr -> x -> IO (Maybe (x -> x))) | 89 | sessionHandler :: Sessions x -> (SockAddr -> x -> IO (Maybe (x -> x))) |
89 | sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do | 90 | sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do |
diff --git a/src/Network/Tox/Session.hs b/src/Network/Tox/Session.hs index a52e9478..88221b11 100644 --- a/src/Network/Tox/Session.hs +++ b/src/Network/Tox/Session.hs | |||
@@ -10,6 +10,7 @@ import Network.Socket | |||
10 | import Crypto.Tox | 10 | import Crypto.Tox |
11 | import Data.PacketBuffer (PacketInboundEvent (..)) | 11 | import Data.PacketBuffer (PacketInboundEvent (..)) |
12 | import Data.Tox.Message | 12 | import Data.Tox.Message |
13 | import DPut | ||
13 | import Network.Lossless | 14 | import Network.Lossless |
14 | import Network.QueryResponse | 15 | import Network.QueryResponse |
15 | import Network.SessionTransports | 16 | import Network.SessionTransports |
@@ -43,6 +44,7 @@ data Session = Session | |||
43 | -- convenience, a lower bound for the numbers in the list is also | 44 | -- convenience, a lower bound for the numbers in the list is also |
44 | -- returned. Suggested polling interval: a few seconds. | 45 | -- returned. Suggested polling interval: a few seconds. |
45 | , sTransport :: Transport String () CryptoMessage | 46 | , sTransport :: Transport String () CryptoMessage |
47 | , sSessionID :: Int | ||
46 | } | 48 | } |
47 | 49 | ||
48 | handshakeH :: SessionParams | 50 | handshakeH :: SessionParams |
@@ -63,7 +65,11 @@ plainHandshakeH :: SessionParams | |||
63 | -> IO () | 65 | -> IO () |
64 | plainHandshakeH sp saddr skey handshake = do | 66 | plainHandshakeH sp saddr skey handshake = do |
65 | let hd = runIdentity $ handshakeData handshake | 67 | let hd = runIdentity $ handshakeData handshake |
68 | prelude = show saddr ++ " --> " | ||
69 | dput XNetCrypto $ prelude ++ "handshake: " ++ show (otherCookie hd, baseNonce hd) | ||
66 | sent <- spGetSentHandshake sp skey saddr (handshakeCookie handshake) (otherCookie hd) | 70 | sent <- spGetSentHandshake sp skey saddr (handshakeCookie handshake) (otherCookie hd) |
71 | -- TODO: this is always returning sent = Nothing | ||
72 | dput XNetCrypto $ prelude ++ "plainHandshakeH: cached outgoing: " ++ show (fmap (baseNonce . snd) sent) | ||
67 | forM_ sent $ \(hd_skey,hd_sent) -> do | 73 | forM_ sent $ \(hd_skey,hd_sent) -> do |
68 | sk <- SessionKeys (spCrypto sp) | 74 | sk <- SessionKeys (spCrypto sp) |
69 | hd_skey | 75 | hd_skey |
@@ -71,7 +77,8 @@ plainHandshakeH sp saddr skey handshake = do | |||
71 | <$> atomically (newTVar $ baseNonce hd) | 77 | <$> atomically (newTVar $ baseNonce hd) |
72 | <*> atomically (newTVar $ baseNonce hd_sent) | 78 | <*> atomically (newTVar $ baseNonce hd_sent) |
73 | m <- newSession (spSessions sp) (\() p -> return p) (decryptPacket sk) saddr | 79 | m <- newSession (spSessions sp) (\() p -> return p) (decryptPacket sk) saddr |
74 | forM_ m $ \t -> do | 80 | dput XNetCrypto $ prelude ++ "plainHandshakeH: session " ++ maybe "Nothing" (const "Just") m |
81 | forM_ m $ \(sid, t) -> do | ||
75 | (t2,resend,getMissing) | 82 | (t2,resend,getMissing) |
76 | <- lossless (\cp a -> return $ fmap (,a) $ checkLossless $ runIdentity $ pktData cp) | 83 | <- lossless (\cp a -> return $ fmap (,a) $ checkLossless $ runIdentity $ pktData cp) |
77 | (\seqno p _ -> encryptPacket sk $ bookKeeping seqno p) | 84 | (\seqno p _ -> encryptPacket sk $ bookKeeping seqno p) |
@@ -79,6 +86,7 @@ plainHandshakeH sp saddr skey handshake = do | |||
79 | t | 86 | t |
80 | let _ = t :: TransportA String () (CryptoPacket Identity) (CryptoPacket Encrypted) | 87 | let _ = t :: TransportA String () (CryptoPacket Identity) (CryptoPacket Encrypted) |
81 | _ = t2 :: Transport String () CryptoMessage | 88 | _ = t2 :: Transport String () CryptoMessage |
89 | sendMessage t2 () $ OneByte ONLINE | ||
82 | spOnNewSession sp Session | 90 | spOnNewSession sp Session |
83 | { sOurKey = skey | 91 | { sOurKey = skey |
84 | , sTheirAddr = saddr | 92 | , sTheirAddr = saddr |
@@ -87,6 +95,7 @@ plainHandshakeH sp saddr skey handshake = do | |||
87 | , sResendPackets = resend | 95 | , sResendPackets = resend |
88 | , sMissingInbound = getMissing | 96 | , sMissingInbound = getMissing |
89 | , sTransport = t2 | 97 | , sTransport = t2 |
98 | , sSessionID = sid | ||
90 | } | 99 | } |
91 | return () | 100 | return () |
92 | 101 | ||
@@ -122,17 +131,28 @@ data SessionKeys = SessionKeys | |||
122 | , skNonceOutgoing :: TVar Nonce24 -- +1 on every packet | 131 | , skNonceOutgoing :: TVar Nonce24 -- +1 on every packet |
123 | } | 132 | } |
124 | 133 | ||
134 | |||
135 | -- From spec.md: | ||
136 | -- | ||
137 | -- Data in the encrypted packets: | ||
138 | -- | ||
139 | -- [our recvbuffers buffer_start, (highest packet number handled + 1), (big endian)] | ||
140 | -- [uint32_t packet number if lossless, sendbuffer buffer_end if lossy, (big endian)] | ||
141 | -- [data] | ||
142 | |||
143 | |||
125 | bookKeeping :: SequenceInfo -> CryptoMessage -> CryptoData | 144 | bookKeeping :: SequenceInfo -> CryptoMessage -> CryptoData |
126 | bookKeeping (SequenceInfo seqno ack) m = CryptoData | 145 | bookKeeping (SequenceInfo seqno ack) m = CryptoData |
127 | { bufferStart = seqno :: Word32 | 146 | { bufferStart = ack :: Word32 |
128 | , bufferEnd = ack :: Word32 | 147 | , bufferEnd = seqno :: Word32 |
129 | , bufferData = m | 148 | , bufferData = m |
130 | } | 149 | } |
131 | 150 | ||
132 | checkLossless :: CryptoData -> PacketInboundEvent CryptoMessage | 151 | checkLossless :: CryptoData -> PacketInboundEvent CryptoMessage |
133 | checkLossless CryptoData{ bufferStart = ack | 152 | checkLossless cd@CryptoData{ bufferStart = ack |
134 | , bufferEnd = no | 153 | , bufferEnd = no |
135 | , bufferData = x } = tag no x ack | 154 | , bufferData = x } = tag no x' ack |
136 | where | 155 | where |
137 | tag = case lossyness (msgID x) of Lossy -> PacketReceivedLossy | 156 | x' = decodeRawCryptoMsg cd |
138 | _ -> PacketReceived | 157 | tag = case lossyness (msgID x') of Lossy -> PacketReceivedLossy |
158 | _ -> PacketReceived | ||