diff options
author | Joe Crayne <joe@jerkface.net> | 2018-08-17 05:05:17 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-08-17 16:06:38 -0400 |
commit | fde5005a2d1ef3a0636cff21547d4cda22b7b215 (patch) | |
tree | 1263b8d66cbcc838432afd6cc5cb122d9c4c064b /src | |
parent | f4dd948176187f5fb46a2cf0dbfbfc4c32badfa5 (diff) |
Simplified PacketQueue/PacketBuffer interface.
Diffstat (limited to 'src')
-rw-r--r-- | src/DPut.hs | 1 | ||||
-rw-r--r-- | src/Data/PacketBuffer.hs | 102 | ||||
-rw-r--r-- | src/Data/PacketQueue.hs | 312 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 138 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Transport.hs | 7 |
5 files changed, 305 insertions, 255 deletions
diff --git a/src/DPut.hs b/src/DPut.hs index e02f8ce6..f3760645 100644 --- a/src/DPut.hs +++ b/src/DPut.hs | |||
@@ -19,6 +19,7 @@ data DebugTag | |||
19 | | XLan | 19 | | XLan |
20 | | XMan | 20 | | XMan |
21 | | XNetCrypto | 21 | | XNetCrypto |
22 | | XNetCryptoOut | ||
22 | | XOnion | 23 | | XOnion |
23 | | XRoutes | 24 | | XRoutes |
24 | | XPing | 25 | | XPing |
diff --git a/src/Data/PacketBuffer.hs b/src/Data/PacketBuffer.hs new file mode 100644 index 00000000..c5ede50d --- /dev/null +++ b/src/Data/PacketBuffer.hs | |||
@@ -0,0 +1,102 @@ | |||
1 | {-# LANGUAGE TupleSections #-} | ||
2 | module Data.PacketBuffer | ||
3 | ( PacketBuffer | ||
4 | , newPacketBuffer | ||
5 | , PacketEvent(..) | ||
6 | , grokPacket | ||
7 | , awaitReadyPacket | ||
8 | , packetNumbersToRequest | ||
9 | , expectingSequenceNumber | ||
10 | , nextToSendSequenceNumber | ||
11 | , retrieveForResend | ||
12 | , decompressSequenceNumbers | ||
13 | ) where | ||
14 | |||
15 | import Data.PacketQueue as Q | ||
16 | |||
17 | import Control.Concurrent.STM | ||
18 | import Control.Monad | ||
19 | import Data.Maybe | ||
20 | import Data.Word | ||
21 | |||
22 | data PacketBuffer a b = PacketBuffer | ||
23 | { inQueue :: PacketQueue a | ||
24 | , outBuffer :: PacketQueue b | ||
25 | } | ||
26 | |||
27 | -- | Initialize the packet buffers. Note, the capacity of the inbound packet | ||
28 | -- queue is currently hardcoded to 200 packets and the capacity of the outbound | ||
29 | -- buffer is hardcoded to 400 packets. | ||
30 | newPacketBuffer :: STM (PacketBuffer a b) | ||
31 | newPacketBuffer = PacketBuffer <$> Q.new 200 0 | ||
32 | <*> Q.new 400 0 | ||
33 | |||
34 | -- | Input for 'grokPacket'. | ||
35 | data PacketEvent a b | ||
36 | = PacketSent { peSeqNum :: Word32 -- ^ Sequence number for payload. | ||
37 | , peSentPayload :: b -- ^ Payload packet we sent to them. | ||
38 | } | ||
39 | | PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. | ||
40 | , peReceivedPayload :: a -- ^ Payload packet they sent to us. | ||
41 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | ||
42 | } | ||
43 | | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet. | ||
44 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | ||
45 | } | ||
46 | |||
47 | -- | Whenever a packet is received or sent (but not resent) from the network, | ||
48 | -- this function should be called to update the relevant buffers. | ||
49 | -- | ||
50 | -- On outgoing packets, if the outbound buffer is full, this will block | ||
51 | -- indefinitely until it is called in another thread with an inbound | ||
52 | -- acknowledgement. | ||
53 | grokPacket :: PacketBuffer a b -> PacketEvent a b -> STM () | ||
54 | grokPacket (PacketBuffer _ outb) (PacketSent seqno a) | ||
55 | = do (n,_) <- Q.enqueue outb seqno a | ||
56 | when (n/=0) retry | ||
57 | grokPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) | ||
58 | = do Q.enqueue inb seqno a | ||
59 | Q.dropPacketsBefore outb ack | ||
60 | grokPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack) | ||
61 | = do Q.observeOutOfBand inb seqno | ||
62 | Q.dropPacketsBefore outb ack | ||
63 | |||
64 | -- | Wait until an inbound packet is ready to process. Any necessary | ||
65 | -- re-ordering will have been done. | ||
66 | awaitReadyPacket :: PacketBuffer a b -> STM a | ||
67 | awaitReadyPacket (PacketBuffer inb _) = Q.dequeue inb | ||
68 | |||
69 | -- | Obtain a list of sequence numbers that may have been dropped. This would | ||
70 | -- be any number not yet received that is prior to the maxium sequence number | ||
71 | -- we have received. For convenience, a lowerbound for the missing squence numbers | ||
72 | -- is also returned as the second item of the pair. | ||
73 | packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32) | ||
74 | packetNumbersToRequest (PacketBuffer inb _) = do | ||
75 | ns <- Q.getMissing inb | ||
76 | lb <- Q.getLastDequeuedPlus1 inb | ||
77 | return (ns,lb) | ||
78 | |||
79 | expectingSequenceNumber :: PacketBuffer a b -> STM Word32 | ||
80 | expectingSequenceNumber (PacketBuffer inb _ ) = Q.getLastDequeuedPlus1 inb | ||
81 | |||
82 | nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32 | ||
83 | nextToSendSequenceNumber (PacketBuffer _ outb) = Q.getLastEnqueuedPlus1 outb | ||
84 | |||
85 | -- | Retrieve already-sent packets by their sequence numbers. See | ||
86 | -- 'decompressSequenceNumbers' to obtain the sequence number list from a | ||
87 | -- compressed encoding. There is no need to call 'grokPacket' when sending the | ||
88 | -- packets returned from this call. | ||
89 | retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)] | ||
90 | retrieveForResend (PacketBuffer _ outb) seqnos = | ||
91 | catMaybes <$> forM seqnos (\no -> fmap (no,) <$> Q.lookup outb no) | ||
92 | |||
93 | -- | Expand a compressed set of sequence numbers. The first squence number is | ||
94 | -- given directly and the rest are computed using 8-bit offsets. This is | ||
95 | -- normally used to obtain input for 'retrieveForResend'. | ||
96 | decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32] | ||
97 | decompressSequenceNumbers baseno ns = foldr doOne (const []) ns (baseno-1) | ||
98 | where | ||
99 | doOne :: Word8 -> (Word32 -> [Word32]) -> Word32 -> [Word32] | ||
100 | doOne 0 f addend = f (addend+255) | ||
101 | doOne x f addend = let y = fromIntegral x + addend | ||
102 | in y : f y | ||
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index 6f997ac0..c5d5ac09 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -3,30 +3,22 @@ | |||
3 | -- be stored out of order, but from which they are extracted in the proper | 3 | -- be stored out of order, but from which they are extracted in the proper |
4 | -- sequence. | 4 | -- sequence. |
5 | {-# LANGUAGE NamedFieldPuns #-} | 5 | {-# LANGUAGE NamedFieldPuns #-} |
6 | {-# LANGUAGE FlexibleContexts #-} | ||
7 | module Data.PacketQueue | 6 | module Data.PacketQueue |
8 | ( PacketQueue | 7 | ( PacketQueue |
9 | , getCapacity | 8 | , getCapacity |
10 | , getLastDequeuedPlus1 | 9 | , getLastDequeuedPlus1 |
10 | , getLastEnqueuedPlus1 | ||
11 | , new | 11 | , new |
12 | , dequeue | 12 | , dequeue |
13 | , dropPacketsBefore | ||
13 | , getMissing | 14 | , getMissing |
14 | , dequeueOrGetMissing | 15 | -- , dequeueOrGetMissing |
15 | , markButNotDequeue | 16 | -- , markButNotDequeue |
16 | , enqueue | 17 | , enqueue |
17 | , observeOutOfBand | 18 | , observeOutOfBand |
18 | , PacketOutQueue | ||
19 | , packetQueueViewList | 19 | , packetQueueViewList |
20 | , newOutGoing | 20 | -- , mapQ |
21 | , readyOutGoing | 21 | , Data.PacketQueue.lookup |
22 | , toPNums | ||
23 | , getRequested | ||
24 | , peekPacket | ||
25 | , tryAppendQueueOutgoing | ||
26 | , dequeueOutgoing | ||
27 | , getHighestHandledPacketPlus1 | ||
28 | , mapOutGoing | ||
29 | , OutGoingResult(..) | ||
30 | ) where | 22 | ) where |
31 | 23 | ||
32 | import Control.Concurrent.STM | 24 | import Control.Concurrent.STM |
@@ -34,24 +26,35 @@ import Control.Monad | |||
34 | import Data.Word | 26 | import Data.Word |
35 | import Data.Array.MArray | 27 | import Data.Array.MArray |
36 | import Data.Maybe | 28 | import Data.Maybe |
37 | import DPut | ||
38 | 29 | ||
39 | data PacketQueue a = PacketQueue | 30 | data PacketQueue a = PacketQueue |
40 | { pktq :: TArray Word32 (Maybe a) | 31 | { pktq :: TArray Word32 (Maybe a) |
41 | , seqno :: TVar Word32 -- (buffer_start) | 32 | , seqno :: TVar Word32 -- (buffer_start) |
42 | , qsize :: Word32 | 33 | , qsize :: Word32 |
43 | , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 | 34 | , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 |
35 | -- i.e. one more than the largest seen sequence number. | ||
44 | } | 36 | } |
45 | 37 | ||
38 | -- | Obtain a list of non-empty slots in the 'PacketQueue'. The numeric value | ||
39 | -- is an index into the underlying array, not a sequence number. | ||
46 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] | 40 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] |
47 | packetQueueViewList p = do | 41 | packetQueueViewList p = do |
48 | let f (n,Nothing) = Nothing | 42 | let f (n,Nothing) = Nothing |
49 | f (n,Just x) = Just (n,x) | 43 | f (n,Just x) = Just (n,x) |
50 | catMaybes . map f <$> getAssocs (pktq p) | 44 | catMaybes . map f <$> getAssocs (pktq p) |
51 | 45 | ||
46 | -- | This returns the earliest sequence number with a slot in the queue. | ||
52 | getLastDequeuedPlus1 :: PacketQueue a -> STM Word32 | 47 | getLastDequeuedPlus1 :: PacketQueue a -> STM Word32 |
53 | getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno | 48 | getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno |
54 | 49 | ||
50 | -- | This returns the least upper bound of sequence numbers that have been | ||
51 | -- enqueued. | ||
52 | getLastEnqueuedPlus1 :: PacketQueue a -> STM Word32 | ||
53 | getLastEnqueuedPlus1 PacketQueue {buffend} = readTVar buffend | ||
54 | |||
55 | |||
56 | -- | This is the number of consequetive sequence numbers, starting at | ||
57 | -- 'getLastDequeuedPlus1' that can be stored in the queue | ||
55 | getCapacity :: Applicative m => PacketQueue t -> m Word32 | 58 | getCapacity :: Applicative m => PacketQueue t -> m Word32 |
56 | getCapacity (PacketQueue { qsize }) = pure qsize | 59 | getCapacity (PacketQueue { qsize }) = pure qsize |
57 | 60 | ||
@@ -63,7 +66,7 @@ new capacity seqstart = do | |||
63 | let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 | 66 | let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 |
64 | q <- newArray (0,cap - 1) Nothing | 67 | q <- newArray (0,cap - 1) Nothing |
65 | seqv <- newTVar seqstart | 68 | seqv <- newTVar seqstart |
66 | bufe <- newTVar 0 | 69 | bufe <- newTVar seqstart |
67 | return PacketQueue | 70 | return PacketQueue |
68 | { pktq = q | 71 | { pktq = q |
69 | , seqno = seqv | 72 | , seqno = seqv |
@@ -72,8 +75,13 @@ new capacity seqstart = do | |||
72 | } | 75 | } |
73 | 76 | ||
74 | -- | Update the packet queue given: | 77 | -- | Update the packet queue given: |
78 | -- | ||
75 | -- * packet queue | 79 | -- * packet queue |
80 | -- | ||
76 | -- * the number of next lossless packet they intend to send you | 81 | -- * the number of next lossless packet they intend to send you |
82 | -- | ||
83 | -- This behaves exactly like 'enqueue' except that no packet data is written to | ||
84 | -- the queue. | ||
77 | observeOutOfBand :: PacketQueue a -> Word32-> STM () | 85 | observeOutOfBand :: PacketQueue a -> Word32-> STM () |
78 | observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do | 86 | observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do |
79 | low <- readTVar seqno | 87 | low <- readTVar seqno |
@@ -85,37 +93,35 @@ observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacke | |||
85 | -- | If seqno < buffend then return expected packet numbers for all | 93 | -- | If seqno < buffend then return expected packet numbers for all |
86 | -- the Nothings in the array between them. | 94 | -- the Nothings in the array between them. |
87 | -- Otherwise, return empty list. | 95 | -- Otherwise, return empty list. |
88 | getMissing :: Show a => PacketQueue a -> STM [Word32] | 96 | getMissing :: PacketQueue a -> STM [Word32] |
89 | getMissing PacketQueue { pktq, seqno, qsize, buffend } = do | 97 | getMissing PacketQueue { pktq, seqno, qsize, buffend } = do |
90 | seqno0 <- readTVar seqno | 98 | seqno0 <- readTVar seqno |
91 | buffend0 <- readTVar buffend | 99 | buffend0 <- readTVar buffend |
92 | -- note relying on fact that [ b .. a ] is null when a < b | 100 | -- note relying on fact that [ b .. a ] is null when a < b |
93 | let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1] | 101 | let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1] |
94 | -- tput XNetCrypto $ "(netCRYPTO getMissing indices: " ++ show indices | ||
95 | maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices | 102 | maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices |
96 | tput XNetCrypto $ "(netCRYPTO) getMissing: (" ++ show seqno0 ++ " " ++ show buffend0 ++") => " ++ show maybes | ||
97 | let nums = map fst . filter (isNothing . snd) $ maybes | 103 | let nums = map fst . filter (isNothing . snd) $ maybes |
98 | return nums | 104 | return nums |
99 | 105 | ||
100 | -- | If seqno < buffend then return expected packet numbers for all | 106 | -- -- | If seqno < buffend then return expected packet numbers for all |
101 | -- the Nothings in the array between them. | 107 | -- -- the Nothings in the array between them. |
102 | -- Otherwise, behave as 'dequeue' would. | 108 | -- -- Otherwise, behave as 'dequeue' would. |
103 | -- TODO: Do we need this function? Delete it if not. | 109 | -- -- TODO: Do we need this function? Delete it if not. |
104 | dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) | 110 | -- dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) |
105 | dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do | 111 | -- dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do |
106 | seqno0 <- readTVar seqno | 112 | -- seqno0 <- readTVar seqno |
107 | buffend0 <- readTVar buffend | 113 | -- buffend0 <- readTVar buffend |
108 | if seqno0 < buffend0 | 114 | -- if seqno0 < buffend0 |
109 | then do | 115 | -- then do |
110 | maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ]) | 116 | -- maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ]) |
111 | let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes | 117 | -- let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes |
112 | return (Left nums) | 118 | -- return (Left nums) |
113 | else do | 119 | -- else do |
114 | let i = seqno0 `mod` qsize | 120 | -- let i = seqno0 `mod` qsize |
115 | x <- maybe retry return =<< readArray pktq i | 121 | -- x <- maybe retry return =<< readArray pktq i |
116 | writeArray pktq i Nothing | 122 | -- writeArray pktq i Nothing |
117 | modifyTVar' seqno succ | 123 | -- modifyTVar' seqno succ |
118 | return (Right x) | 124 | -- return (Right x) |
119 | 125 | ||
120 | -- | Retry until the next expected packet is enqueued. Then return it. | 126 | -- | Retry until the next expected packet is enqueued. Then return it. |
121 | dequeue :: PacketQueue a -> STM a | 127 | dequeue :: PacketQueue a -> STM a |
@@ -127,20 +133,46 @@ dequeue PacketQueue { pktq, seqno, qsize } = do | |||
127 | modifyTVar' seqno succ | 133 | modifyTVar' seqno succ |
128 | return x | 134 | return x |
129 | 135 | ||
130 | -- | Like dequeue, but marks as viewed rather than removing | 136 | -- | Drop all packets preceding the given packet number. |
131 | markButNotDequeue :: PacketQueue (Bool,a) -> STM a | 137 | dropPacketsBefore :: PacketQueue a -> Word32 -> STM () |
132 | markButNotDequeue PacketQueue { pktq, seqno, qsize } = do | 138 | dropPacketsBefore PacketQueue{ pktq, seqno, qsize, buffend } no0 = do |
133 | i0 <- readTVar seqno | 139 | low <- readTVar seqno |
134 | let i = i0 `mod` qsize | 140 | let no = no0 - 1 -- possibly negative |
135 | (b,x) <- maybe retry return =<< readArray pktq i | 141 | proj = no - low -- possibly negative |
136 | writeArray pktq i (Just (True,x)) | 142 | if (proj < qsize) |
137 | modifyTVar' seqno succ | 143 | then do |
138 | return x | 144 | -- Clear some, but not all, slots. |
145 | let ilow = low `mod` qsize | ||
146 | i = no `mod` qsize | ||
147 | ranges = if ilow <= i then [[ilow .. i]] | ||
148 | else [[0 .. i],[ilow .. qsize-1]] | ||
149 | writeTVar seqno no | ||
150 | forM_ ranges $ mapM_ $ \i -> writeArray pktq i Nothing | ||
151 | else do | ||
152 | -- Reset to empty queue. | ||
153 | writeTVar seqno no | ||
154 | writeTVar buffend no | ||
155 | (z,n) <- getBounds pktq | ||
156 | forM_ [z .. n] $ \i -> writeArray pktq i Nothing | ||
157 | |||
158 | -- -- | Like dequeue, but marks as viewed rather than removing | ||
159 | -- markButNotDequeue :: PacketQueue (Bool,a) -> STM a | ||
160 | -- markButNotDequeue PacketQueue { pktq, seqno, qsize } = do | ||
161 | -- i0 <- readTVar seqno | ||
162 | -- let i = i0 `mod` qsize | ||
163 | -- (b,x) <- maybe retry return =<< readArray pktq i | ||
164 | -- writeArray pktq i (Just (True,x)) | ||
165 | -- modifyTVar' seqno succ | ||
166 | -- return x | ||
139 | 167 | ||
140 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there | 168 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there |
141 | -- is spare capacity in the queue. If there is not, the packet will be | 169 | -- is spare capacity in the queue. If there is not, the packet will be |
142 | -- silently discarded without blocking. (Unless this is an Overwrite-queue, | 170 | -- silently discarded without blocking. (Unless this is an Overwrite-queue, in |
143 | -- in which case, the packets will simply wrap around overwriting the old ones.) | 171 | -- which case, the packets will simply wrap around overwriting the old ones.) |
172 | -- | ||
173 | -- If the packet was enqueued, (0,i) will be retuned where /i/ is the index at | ||
174 | -- which the new packet was stored in the buffer. If the queue was full, the | ||
175 | -- first of the returned pair will be non-zero. | ||
144 | enqueue :: PacketQueue a -- ^ The packet queue. | 176 | enqueue :: PacketQueue a -- ^ The packet queue. |
145 | -> Word32 -- ^ Sequence number of the packet. | 177 | -> Word32 -- ^ Sequence number of the packet. |
146 | -> a -- ^ The packet. | 178 | -> a -- ^ The packet. |
@@ -155,167 +187,21 @@ enqueue PacketQueue{ pktq, seqno, qsize, buffend} no x = do | |||
155 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | 187 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) |
156 | return (proj `divMod` qsize) | 188 | return (proj `divMod` qsize) |
157 | 189 | ||
158 | -- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) | 190 | -- | Obtain the packet with the given sequence number if it is stored in the |
159 | -- lookup PacketQueue{ pktq, seqno, qsize } no = _todo | 191 | -- queue, otherwise /Nothing/ is returned without blocking. |
160 | 192 | lookup :: PacketQueue a -> Word32 -> STM (Maybe a) | |
161 | ----------------------------------------------------- | 193 | lookup PacketQueue{ pktq, seqno, qsize } no = do |
162 | -- * PacketOutQueue | 194 | low <- readTVar seqno |
163 | -- | 195 | let proj = no - low |
164 | 196 | if proj < qsize | |
165 | data PacketOutQueue extra msg toWire fromWire = PacketOutQueue | 197 | then let i = no `mod` qsize |
166 | { pktoInPQ :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue' | 198 | in readArray pktq i |
167 | , pktoOutPQ :: PacketQueue (Word32,toWire) | 199 | else return Nothing |
168 | , pktoPacketNo :: TVar Word32 | 200 | |
169 | , pktoToWireIO :: IO (STM extra) | 201 | -- -- | For each item in the queue, modify or delete it. |
170 | , pktoToWire :: STM extra | 202 | -- mapQ :: (a -> Maybe a) -> PacketQueue a -> STM () |
171 | -> Word32{-packet number we expect to recieve-} | 203 | -- mapQ f PacketQueue{pktq} = do |
172 | -> Word32{- buffer_end -} | 204 | -- (z,n) <- getBounds pktq |
173 | -> Word32{- packet number -} | 205 | -- forM_ [z .. n] $ \i -> do |
174 | -> msg | 206 | -- e <- readArray pktq i |
175 | -> STM (Maybe (toWire,Word32{-next packet no-})) | 207 | -- writeArray pktq i (e>>=f) |
176 | } | ||
177 | |||
178 | mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM () | ||
179 | mapOutGoing f q@(PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do | ||
180 | (z,n) <- getBounds pktq | ||
181 | let ff i = do | ||
182 | e <- readArray pktq i | ||
183 | writeArray pktq i (e>>=f) | ||
184 | mapM_ ff [z .. n] | ||
185 | |||
186 | newOutGoing :: PacketQueue fromwire | ||
187 | -- ^ Incoming queue | ||
188 | -> (STM io -> Word32 {-packet number we expect to recieve-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-}))) | ||
189 | -- ^ toWire callback | ||
190 | -> IO (STM io) | ||
191 | -- ^ io action to get extra parameter | ||
192 | -> Word32 -- ^ packet number of first outgoing packet | ||
193 | -> Word32 -- ^ Capacity of queue. | ||
194 | -> Word32 -- ^ Initial sequence number. | ||
195 | -> STM (PacketOutQueue io msg wire fromwire) | ||
196 | newOutGoing inq towire toWireIO num capacity seqstart = do | ||
197 | outq <- new capacity seqstart | ||
198 | numVar <- newTVar num | ||
199 | return $ PacketOutQueue | ||
200 | { pktoInPQ = inq | ||
201 | , pktoOutPQ = outq | ||
202 | , pktoPacketNo = numVar | ||
203 | , pktoToWireIO = toWireIO | ||
204 | , pktoToWire = towire | ||
205 | } | ||
206 | |||
207 | data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail | ||
208 | deriving (Show) | ||
209 | |||
210 | instance Eq (OutGoingResult a) where | ||
211 | OGSuccess _ == OGSuccess _ = True | ||
212 | OGFull == OGFull = True | ||
213 | OGEncodeFail == OGEncodeFail = True | ||
214 | _ == _ = False | ||
215 | |||
216 | |||
217 | -- | do something in IO before appending to the queue | ||
218 | readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra) | ||
219 | readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO | ||
220 | |||
221 | getRequested :: Show wire => STM extra -> PacketOutQueue extra msg wire fromwire -> Word32 -> [Word8] -> STM [Maybe (Word32,wire)] | ||
222 | getRequested _ _ _ [] = return [] | ||
223 | getRequested getExtra pktoq snum ns = do | ||
224 | let pnums = toPNums snum ns | ||
225 | indices = map toIndex pnums | ||
226 | tput XNetCrypto $ "getRequested: snum = " ++ show snum | ||
227 | tput XNetCrypto $ "getRequested: pnums = " ++ show pnums ++ " indices = " ++ show indices | ||
228 | xs <- packetQueueViewList (pktoOutPQ pktoq) | ||
229 | tput XNetCrypto $ "getRequested viewList -> " | ||
230 | mapM_ (tput XNetCrypto . show) xs | ||
231 | forM indices $ \i -> readArray (pktq $ pktoOutPQ pktoq) i | ||
232 | where | ||
233 | toIndex :: Word32 -> Word32 | ||
234 | toIndex = (`mod` qsize (pktoOutPQ pktoq)) | ||
235 | |||
236 | toPNums :: Word32 -> [Word8] -> [Word32] | ||
237 | toPNums snum ns = reverse . snd $ foldl doOne (snum-1,[]) ns | ||
238 | where | ||
239 | doOne :: (Word32,[Word32]) -> Word8 -> (Word32,[Word32]) | ||
240 | doOne (addend,as) 0 = (addend+255,as) | ||
241 | doOne (addend,as) x = let y = fromIntegral x + addend | ||
242 | in (y,y:as) | ||
243 | |||
244 | -- | Makes no modifications to the PacketOutQueue state, useful for lossy packets | ||
245 | peekPacket :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (Maybe (wire,Word32)) | ||
246 | peekPacket getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg | ||
247 | = do | ||
248 | be <- readTVar (buffend pktoOutPQ) | ||
249 | let i = be `mod` (qsize pktoOutPQ) | ||
250 | let arrayEmpty :: MArray a e m => a Word32 e -> m Bool | ||
251 | arrayEmpty ar = do (lowB,highB) <- getBounds ar | ||
252 | let result= lowB > highB | ||
253 | tput XNetCrypto | ||
254 | ("arrayEmpty result=" ++ show result | ||
255 | ++ " lowB=" ++ show lowB | ||
256 | ++ " highB = " ++ show highB | ||
257 | ++ " i = " ++ show i) | ||
258 | return result | ||
259 | mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ) | ||
260 | if emp then tput XNetCrypto "(peekPacket empty)" >> return Nothing | ||
261 | else do tput XNetCrypto "(peekPacket nonempty)" | ||
262 | result <- readArray (pktq pktoOutPQ) i | ||
263 | tput XNetCrypto ("readArray (isJust result)==" ++ show (isJust result)) | ||
264 | return result | ||
265 | pktno <- readTVar pktoPacketNo | ||
266 | nextno <- readTVar (seqno pktoInPQ) | ||
267 | pktoToWire getExtra nextno be pktno msg | ||
268 | |||
269 | -- | Convert a message to packet format and append it to the front of a queue | ||
270 | -- used for outgoing messages. (Note that ‘front‛ usually means the higher | ||
271 | -- index in this implementation.) | ||
272 | tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (OutGoingResult wire) | ||
273 | tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg | ||
274 | = do | ||
275 | be <- readTVar (buffend pktoOutPQ) | ||
276 | let i = be `mod` (qsize pktoOutPQ) | ||
277 | let arrayEmpty :: MArray a e m => a Word32 e -> m Bool | ||
278 | arrayEmpty ar = do (lowB,highB) <- getBounds ar | ||
279 | return $ lowB > highB | ||
280 | mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ) | ||
281 | if emp then return Nothing | ||
282 | else do readArray (pktq pktoOutPQ) i | ||
283 | pktno <- readTVar pktoPacketNo | ||
284 | nextno <- readTVar (seqno pktoInPQ) | ||
285 | mbWire <- pktoToWire getExtra nextno be pktno msg | ||
286 | -- TODO all the above lines ^^ can be replaced with call to peekPacket | ||
287 | case mbWire of | ||
288 | Just (pkt,pktno') | ||
289 | -> case mbPkt of | ||
290 | -- slot is free, insert element | ||
291 | Nothing -> do | ||
292 | modifyTVar' (buffend pktoOutPQ) (+1) | ||
293 | writeTVar pktoPacketNo $! pktno' | ||
294 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) | ||
295 | return (OGSuccess pkt) | ||
296 | -- queue is full | ||
297 | Just (n,_) -> do | ||
298 | nn <- getHighestHandledPacketPlus1 q | ||
299 | if (n < nn) | ||
300 | -- but we can overwrite an old packet | ||
301 | then do | ||
302 | modifyTVar' (buffend pktoOutPQ) (+1) | ||
303 | writeTVar pktoPacketNo $! pktno' | ||
304 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) | ||
305 | return (OGSuccess pkt) | ||
306 | -- uh oh this packet is still needed... | ||
307 | else return OGFull | ||
308 | -- don't know how to send this message | ||
309 | Nothing -> return OGEncodeFail | ||
310 | |||
311 | dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire) | ||
312 | dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do | ||
313 | i0 <- readTVar seqno | ||
314 | let i = i0 `mod` qsize | ||
315 | x <- maybe retry return =<< readArray pktq i | ||
316 | -- writeArray pktq i Nothing -- not cleaning | ||
317 | modifyTVar' seqno succ | ||
318 | return x | ||
319 | |||
320 | getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32 | ||
321 | getHighestHandledPacketPlus1 (PacketOutQueue { pktoInPQ }) = readTVar (buffend pktoInPQ) | ||
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 6f440ea5..db7543f7 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -22,8 +22,7 @@ import qualified Data.ByteString as B | |||
22 | import Data.ByteString (ByteString) | 22 | import Data.ByteString (ByteString) |
23 | import Control.Lens | 23 | import Control.Lens |
24 | import Data.Function | 24 | import Data.Function |
25 | import qualified Data.PacketQueue as PQ | 25 | import Data.PacketBuffer as PB |
26 | ;import Data.PacketQueue (PacketQueue) | ||
27 | import qualified Data.CyclicBuffer as CB | 26 | import qualified Data.CyclicBuffer as CB |
28 | ;import Data.CyclicBuffer (CyclicBuffer) | 27 | ;import Data.CyclicBuffer (CyclicBuffer) |
29 | import Data.Serialize as S | 28 | import Data.Serialize as S |
@@ -354,7 +353,7 @@ data NetCryptoSession = NCrypto | |||
354 | -- the case in group chats | 353 | -- the case in group chats |
355 | , ncView :: TVar SessionView | 354 | , ncView :: TVar SessionView |
356 | -- ^ contains your nick, status etc | 355 | -- ^ contains your nick, status etc |
357 | , ncPacketQueue :: PacketQueue CryptoData | 356 | , ncPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted) |
358 | -- ^ a buffer in which incoming packets may be stored out of order | 357 | -- ^ a buffer in which incoming packets may be stored out of order |
359 | -- but from which they may be extracted in sequence, | 358 | -- but from which they may be extracted in sequence, |
360 | -- helps ensure lossless packets are processed in order | 359 | -- helps ensure lossless packets are processed in order |
@@ -383,13 +382,13 @@ data NetCryptoSession = NCrypto | |||
383 | , ncPingThread :: TVar (Maybe ThreadId) | 382 | , ncPingThread :: TVar (Maybe ThreadId) |
384 | -- ^ thread which actually queues outgoing pings | 383 | -- ^ thread which actually queues outgoing pings |
385 | , ncIdleEventThread :: TVar (Maybe ThreadId) | 384 | , ncIdleEventThread :: TVar (Maybe ThreadId) |
386 | , ncOutgoingQueue :: TVar | 385 | , ncOutgoingQueue :: TVar (UponHandshake NetCryptoOutQueue) |
387 | (UponHandshake | 386 | {- |
388 | (PQ.PacketOutQueue | 387 | (PQ.PacketOutQueue |
389 | (State,Nonce24,U.RangeMap TArray Word8 TVar) | 388 | (State,Nonce24,U.RangeMap TArray Word8 TVar) |
390 | CryptoMessage | 389 | CryptoMessage |
391 | (CryptoPacket Encrypted) | 390 | (CryptoPacket Encrypted) |
392 | CryptoData)) | 391 | CryptoData)) -} |
393 | -- ^ To send a message add it to this queue, by calling 'tryAppendQueueOutgoing' | 392 | -- ^ To send a message add it to this queue, by calling 'tryAppendQueueOutgoing' |
394 | -- but remember to call 'readyOutGoing' first, because the shared secret cache | 393 | -- but remember to call 'readyOutGoing' first, because the shared secret cache |
395 | -- presently requires the IO monad. | 394 | -- presently requires the IO monad. |
@@ -643,7 +642,7 @@ freshCryptoSession sessions | |||
643 | insertArrayAt outHooks 0 (A.array (0,64) (map assignHook [0..64])) | 642 | insertArrayAt outHooks 0 (A.array (0,64) (map assignHook [0..64])) |
644 | return (idmap,lossyEsc,losslessEsc,outHooks) | 643 | return (idmap,lossyEsc,losslessEsc,outHooks) |
645 | ncView0 <- newTVar (sessionView sessions) | 644 | ncView0 <- newTVar (sessionView sessions) |
646 | pktq <- PQ.new (inboundQueueCapacity sessions) 0 | 645 | pktq <- PB.newPacketBuffer |
647 | bufstart <- newTVar 0 | 646 | bufstart <- newTVar 0 |
648 | mbpktoq | 647 | mbpktoq |
649 | <- case mbtheirSessionKey of | 648 | <- case mbtheirSessionKey of |
@@ -696,7 +695,7 @@ freshCryptoSession sessions | |||
696 | , ncOutgoingIdMapEscapedLossy = lossyEscapeIdMap | 695 | , ncOutgoingIdMapEscapedLossy = lossyEscapeIdMap |
697 | , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap | 696 | , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap |
698 | , ncView = ncView0 | 697 | , ncView = ncView0 |
699 | , ncPacketQueue = pktq | 698 | , ncPacketBuffer = pktq |
700 | , ncStoredRequests = ncStoredRequests0 | 699 | , ncStoredRequests = ncStoredRequests0 |
701 | , ncRequestInterval = ncRequestInterval0 | 700 | , ncRequestInterval = ncRequestInterval0 |
702 | , ncAliveInterval = ncAliveInterval0 | 701 | , ncAliveInterval = ncAliveInterval0 |
@@ -720,12 +719,25 @@ freshCryptoSession sessions | |||
720 | HaveHandshake pktoq -> return (runUponHandshake netCryptoSession0 addr pktoq) | 719 | HaveHandshake pktoq -> return (runUponHandshake netCryptoSession0 addr pktoq) |
721 | return (myhandshake,maybeLaunchMissles) | 720 | return (myhandshake,maybeLaunchMissles) |
722 | 721 | ||
722 | {- | ||
723 | type NetCryptoOutQueue = PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar) | 723 | type NetCryptoOutQueue = PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar) |
724 | CryptoMessage | 724 | CryptoMessage |
725 | (CryptoPacket Encrypted) | 725 | (CryptoPacket Encrypted) |
726 | CryptoData | 726 | CryptoData |
727 | -} | ||
728 | data NetCryptoOutQueue = NetCryptoOutQueue | ||
729 | { nqPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted) | ||
730 | , nqToWire :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar) | ||
731 | -> Word32 | ||
732 | -> Word32 | ||
733 | -> Word32 | ||
734 | -> XMessage | ||
735 | -> STM (Maybe (CryptoPacket Encrypted, Word32)) | ||
736 | , nqToWireIO :: IO (STM (State, Nonce24, U.RangeMap TArray Word8 TVar)) | ||
737 | , nqPacketNo :: TVar Word32 | ||
738 | } | ||
727 | 739 | ||
728 | createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketQueue CryptoData | 740 | createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketBuffer CryptoData (CryptoPacket Encrypted) |
729 | -> TVar Nonce24 -> U.RangeMap TArray Word8 TVar -> STM (UponHandshake NetCryptoOutQueue) | 741 | -> TVar Nonce24 -> U.RangeMap TArray Word8 TVar -> STM (UponHandshake NetCryptoOutQueue) |
730 | createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 = do | 742 | createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 = do |
731 | let crypto = transportCrypto sessions | 743 | let crypto = transportCrypto sessions |
@@ -739,8 +751,13 @@ createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce | |||
739 | ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) | 751 | ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) |
740 | ) $ writeTVar ncMyPacketNonce0 n24plus1 | 752 | ) $ writeTVar ncMyPacketNonce0 n24plus1 |
741 | return (return (f n24, n24, ncOutgoingIdMap0)) | 753 | return (return (f n24, n24, ncOutgoingIdMap0)) |
742 | pktoq <- PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 | 754 | seqnoVar <- newTVar 0 |
743 | return (HaveHandshake pktoq) | 755 | return (HaveHandshake NetCryptoOutQueue |
756 | { nqPacketBuffer = pktq | ||
757 | , nqToWire = ncToWire | ||
758 | , nqToWireIO = toWireIO | ||
759 | , nqPacketNo = seqnoVar | ||
760 | }) | ||
744 | 761 | ||
745 | -- | add new session to the lookup maps | 762 | -- | add new session to the lookup maps |
746 | addSessionToMap :: NetCryptoSessions -> SockAddr -> NetCryptoSession -> STM () | 763 | addSessionToMap :: NetCryptoSessions -> SockAddr -> NetCryptoSession -> STM () |
@@ -788,11 +805,39 @@ addSessionToMapIfNotThere sessions addrRaw netCryptoSession = do | |||
788 | -- in case we're using the same long term key on different IPs ... | 805 | -- in case we're using the same long term key on different IPs ... |
789 | modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) | 806 | modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) |
790 | 807 | ||
808 | data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail | ||
809 | deriving (Show) | ||
810 | |||
811 | -- | Convert a message to packet format and append it to the front of a queue | ||
812 | -- used for outgoing messages. (Note that ‘front‛ usually means the higher | ||
813 | -- index in this implementation.) | ||
814 | -- | ||
815 | -- Called from 'runUponHandshake' and 'sendCrypto'. | ||
816 | -- | ||
817 | -- Whenever this is called, you should also send the resulting packet out on | ||
818 | -- the network. | ||
819 | |||
820 | tryAppendQueueOutgoing :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar) | ||
821 | -> NetCryptoOutQueue | ||
822 | -> CryptoMessage | ||
823 | -> STM (OutGoingResult (CryptoPacket Encrypted)) | ||
824 | tryAppendQueueOutgoing getExtra outq msg = do | ||
825 | pktno <- readTVar (nqPacketNo outq) | ||
826 | nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq) | ||
827 | be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq) | ||
828 | mbWire <- nqToWire outq getExtra nextno be pktno msg | ||
829 | case mbWire of | ||
830 | Just (payload,seqno) -> do | ||
831 | PB.grokPacket (nqPacketBuffer outq) (PacketSent seqno payload) | ||
832 | return $ OGSuccess payload | ||
833 | Nothing -> return OGEncodeFail | ||
834 | |||
835 | |||
791 | runUponHandshake :: NetCryptoSession -> SockAddr -> NetCryptoOutQueue -> IO () | 836 | runUponHandshake :: NetCryptoSession -> SockAddr -> NetCryptoOutQueue -> IO () |
792 | runUponHandshake netCryptoSession0 addr pktoq = do | 837 | runUponHandshake netCryptoSession0 addr pktoq = do |
793 | dput XNetCrypto "(((((((runUponHandshake))))))) Launching threads" | 838 | dput XNetCrypto "(((((((runUponHandshake))))))) Launching threads" |
794 | let sessions = ncAllSessions netCryptoSession0 | 839 | let sessions = ncAllSessions netCryptoSession0 |
795 | pktq = ncPacketQueue netCryptoSession0 | 840 | pktq = ncPacketBuffer netCryptoSession0 |
796 | remotePublicKey = ncTheirPublicKey netCryptoSession0 | 841 | remotePublicKey = ncTheirPublicKey netCryptoSession0 |
797 | crypto = transportCrypto sessions | 842 | crypto = transportCrypto sessions |
798 | allsessions = netCryptoSessions sessions | 843 | allsessions = netCryptoSessions sessions |
@@ -805,7 +850,7 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
805 | atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) | 850 | atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) |
806 | labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey) ++ sidStr) | 851 | labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey) ++ sidStr) |
807 | fix $ \loop -> do | 852 | fix $ \loop -> do |
808 | cd <- atomically $ PQ.dequeue pktq | 853 | cd <- atomically $ PB.awaitReadyPacket pktq |
809 | if msgID (bufferData cd) == PacketRequest | 854 | if msgID (bufferData cd) == PacketRequest |
810 | then do | 855 | then do |
811 | dput XNetCrypto $ "Dequeued::PacketRequest seqno=" ++ show (bufferStart cd) ++ " " ++ show (bufferData cd) | 856 | dput XNetCrypto $ "Dequeued::PacketRequest seqno=" ++ show (bufferStart cd) ++ " " ++ show (bufferData cd) |
@@ -824,19 +869,21 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
824 | labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) | 869 | labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) |
825 | fix $ \loop -> do | 870 | fix $ \loop -> do |
826 | atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) | 871 | atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) |
827 | nums <- atomically $ PQ.getMissing pktq | 872 | (nums,seqno) <- atomically $ PB.packetNumbersToRequest pktq |
828 | dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums | 873 | dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums |
829 | getOutGoingParam <- PQ.readyOutGoing pktoq | 874 | getOutGoingParam <- nqToWireIO pktoq |
830 | atomically $ do | 875 | x <- atomically $ do |
831 | seqno <- PQ.getLastDequeuedPlus1 pktq | 876 | ogresult <- tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums) |
832 | ogresult <- PQ.tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums) | ||
833 | case ogresult of | 877 | case ogresult of |
834 | PQ.OGSuccess _ -> return () | 878 | OGSuccess x -> return x |
835 | _ -> retry | 879 | _ -> retry |
880 | sendSessionPacket sessions addr x | ||
836 | loop | 881 | loop |
837 | dput XNetCrypto $ "runUponHandshake: " ++ show threadid ++ " = NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr | 882 | dput XNetCrypto $ "runUponHandshake: " ++ show threadid ++ " = NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr |
838 | 883 | ||
839 | -- launch dequeueOutgoing thread | 884 | -- launch dequeueOutgoing thread |
885 | {- | ||
886 | -- TODO | ||
840 | threadidOutgoing <- forkIO $ do | 887 | threadidOutgoing <- forkIO $ do |
841 | tid <- myThreadId | 888 | tid <- myThreadId |
842 | atomically $ writeTVar (ncDequeueOutGoingThread netCryptoSession0) (Just tid) | 889 | atomically $ writeTVar (ncDequeueOutGoingThread netCryptoSession0) (Just tid) |
@@ -846,7 +893,8 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
846 | dput XNetCrypto "NetCryptoDequeueOutgoing thread... Sending encrypted Packet" | 893 | dput XNetCrypto "NetCryptoDequeueOutgoing thread... Sending encrypted Packet" |
847 | sendSessionPacket sessions addr pkt | 894 | sendSessionPacket sessions addr pkt |
848 | loop | 895 | loop |
849 | dput XNetCrypto $ "runUponHandshake: " ++ show threadidOutgoing ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr | 896 | -} |
897 | dput XNetCrypto $ "runUponHandshake: " ++ show "threadidOutgoing" ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr | ||
850 | 898 | ||
851 | -- launch ping Machine thread | 899 | -- launch ping Machine thread |
852 | pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) | 900 | pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) |
@@ -859,10 +907,10 @@ runUponHandshake netCryptoSession0 addr pktoq = do | |||
859 | labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) | 907 | labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) |
860 | fix $ \loop -> do | 908 | fix $ \loop -> do |
861 | atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) | 909 | atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) |
862 | dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" | 910 | dput XNetCryptoOut $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" |
863 | lr <- sendPing crypto netCryptoSession0 | 911 | lr <- sendPing crypto netCryptoSession0 |
864 | case lr of | 912 | case lr of |
865 | Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s | 913 | Left s -> dput XNetCryptoOut $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s |
866 | Right _ -> return () | 914 | Right _ -> return () |
867 | loop | 915 | loop |
868 | 916 | ||
@@ -972,7 +1020,7 @@ updateCryptoSession sessions addr newsession timestamp hp session handshake = do | |||
972 | sessions | 1020 | sessions |
973 | newsession | 1021 | newsession |
974 | theirSessionPublic | 1022 | theirSessionPublic |
975 | (ncPacketQueue session) | 1023 | (ncPacketBuffer session) |
976 | (ncMyPacketNonce session) | 1024 | (ncMyPacketNonce session) |
977 | (ncOutgoingIdMap session) | 1025 | (ncOutgoingIdMap session) |
978 | writeTVar (ncOutgoingQueue session) mbpktoq | 1026 | writeTVar (ncOutgoingQueue session) mbpktoq |
@@ -1055,7 +1103,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1055 | Nothing -> do | 1103 | Nothing -> do |
1056 | dput XNetCrypto "Dropping packet.. no session" | 1104 | dput XNetCrypto "Dropping packet.. no session" |
1057 | return Nothing -- drop packet, we have no session | 1105 | return Nothing -- drop packet, we have no session |
1058 | Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketQueue, ncHooks, | 1106 | Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketBuffer, ncHooks, |
1059 | ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, | 1107 | ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, |
1060 | ncPingMachine, ncSessionId, ncStoredRequests}) -> do | 1108 | ncPingMachine, ncSessionId, ncStoredRequests}) -> do |
1061 | -- Unrecognized packets, try them thrice so as to give | 1109 | -- Unrecognized packets, try them thrice so as to give |
@@ -1130,9 +1178,10 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1130 | isLossy (GrpMsg KnownLossy _) = True | 1178 | isLossy (GrpMsg KnownLossy _) = True |
1131 | isLossy (Msg mid) | lossyness mid == Lossy = True | 1179 | isLossy (Msg mid) | lossyness mid == Lossy = True |
1132 | isLossy _ = False | 1180 | isLossy _ = False |
1181 | ack = bufferStart -- Earliest sequence number they've seen from us. | ||
1133 | if isLossy msgTypMapped | 1182 | if isLossy msgTypMapped |
1134 | then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm | 1183 | then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm |
1135 | atomically $ PQ.observeOutOfBand ncPacketQueue bufferEnd | 1184 | atomically $ PB.grokPacket ncPacketBuffer (PacketReceivedLossy bufferEnd ack) |
1136 | runCryptoHook session (bufferData cd) | 1185 | runCryptoHook session (bufferData cd) |
1137 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm | 1186 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm |
1138 | when (msgID cm == PING) $ | 1187 | when (msgID cm == PING) $ |
@@ -1143,7 +1192,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1143 | -- num <- CB.getNextSequenceNum ncStoredRequests | 1192 | -- num <- CB.getNextSequenceNum ncStoredRequests |
1144 | -- CB.enqueue ncStoredRequests num cd | 1193 | -- CB.enqueue ncStoredRequests num cd |
1145 | handlePacketRequest session cd | 1194 | handlePacketRequest session cd |
1146 | atomically $ PQ.enqueue ncPacketQueue bufferEnd cd | 1195 | atomically $ PB.grokPacket ncPacketBuffer (PacketReceived bufferEnd cd ack) |
1147 | return Nothing | 1196 | return Nothing |
1148 | where | 1197 | where |
1149 | last2Bytes :: Nonce24 -> Word16 | 1198 | last2Bytes :: Nonce24 -> Word16 |
@@ -1262,14 +1311,18 @@ sendCrypto crypto session updateLocal cm = do | |||
1262 | HaveHandshake outq <- atomically $ readTVar (ncOutgoingQueue session) | 1311 | HaveHandshake outq <- atomically $ readTVar (ncOutgoingQueue session) |
1263 | -- XXX: potential race? if shared secret comes out of sync with cache? | 1312 | -- XXX: potential race? if shared secret comes out of sync with cache? |
1264 | dput XNetCrypto "sendCrypto: enter " | 1313 | dput XNetCrypto "sendCrypto: enter " |
1265 | getOutGoingParam <- PQ.readyOutGoing outq | 1314 | getOutGoingParam <- nqToWireIO outq |
1266 | dput XNetCrypto "sendCrypto: got the io extra stuff" | 1315 | dput XNetCrypto "sendCrypto: got the io extra stuff" |
1267 | atomically $ do | 1316 | r <- atomically $ do |
1268 | result <- PQ.tryAppendQueueOutgoing getOutGoingParam outq cm | 1317 | result <- tryAppendQueueOutgoing getOutGoingParam outq cm |
1269 | case result of | 1318 | case result of |
1270 | PQ.OGSuccess x -> updateLocal >> return (Right x) | 1319 | OGSuccess x -> updateLocal >> return (Right x) |
1271 | PQ.OGFull -> return (Left "Outgoing packet buffer is full") | 1320 | OGFull -> return (Left "Outgoing packet buffer is full") |
1272 | PQ.OGEncodeFail -> return (Left "Failed to encode outgoing packet") | 1321 | OGEncodeFail -> return (Left "Failed to encode outgoing packet") |
1322 | case ncSockAddr session of | ||
1323 | HaveDHTKey saddr -> mapM_ (sendSessionPacket (ncAllSessions session) saddr) r | ||
1324 | _ -> return () | ||
1325 | return r | ||
1273 | 1326 | ||
1274 | sendPing :: TransportCrypto -> NetCryptoSession -> IO (Either String (CryptoPacket Encrypted)) | 1327 | sendPing :: TransportCrypto -> NetCryptoSession -> IO (Either String (CryptoPacket Encrypted)) |
1275 | sendPing crypto session = do | 1328 | sendPing crypto session = do |
@@ -1327,8 +1380,16 @@ sendCryptoLossy crypto session updateLocal cm = do | |||
1327 | updateLocal | 1380 | updateLocal |
1328 | return (Left errmsg) | 1381 | return (Left errmsg) |
1329 | HaveHandshake outq -> do | 1382 | HaveHandshake outq -> do |
1330 | getOutGoingParam <- PQ.readyOutGoing outq | 1383 | getOutGoingParam <- nqToWireIO outq |
1331 | mbPkt <- atomically $ PQ.peekPacket getOutGoingParam outq cm | 1384 | mbPkt <- atomically $ do |
1385 | pktno <- readTVar (nqPacketNo outq) | ||
1386 | nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq) | ||
1387 | be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq) | ||
1388 | nqToWire outq getOutGoingParam -- See 'ncToWire' | ||
1389 | nextno -- packet number we expect to recieve | ||
1390 | be -- buffer_end (for lossy) | ||
1391 | pktno -- packet number (for lossless) | ||
1392 | cm | ||
1332 | case mbPkt of | 1393 | case mbPkt of |
1333 | Nothing -> do | 1394 | Nothing -> do |
1334 | let errmsg = "Error sending lossy packet! (sessionid: " ++ show (ncSessionId session) ++ ") " ++ show cm | 1395 | let errmsg = "Error sending lossy packet! (sessionid: " ++ show (ncSessionId session) ++ ") " ++ show cm |
@@ -1466,11 +1527,10 @@ handlePacketRequest session (CryptoData { bufferStart=num | |||
1466 | mbOutQ <- atomically $ readTVar (ncOutgoingQueue session) | 1527 | mbOutQ <- atomically $ readTVar (ncOutgoingQueue session) |
1467 | case mbOutQ of | 1528 | case mbOutQ of |
1468 | HaveHandshake pktoq -> do | 1529 | HaveHandshake pktoq -> do |
1469 | getOutGoingParam <-PQ.readyOutGoing pktoq | 1530 | getOutGoingParam <-nqToWireIO pktoq |
1470 | ps <- atomically $ PQ.getRequested getOutGoingParam pktoq num bs | 1531 | ps <- atomically $ PB.retrieveForResend (nqPacketBuffer pktoq) $ PB.decompressSequenceNumbers num bs |
1471 | let resend (Just (n,pkt)) = sendSessionPacket (ncAllSessions session) addr pkt | 1532 | let resend (n,pkt) = sendSessionPacket (ncAllSessions session) addr pkt |
1472 | resend _ = return () | 1533 | dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst ps)) |
1473 | dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst $ catMaybes ps)) | ||
1474 | mapM_ resend ps | 1534 | mapM_ resend ps |
1475 | _ -> return () | 1535 | _ -> return () |
1476 | 1536 | ||
diff --git a/src/Network/Tox/Crypto/Transport.hs b/src/Network/Tox/Crypto/Transport.hs index a453a13c..8bb41254 100644 --- a/src/Network/Tox/Crypto/Transport.hs +++ b/src/Network/Tox/Crypto/Transport.hs | |||
@@ -82,12 +82,13 @@ import Data.Text.Encoding as T | |||
82 | import Data.Serialize as S | 82 | import Data.Serialize as S |
83 | import Control.Arrow | 83 | import Control.Arrow |
84 | import DPut | 84 | import DPut |
85 | import Data.PacketQueue (toPNums) | 85 | import Data.PacketBuffer as PB |
86 | import Data.Function | 86 | import Data.Function |
87 | 87 | ||
88 | showCryptoMsg :: Word32 -> CryptoMessage -> [Char] | 88 | showCryptoMsg :: Word32 -> CryptoMessage -> [Char] |
89 | showCryptoMsg seqno (UpToN PacketRequest bytes) = "UpToN PacketRequest --> " ++ show (toPNums seqno $ B.unpack bytes) | 89 | showCryptoMsg seqno (UpToN PacketRequest bytes) = "UpToN PacketRequest --> " |
90 | showCryptoMsg _ msg = show msg | 90 | ++ show (PB.decompressSequenceNumbers seqno $ B.unpack bytes) |
91 | showCryptoMsg _ msg = show msg | ||
91 | 92 | ||
92 | parseCrypto :: (ByteString, SockAddr) -> Either (CryptoPacket Encrypted, SockAddr) (ByteString, SockAddr) | 93 | parseCrypto :: (ByteString, SockAddr) -> Either (CryptoPacket Encrypted, SockAddr) (ByteString, SockAddr) |
93 | parseCrypto ((B.uncons -> Just (0x1b,pkt)),saddr) = either (\_ -> Right (pkt,saddr)) | 94 | parseCrypto ((B.uncons -> Just (0x1b,pkt)),saddr) = either (\_ -> Right (pkt,saddr)) |