diff options
author | Joe Crayne <joe@jerkface.net> | 2018-08-17 05:05:17 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-08-17 16:06:38 -0400 |
commit | fde5005a2d1ef3a0636cff21547d4cda22b7b215 (patch) | |
tree | 1263b8d66cbcc838432afd6cc5cb122d9c4c064b /src/Network/Tox/Crypto/Handlers.hs | |
parent | f4dd948176187f5fb46a2cf0dbfbfc4c32badfa5 (diff) |
Simplified PacketQueue/PacketBuffer interface.
Diffstat (limited to 'src/Network/Tox/Crypto/Handlers.hs')
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 138 |
1 files changed, 99 insertions, 39 deletions
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 6f440ea5..db7543f7 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -22,8 +22,7 @@ import qualified Data.ByteString as B | |||
22 | import Data.ByteString (ByteString) | 22 | import Data.ByteString (ByteString) |
23 | import Control.Lens | 23 | import Control.Lens |
24 | import Data.Function | 24 | import Data.Function |
25 | import qualified Data.PacketQueue as PQ | 25 | import Data.PacketBuffer as PB |
26 | ;import Data.PacketQueue (PacketQueue) | ||
27 | import qualified Data.CyclicBuffer as CB | 26 | import qualified Data.CyclicBuffer as CB |
28 | ;import Data.CyclicBuffer (CyclicBuffer) | 27 | ;import Data.CyclicBuffer (CyclicBuffer) |
29 | import Data.Serialize as S | 28 | import Data.Serialize as S |
@@ -354,7 +353,7 @@ data NetCryptoSession = NCrypto | |||
354 | -- the case in group chats | 353 | -- the case in group chats |
355 | , ncView :: TVar SessionView | 354 | , ncView :: TVar SessionView |
356 | -- ^ contains your nick, status etc | 355 | -- ^ contains your nick, status etc |
357 | , ncPacketQueue :: PacketQueue CryptoData | 356 | , ncPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted) |
358 | -- ^ a buffer in which incoming packets may be stored out of order | 357 | -- ^ a buffer in which incoming packets may be stored out of order |
359 | -- but from which they may be extracted in sequence, | 358 | -- but from which they may be extracted in sequence, |
360 | -- helps ensure lossless packets are processed in order | 359 | -- helps ensure lossless packets are processed in order |
@@ -383,13 +382,13 @@ data NetCryptoSession = NCrypto | |||
383 | , ncPingThread :: TVar (Maybe ThreadId) | 382 | , ncPingThread :: TVar (Maybe ThreadId) |
384 | -- ^ thread which actually queues outgoing pings | 383 | -- ^ thread which actually queues outgoing pings |
385 | , ncIdleEventThread :: TVar (Maybe ThreadId) | 384 | , ncIdleEventThread :: TVar (Maybe ThreadId) |
386 | , ncOutgoingQueue :: TVar | 385 | , ncOutgoingQueue :: TVar (UponHandshake NetCryptoOutQueue) |
387 | (UponHandshake | 386 | {- |
388 | (PQ.PacketOutQueue | 387 | (PQ.PacketOutQueue |
389 | (State,Nonce24,U.RangeMap TArray Word8 TVar) | 388 | (State,Nonce24,U.RangeMap TArray Word8 TVar) |
390 | CryptoMessage | 389 | CryptoMessage |
391 | (CryptoPacket Encrypted) | 390 | (CryptoPacket Encrypted) |
392 | CryptoData)) | 391 | CryptoData)) -} |
393 | -- ^ To send a message add it to this queue, by calling 'tryAppendQueueOutgoing' | 392 | -- ^ To send a message add it to this queue, by calling 'tryAppendQueueOutgoing' |
394 | -- but remember to call 'readyOutGoing' first, because the shared secret cache | 393 | -- but remember to call 'readyOutGoing' first, because the shared secret cache |
395 | -- presently requires the IO monad. | 394 | -- presently requires the IO monad. |
@@ -643,7 +642,7 @@ freshCryptoSession sessions | |||
643 | insertArrayAt outHooks 0 (A.array (0,64) (map assignHook [0..64])) | 642 | insertArrayAt outHooks 0 (A.array (0,64) (map assignHook [0..64])) |
644 | return (idmap,lossyEsc,losslessEsc,outHooks) | 643 | return (idmap,lossyEsc,losslessEsc,outHooks) |
645 | ncView0 <- newTVar (sessionView sessions) | 644 | ncView0 <- newTVar (sessionView sessions) |
646 | pktq <- PQ.new (inboundQueueCapacity sessions) 0 | 645 | pktq <- PB.newPacketBuffer |
647 | bufstart <- newTVar 0 | 646 | bufstart <- newTVar 0 |
648 | mbpktoq | 647 | mbpktoq |
649 | <- case mbtheirSessionKey of | 648 | <- case mbtheirSessionKey of |
@@ -696,7 +695,7 @@ freshCryptoSession sessions | |||
696 | , ncOutgoingIdMapEscapedLossy = lossyEscapeIdMap | 695 | , ncOutgoingIdMapEscapedLossy = lossyEscapeIdMap |
697 | , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap | 696 | , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap |
698 | , ncView = ncView0 | 697 | , ncView = ncView0 |
699 | , ncPacketQueue = pktq | 698 | , ncPacketBuffer = pktq |
700 | , ncStoredRequests = ncStoredRequests0 | 699 | , ncStoredRequests = ncStoredRequests0 |
701 | , ncRequestInterval = ncRequestInterval0 | 700 | , ncRequestInterval = ncRequestInterval0 |
702 | , ncAliveInterval = ncAliveInterval0 | 701 | , ncAliveInterval = ncAliveInterval0 |
@@ -720,12 +719,25 @@ freshCryptoSession sessions | |||
720 | HaveHandshake pktoq -> return (runUponHandshake netCryptoSession0 addr pktoq) | 719 | HaveHandshake pktoq -> return (runUponHandshake netCryptoSession0 addr pktoq) |
721 | return (myhandshake,maybeLaunchMissles) | 720 | return (myhandshake,maybeLaunchMissles) |
722 | 721 | ||
722 | {- | ||
723 | type NetCryptoOutQueue = PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar) | 723 | type NetCryptoOutQueue = PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar) |
724 | CryptoMessage | 724 | CryptoMessage |
725 | (CryptoPacket Encrypted) | 725 | (CryptoPacket Encrypted) |
726 | CryptoData | 726 | CryptoData |
727 | -} | ||
728 | data NetCryptoOutQueue = NetCryptoOutQueue | ||
729 | { nqPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted) | ||
730 | , nqToWire :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar) | ||
731 | -> Word32 | ||
732 | -> Word32 | ||
733 | -> Word32 | ||
734 | -> XMessage | ||
735 | -> STM (Maybe (CryptoPacket Encrypted, Word32)) | ||
736 | , nqToWireIO :: IO (STM (State, Nonce24, U.RangeMap TArray Word8 TVar)) | ||
737 | , nqPacketNo :: TVar Word32 | ||
738 | } | ||
727 | 739 | ||
728 | createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketQueue CryptoData | 740 | createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketBuffer CryptoData (CryptoPacket Encrypted) |
729 | -> TVar Nonce24 -> U.RangeMap TArray Word8 TVar -> STM (UponHandshake NetCryptoOutQueue) | 741 | -> TVar Nonce24 -> U.RangeMap TArray Word8 TVar -> STM (UponHandshake NetCryptoOutQueue) |
730 | createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 = do | 742 | createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 = do |
731 | let crypto = transportCrypto sessions | 743 | let crypto = transportCrypto sessions |
@@ -739,8 +751,13 @@ createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce | |||
739 | ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) | 751 | ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) |
740 | ) $ writeTVar ncMyPacketNonce0 n24plus1 | 752 | ) $ writeTVar ncMyPacketNonce0 n24plus1 |
741 | return (return (f n24, n24, ncOutgoingIdMap0)) | 753 | return (return (f n24, n24, ncOutgoingIdMap0)) |
742 | pktoq <- PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 | 754 | seqnoVar <- newTVar 0 |
743 | return (HaveHandshake pktoq) | 755 | return (HaveHandshake NetCryptoOutQueue |
756 | { nqPacketBuffer = pktq | ||
757 | , nqToWire = ncToWire | ||
758 | , nqToWireIO = toWireIO | ||
759 | , nqPacketNo = seqnoVar | ||
760 | }) | ||
744 | 761 | ||
745 | -- | add new session to the lookup maps | 762 | -- | add new session to the lookup maps |
746 | addSessionToMap :: NetCryptoSessions -> SockAddr -> NetCryptoSession -> STM () | 763 | addSessionToMap :: NetCryptoSessions -> SockAddr -> NetCryptoSession -> STM () |
@@ -788,11 +805,39 @@ addSessionToMapIfNotThere sessions addrRaw netCryptoSession = do | |||
788 | -- in case we're using the same long term key on different IPs ... | 805 | -- in case we're using the same long term key on different IPs ... |
789 | modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) | 806 | modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) |
790 | 807 | ||
808 | data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail | ||
809 | deriving (Show) | ||
810 | |||
811 | -- | Convert a message to packet format and append it to the front of a queue | ||
812 | -- used for outgoing messages. (Note that ‘front‛ usually means the higher | ||
813 | -- index in this implementation.) | ||
814 | -- | ||
815 | -- Called from 'runUponHandshake' and 'sendCrypto'. | ||
816 | -- | ||
817 | -- Whenever this is called, you should also send the resulting packet out on | ||
818 | -- the network. | ||
819 | |||
820 | tryAppendQueueOutgoing :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar) | ||
821 | -> NetCryptoOutQueue | ||
822 | -> CryptoMessage | ||
823 | -> STM (OutGoingResult (CryptoPacket Encrypted)) | ||
824 | tryAppendQueueOutgoing getExtra outq msg = do | ||
825 | pktno <- readTVar (nqPacketNo outq) | ||
826 | nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq) | ||
827 | be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq) | ||
828 | mbWire <- nqToWire outq getExtra nextno be pktno msg | ||
829 | case mbWire of | ||
830 | Just (payload,seqno) -> do | ||
831 | PB.grokPacket (nqPacketBuffer outq) (PacketSent seqno payload) | ||
832 | return $ OGSuccess payload | ||
833 | Nothing -> return OGEncodeFail | ||
834 | |||
835 | |||
791 | runUponHandshake :: NetCryptoSession -> SockAddr -> NetCryptoOutQueue -> IO () | 836 | runUponHandshake :: NetCryptoSession -> SockAddr -> NetCryptoOutQueue -> IO () |
792 | runUponHandshake netCryptoSession0 addr pktoq = do | 837 | runUponHandshake netCryptoSession0 addr pktoq = do |
793 | dput XNetCrypto "(((((((runUponHandshake))))))) Launching threads" | 838 | dput XNetCrypto "(((((((runUponHandshake))))))) Launching threads" |
794 | let sessions = ncAllSessions netCryptoSession0 | 839 | let sessions = ncAllSessions netCryptoSession0 |
795 | pktq = ncPacketQueue netCryptoSession0 | 840 | pktq = ncPacketBuffer netCryptoSession0 |
796 | remotePublicKey = ncTheirPublicKey netCryptoSession0 | 841 | remotePublicKey = ncTheirPublicKey netCryptoSession0 |
797 | crypto = transportCrypto sessions | 842 | crypto = transportCrypto sessions |
798 | allsessions = netCryptoSessions sessions | 843 | allsessions = netCryptoSessions sessions |
@@ -805,7 +850,7 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
805 | atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) | 850 | atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) |
806 | labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey) ++ sidStr) | 851 | labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey) ++ sidStr) |
807 | fix $ \loop -> do | 852 | fix $ \loop -> do |
808 | cd <- atomically $ PQ.dequeue pktq | 853 | cd <- atomically $ PB.awaitReadyPacket pktq |
809 | if msgID (bufferData cd) == PacketRequest | 854 | if msgID (bufferData cd) == PacketRequest |
810 | then do | 855 | then do |
811 | dput XNetCrypto $ "Dequeued::PacketRequest seqno=" ++ show (bufferStart cd) ++ " " ++ show (bufferData cd) | 856 | dput XNetCrypto $ "Dequeued::PacketRequest seqno=" ++ show (bufferStart cd) ++ " " ++ show (bufferData cd) |
@@ -824,19 +869,21 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
824 | labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) | 869 | labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) |
825 | fix $ \loop -> do | 870 | fix $ \loop -> do |
826 | atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) | 871 | atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) |
827 | nums <- atomically $ PQ.getMissing pktq | 872 | (nums,seqno) <- atomically $ PB.packetNumbersToRequest pktq |
828 | dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums | 873 | dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums |
829 | getOutGoingParam <- PQ.readyOutGoing pktoq | 874 | getOutGoingParam <- nqToWireIO pktoq |
830 | atomically $ do | 875 | x <- atomically $ do |
831 | seqno <- PQ.getLastDequeuedPlus1 pktq | 876 | ogresult <- tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums) |
832 | ogresult <- PQ.tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums) | ||
833 | case ogresult of | 877 | case ogresult of |
834 | PQ.OGSuccess _ -> return () | 878 | OGSuccess x -> return x |
835 | _ -> retry | 879 | _ -> retry |
880 | sendSessionPacket sessions addr x | ||
836 | loop | 881 | loop |
837 | dput XNetCrypto $ "runUponHandshake: " ++ show threadid ++ " = NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr | 882 | dput XNetCrypto $ "runUponHandshake: " ++ show threadid ++ " = NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr |
838 | 883 | ||
839 | -- launch dequeueOutgoing thread | 884 | -- launch dequeueOutgoing thread |
885 | {- | ||
886 | -- TODO | ||
840 | threadidOutgoing <- forkIO $ do | 887 | threadidOutgoing <- forkIO $ do |
841 | tid <- myThreadId | 888 | tid <- myThreadId |
842 | atomically $ writeTVar (ncDequeueOutGoingThread netCryptoSession0) (Just tid) | 889 | atomically $ writeTVar (ncDequeueOutGoingThread netCryptoSession0) (Just tid) |
@@ -846,7 +893,8 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
846 | dput XNetCrypto "NetCryptoDequeueOutgoing thread... Sending encrypted Packet" | 893 | dput XNetCrypto "NetCryptoDequeueOutgoing thread... Sending encrypted Packet" |
847 | sendSessionPacket sessions addr pkt | 894 | sendSessionPacket sessions addr pkt |
848 | loop | 895 | loop |
849 | dput XNetCrypto $ "runUponHandshake: " ++ show threadidOutgoing ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr | 896 | -} |
897 | dput XNetCrypto $ "runUponHandshake: " ++ show "threadidOutgoing" ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr | ||
850 | 898 | ||
851 | -- launch ping Machine thread | 899 | -- launch ping Machine thread |
852 | pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) | 900 | pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) |
@@ -859,10 +907,10 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
859 | labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) | 907 | labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) |
860 | fix $ \loop -> do | 908 | fix $ \loop -> do |
861 | atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) | 909 | atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) |
862 | dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" | 910 | dput XNetCryptoOut $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" |
863 | lr <- sendPing crypto netCryptoSession0 | 911 | lr <- sendPing crypto netCryptoSession0 |
864 | case lr of | 912 | case lr of |
865 | Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s | 913 | Left s -> dput XNetCryptoOut $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s |
866 | Right _ -> return () | 914 | Right _ -> return () |
867 | loop | 915 | loop |
868 | 916 | ||
@@ -972,7 +1020,7 @@ updateCryptoSession sessions addr newsession timestamp hp session handshake = do | |||
972 | sessions | 1020 | sessions |
973 | newsession | 1021 | newsession |
974 | theirSessionPublic | 1022 | theirSessionPublic |
975 | (ncPacketQueue session) | 1023 | (ncPacketBuffer session) |
976 | (ncMyPacketNonce session) | 1024 | (ncMyPacketNonce session) |
977 | (ncOutgoingIdMap session) | 1025 | (ncOutgoingIdMap session) |
978 | writeTVar (ncOutgoingQueue session) mbpktoq | 1026 | writeTVar (ncOutgoingQueue session) mbpktoq |
@@ -1055,7 +1103,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1055 | Nothing -> do | 1103 | Nothing -> do |
1056 | dput XNetCrypto "Dropping packet.. no session" | 1104 | dput XNetCrypto "Dropping packet.. no session" |
1057 | return Nothing -- drop packet, we have no session | 1105 | return Nothing -- drop packet, we have no session |
1058 | Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketQueue, ncHooks, | 1106 | Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketBuffer, ncHooks, |
1059 | ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, | 1107 | ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, |
1060 | ncPingMachine, ncSessionId, ncStoredRequests}) -> do | 1108 | ncPingMachine, ncSessionId, ncStoredRequests}) -> do |
1061 | -- Unrecognized packets, try them thrice so as to give | 1109 | -- Unrecognized packets, try them thrice so as to give |
@@ -1130,9 +1178,10 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1130 | isLossy (GrpMsg KnownLossy _) = True | 1178 | isLossy (GrpMsg KnownLossy _) = True |
1131 | isLossy (Msg mid) | lossyness mid == Lossy = True | 1179 | isLossy (Msg mid) | lossyness mid == Lossy = True |
1132 | isLossy _ = False | 1180 | isLossy _ = False |
1181 | ack = bufferStart -- Earliest sequence number they've seen from us. | ||
1133 | if isLossy msgTypMapped | 1182 | if isLossy msgTypMapped |
1134 | then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm | 1183 | then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm |
1135 | atomically $ PQ.observeOutOfBand ncPacketQueue bufferEnd | 1184 | atomically $ PB.grokPacket ncPacketBuffer (PacketReceivedLossy bufferEnd ack) |
1136 | runCryptoHook session (bufferData cd) | 1185 | runCryptoHook session (bufferData cd) |
1137 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm | 1186 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm |
1138 | when (msgID cm == PING) $ | 1187 | when (msgID cm == PING) $ |
@@ -1143,7 +1192,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1143 | -- num <- CB.getNextSequenceNum ncStoredRequests | 1192 | -- num <- CB.getNextSequenceNum ncStoredRequests |
1144 | -- CB.enqueue ncStoredRequests num cd | 1193 | -- CB.enqueue ncStoredRequests num cd |
1145 | handlePacketRequest session cd | 1194 | handlePacketRequest session cd |
1146 | atomically $ PQ.enqueue ncPacketQueue bufferEnd cd | 1195 | atomically $ PB.grokPacket ncPacketBuffer (PacketReceived bufferEnd cd ack) |
1147 | return Nothing | 1196 | return Nothing |
1148 | where | 1197 | where |
1149 | last2Bytes :: Nonce24 -> Word16 | 1198 | last2Bytes :: Nonce24 -> Word16 |
@@ -1262,14 +1311,18 @@ sendCrypto crypto session updateLocal cm = do | |||
1262 | HaveHandshake outq <- atomically $ readTVar (ncOutgoingQueue session) | 1311 | HaveHandshake outq <- atomically $ readTVar (ncOutgoingQueue session) |
1263 | -- XXX: potential race? if shared secret comes out of sync with cache? | 1312 | -- XXX: potential race? if shared secret comes out of sync with cache? |
1264 | dput XNetCrypto "sendCrypto: enter " | 1313 | dput XNetCrypto "sendCrypto: enter " |
1265 | getOutGoingParam <- PQ.readyOutGoing outq | 1314 | getOutGoingParam <- nqToWireIO outq |
1266 | dput XNetCrypto "sendCrypto: got the io extra stuff" | 1315 | dput XNetCrypto "sendCrypto: got the io extra stuff" |
1267 | atomically $ do | 1316 | r <- atomically $ do |
1268 | result <- PQ.tryAppendQueueOutgoing getOutGoingParam outq cm | 1317 | result <- tryAppendQueueOutgoing getOutGoingParam outq cm |
1269 | case result of | 1318 | case result of |
1270 | PQ.OGSuccess x -> updateLocal >> return (Right x) | 1319 | OGSuccess x -> updateLocal >> return (Right x) |
1271 | PQ.OGFull -> return (Left "Outgoing packet buffer is full") | 1320 | OGFull -> return (Left "Outgoing packet buffer is full") |
1272 | PQ.OGEncodeFail -> return (Left "Failed to encode outgoing packet") | 1321 | OGEncodeFail -> return (Left "Failed to encode outgoing packet") |
1322 | case ncSockAddr session of | ||
1323 | HaveDHTKey saddr -> mapM_ (sendSessionPacket (ncAllSessions session) saddr) r | ||
1324 | _ -> return () | ||
1325 | return r | ||
1273 | 1326 | ||
1274 | sendPing :: TransportCrypto -> NetCryptoSession -> IO (Either String (CryptoPacket Encrypted)) | 1327 | sendPing :: TransportCrypto -> NetCryptoSession -> IO (Either String (CryptoPacket Encrypted)) |
1275 | sendPing crypto session = do | 1328 | sendPing crypto session = do |
@@ -1327,8 +1380,16 @@ sendCryptoLossy crypto session updateLocal cm = do | |||
1327 | updateLocal | 1380 | updateLocal |
1328 | return (Left errmsg) | 1381 | return (Left errmsg) |
1329 | HaveHandshake outq -> do | 1382 | HaveHandshake outq -> do |
1330 | getOutGoingParam <- PQ.readyOutGoing outq | 1383 | getOutGoingParam <- nqToWireIO outq |
1331 | mbPkt <- atomically $ PQ.peekPacket getOutGoingParam outq cm | 1384 | mbPkt <- atomically $ do |
1385 | pktno <- readTVar (nqPacketNo outq) | ||
1386 | nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq) | ||
1387 | be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq) | ||
1388 | nqToWire outq getOutGoingParam -- See 'ncToWire' | ||
1389 | nextno -- packet number we expect to recieve | ||
1390 | be -- buffer_end (for lossy) | ||
1391 | pktno -- packet number (for lossless) | ||
1392 | cm | ||
1332 | case mbPkt of | 1393 | case mbPkt of |
1333 | Nothing -> do | 1394 | Nothing -> do |
1334 | let errmsg = "Error sending lossy packet! (sessionid: " ++ show (ncSessionId session) ++ ") " ++ show cm | 1395 | let errmsg = "Error sending lossy packet! (sessionid: " ++ show (ncSessionId session) ++ ") " ++ show cm |
@@ -1466,11 +1527,10 @@ handlePacketRequest session (CryptoData { bufferStart=num | |||
1466 | mbOutQ <- atomically $ readTVar (ncOutgoingQueue session) | 1527 | mbOutQ <- atomically $ readTVar (ncOutgoingQueue session) |
1467 | case mbOutQ of | 1528 | case mbOutQ of |
1468 | HaveHandshake pktoq -> do | 1529 | HaveHandshake pktoq -> do |
1469 | getOutGoingParam <-PQ.readyOutGoing pktoq | 1530 | getOutGoingParam <-nqToWireIO pktoq |
1470 | ps <- atomically $ PQ.getRequested getOutGoingParam pktoq num bs | 1531 | ps <- atomically $ PB.retrieveForResend (nqPacketBuffer pktoq) $ PB.decompressSequenceNumbers num bs |
1471 | let resend (Just (n,pkt)) = sendSessionPacket (ncAllSessions session) addr pkt | 1532 | let resend (n,pkt) = sendSessionPacket (ncAllSessions session) addr pkt |
1472 | resend _ = return () | 1533 | dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst ps)) |
1473 | dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst $ catMaybes ps)) | ||
1474 | mapM_ resend ps | 1534 | mapM_ resend ps |
1475 | _ -> return () | 1535 | _ -> return () |
1476 | 1536 | ||