summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-08-17 05:05:17 -0400
committerJoe Crayne <joe@jerkface.net>2018-08-17 16:06:38 -0400
commitfde5005a2d1ef3a0636cff21547d4cda22b7b215 (patch)
tree1263b8d66cbcc838432afd6cc5cb122d9c4c064b /src
parentf4dd948176187f5fb46a2cf0dbfbfc4c32badfa5 (diff)
Simplified PacketQueue/PacketBuffer interface.
Diffstat (limited to 'src')
-rw-r--r--src/DPut.hs1
-rw-r--r--src/Data/PacketBuffer.hs102
-rw-r--r--src/Data/PacketQueue.hs312
-rw-r--r--src/Network/Tox/Crypto/Handlers.hs138
-rw-r--r--src/Network/Tox/Crypto/Transport.hs7
5 files changed, 305 insertions, 255 deletions
diff --git a/src/DPut.hs b/src/DPut.hs
index e02f8ce6..f3760645 100644
--- a/src/DPut.hs
+++ b/src/DPut.hs
@@ -19,6 +19,7 @@ data DebugTag
19 | XLan 19 | XLan
20 | XMan 20 | XMan
21 | XNetCrypto 21 | XNetCrypto
22 | XNetCryptoOut
22 | XOnion 23 | XOnion
23 | XRoutes 24 | XRoutes
24 | XPing 25 | XPing
diff --git a/src/Data/PacketBuffer.hs b/src/Data/PacketBuffer.hs
new file mode 100644
index 00000000..c5ede50d
--- /dev/null
+++ b/src/Data/PacketBuffer.hs
@@ -0,0 +1,102 @@
1{-# LANGUAGE TupleSections #-}
2module Data.PacketBuffer
3 ( PacketBuffer
4 , newPacketBuffer
5 , PacketEvent(..)
6 , grokPacket
7 , awaitReadyPacket
8 , packetNumbersToRequest
9 , expectingSequenceNumber
10 , nextToSendSequenceNumber
11 , retrieveForResend
12 , decompressSequenceNumbers
13 ) where
14
15import Data.PacketQueue as Q
16
17import Control.Concurrent.STM
18import Control.Monad
19import Data.Maybe
20import Data.Word
21
22data PacketBuffer a b = PacketBuffer
23 { inQueue :: PacketQueue a
24 , outBuffer :: PacketQueue b
25 }
26
27-- | Initialize the packet buffers. Note, the capacity of the inbound packet
28-- queue is currently hardcoded to 200 packets and the capacity of the outbound
29-- buffer is hardcoded to 400 packets.
30newPacketBuffer :: STM (PacketBuffer a b)
31newPacketBuffer = PacketBuffer <$> Q.new 200 0
32 <*> Q.new 400 0
33
34-- | Input for 'grokPacket'.
35data PacketEvent a b
36 = PacketSent { peSeqNum :: Word32 -- ^ Sequence number for payload.
37 , peSentPayload :: b -- ^ Payload packet we sent to them.
38 }
39 | PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload.
40 , peReceivedPayload :: a -- ^ Payload packet they sent to us.
41 , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
42 }
43 | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet.
44 , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
45 }
46
47-- | Whenever a packet is received or sent (but not resent) from the network,
48-- this function should be called to update the relevant buffers.
49--
50-- On outgoing packets, if the outbound buffer is full, this will block
51-- indefinitely until it is called in another thread with an inbound
52-- acknowledgement.
53grokPacket :: PacketBuffer a b -> PacketEvent a b -> STM ()
54grokPacket (PacketBuffer _ outb) (PacketSent seqno a)
55 = do (n,_) <- Q.enqueue outb seqno a
56 when (n/=0) retry
57grokPacket (PacketBuffer inb outb) (PacketReceived seqno a ack)
58 = do Q.enqueue inb seqno a
59 Q.dropPacketsBefore outb ack
60grokPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack)
61 = do Q.observeOutOfBand inb seqno
62 Q.dropPacketsBefore outb ack
63
64-- | Wait until an inbound packet is ready to process. Any necessary
65-- re-ordering will have been done.
66awaitReadyPacket :: PacketBuffer a b -> STM a
67awaitReadyPacket (PacketBuffer inb _) = Q.dequeue inb
68
69-- | Obtain a list of sequence numbers that may have been dropped. This would
70-- be any number not yet received that is prior to the maxium sequence number
71-- we have received. For convenience, a lowerbound for the missing squence numbers
72-- is also returned as the second item of the pair.
73packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32)
74packetNumbersToRequest (PacketBuffer inb _) = do
75 ns <- Q.getMissing inb
76 lb <- Q.getLastDequeuedPlus1 inb
77 return (ns,lb)
78
79expectingSequenceNumber :: PacketBuffer a b -> STM Word32
80expectingSequenceNumber (PacketBuffer inb _ ) = Q.getLastDequeuedPlus1 inb
81
82nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32
83nextToSendSequenceNumber (PacketBuffer _ outb) = Q.getLastEnqueuedPlus1 outb
84
85-- | Retrieve already-sent packets by their sequence numbers. See
86-- 'decompressSequenceNumbers' to obtain the sequence number list from a
87-- compressed encoding. There is no need to call 'grokPacket' when sending the
88-- packets returned from this call.
89retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)]
90retrieveForResend (PacketBuffer _ outb) seqnos =
91 catMaybes <$> forM seqnos (\no -> fmap (no,) <$> Q.lookup outb no)
92
93-- | Expand a compressed set of sequence numbers. The first squence number is
94-- given directly and the rest are computed using 8-bit offsets. This is
95-- normally used to obtain input for 'retrieveForResend'.
96decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32]
97decompressSequenceNumbers baseno ns = foldr doOne (const []) ns (baseno-1)
98 where
99 doOne :: Word8 -> (Word32 -> [Word32]) -> Word32 -> [Word32]
100 doOne 0 f addend = f (addend+255)
101 doOne x f addend = let y = fromIntegral x + addend
102 in y : f y
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index 6f997ac0..c5d5ac09 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -3,30 +3,22 @@
3-- be stored out of order, but from which they are extracted in the proper 3-- be stored out of order, but from which they are extracted in the proper
4-- sequence. 4-- sequence.
5{-# LANGUAGE NamedFieldPuns #-} 5{-# LANGUAGE NamedFieldPuns #-}
6{-# LANGUAGE FlexibleContexts #-}
7module Data.PacketQueue 6module Data.PacketQueue
8 ( PacketQueue 7 ( PacketQueue
9 , getCapacity 8 , getCapacity
10 , getLastDequeuedPlus1 9 , getLastDequeuedPlus1
10 , getLastEnqueuedPlus1
11 , new 11 , new
12 , dequeue 12 , dequeue
13 , dropPacketsBefore
13 , getMissing 14 , getMissing
14 , dequeueOrGetMissing 15 -- , dequeueOrGetMissing
15 , markButNotDequeue 16 -- , markButNotDequeue
16 , enqueue 17 , enqueue
17 , observeOutOfBand 18 , observeOutOfBand
18 , PacketOutQueue
19 , packetQueueViewList 19 , packetQueueViewList
20 , newOutGoing 20 -- , mapQ
21 , readyOutGoing 21 , Data.PacketQueue.lookup
22 , toPNums
23 , getRequested
24 , peekPacket
25 , tryAppendQueueOutgoing
26 , dequeueOutgoing
27 , getHighestHandledPacketPlus1
28 , mapOutGoing
29 , OutGoingResult(..)
30 ) where 22 ) where
31 23
32import Control.Concurrent.STM 24import Control.Concurrent.STM
@@ -34,24 +26,35 @@ import Control.Monad
34import Data.Word 26import Data.Word
35import Data.Array.MArray 27import Data.Array.MArray
36import Data.Maybe 28import Data.Maybe
37import DPut
38 29
39data PacketQueue a = PacketQueue 30data PacketQueue a = PacketQueue
40 { pktq :: TArray Word32 (Maybe a) 31 { pktq :: TArray Word32 (Maybe a)
41 , seqno :: TVar Word32 -- (buffer_start) 32 , seqno :: TVar Word32 -- (buffer_start)
42 , qsize :: Word32 33 , qsize :: Word32
43 , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 34 , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1
35 -- i.e. one more than the largest seen sequence number.
44 } 36 }
45 37
38-- | Obtain a list of non-empty slots in the 'PacketQueue'. The numeric value
39-- is an index into the underlying array, not a sequence number.
46packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] 40packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
47packetQueueViewList p = do 41packetQueueViewList p = do
48 let f (n,Nothing) = Nothing 42 let f (n,Nothing) = Nothing
49 f (n,Just x) = Just (n,x) 43 f (n,Just x) = Just (n,x)
50 catMaybes . map f <$> getAssocs (pktq p) 44 catMaybes . map f <$> getAssocs (pktq p)
51 45
46-- | This returns the earliest sequence number with a slot in the queue.
52getLastDequeuedPlus1 :: PacketQueue a -> STM Word32 47getLastDequeuedPlus1 :: PacketQueue a -> STM Word32
53getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno 48getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno
54 49
50-- | This returns the least upper bound of sequence numbers that have been
51-- enqueued.
52getLastEnqueuedPlus1 :: PacketQueue a -> STM Word32
53getLastEnqueuedPlus1 PacketQueue {buffend} = readTVar buffend
54
55
56-- | This is the number of consequetive sequence numbers, starting at
57-- 'getLastDequeuedPlus1' that can be stored in the queue
55getCapacity :: Applicative m => PacketQueue t -> m Word32 58getCapacity :: Applicative m => PacketQueue t -> m Word32
56getCapacity (PacketQueue { qsize }) = pure qsize 59getCapacity (PacketQueue { qsize }) = pure qsize
57 60
@@ -63,7 +66,7 @@ new capacity seqstart = do
63 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 66 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1
64 q <- newArray (0,cap - 1) Nothing 67 q <- newArray (0,cap - 1) Nothing
65 seqv <- newTVar seqstart 68 seqv <- newTVar seqstart
66 bufe <- newTVar 0 69 bufe <- newTVar seqstart
67 return PacketQueue 70 return PacketQueue
68 { pktq = q 71 { pktq = q
69 , seqno = seqv 72 , seqno = seqv
@@ -72,8 +75,13 @@ new capacity seqstart = do
72 } 75 }
73 76
74-- | Update the packet queue given: 77-- | Update the packet queue given:
78--
75-- * packet queue 79-- * packet queue
80--
76-- * the number of next lossless packet they intend to send you 81-- * the number of next lossless packet they intend to send you
82--
83-- This behaves exactly like 'enqueue' except that no packet data is written to
84-- the queue.
77observeOutOfBand :: PacketQueue a -> Word32-> STM () 85observeOutOfBand :: PacketQueue a -> Word32-> STM ()
78observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do 86observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do
79 low <- readTVar seqno 87 low <- readTVar seqno
@@ -85,37 +93,35 @@ observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacke
85-- | If seqno < buffend then return expected packet numbers for all 93-- | If seqno < buffend then return expected packet numbers for all
86-- the Nothings in the array between them. 94-- the Nothings in the array between them.
87-- Otherwise, return empty list. 95-- Otherwise, return empty list.
88getMissing :: Show a => PacketQueue a -> STM [Word32] 96getMissing :: PacketQueue a -> STM [Word32]
89getMissing PacketQueue { pktq, seqno, qsize, buffend } = do 97getMissing PacketQueue { pktq, seqno, qsize, buffend } = do
90 seqno0 <- readTVar seqno 98 seqno0 <- readTVar seqno
91 buffend0 <- readTVar buffend 99 buffend0 <- readTVar buffend
92 -- note relying on fact that [ b .. a ] is null when a < b 100 -- note relying on fact that [ b .. a ] is null when a < b
93 let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1] 101 let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1]
94 -- tput XNetCrypto $ "(netCRYPTO getMissing indices: " ++ show indices
95 maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices 102 maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices
96 tput XNetCrypto $ "(netCRYPTO) getMissing: (" ++ show seqno0 ++ " " ++ show buffend0 ++") => " ++ show maybes
97 let nums = map fst . filter (isNothing . snd) $ maybes 103 let nums = map fst . filter (isNothing . snd) $ maybes
98 return nums 104 return nums
99 105
100-- | If seqno < buffend then return expected packet numbers for all 106-- -- | If seqno < buffend then return expected packet numbers for all
101-- the Nothings in the array between them. 107-- -- the Nothings in the array between them.
102-- Otherwise, behave as 'dequeue' would. 108-- -- Otherwise, behave as 'dequeue' would.
103-- TODO: Do we need this function? Delete it if not. 109-- -- TODO: Do we need this function? Delete it if not.
104dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) 110-- dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a)
105dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do 111-- dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do
106 seqno0 <- readTVar seqno 112-- seqno0 <- readTVar seqno
107 buffend0 <- readTVar buffend 113-- buffend0 <- readTVar buffend
108 if seqno0 < buffend0 114-- if seqno0 < buffend0
109 then do 115-- then do
110 maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ]) 116-- maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ])
111 let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes 117-- let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes
112 return (Left nums) 118-- return (Left nums)
113 else do 119-- else do
114 let i = seqno0 `mod` qsize 120-- let i = seqno0 `mod` qsize
115 x <- maybe retry return =<< readArray pktq i 121-- x <- maybe retry return =<< readArray pktq i
116 writeArray pktq i Nothing 122-- writeArray pktq i Nothing
117 modifyTVar' seqno succ 123-- modifyTVar' seqno succ
118 return (Right x) 124-- return (Right x)
119 125
120-- | Retry until the next expected packet is enqueued. Then return it. 126-- | Retry until the next expected packet is enqueued. Then return it.
121dequeue :: PacketQueue a -> STM a 127dequeue :: PacketQueue a -> STM a
@@ -127,20 +133,46 @@ dequeue PacketQueue { pktq, seqno, qsize } = do
127 modifyTVar' seqno succ 133 modifyTVar' seqno succ
128 return x 134 return x
129 135
130-- | Like dequeue, but marks as viewed rather than removing 136-- | Drop all packets preceding the given packet number.
131markButNotDequeue :: PacketQueue (Bool,a) -> STM a 137dropPacketsBefore :: PacketQueue a -> Word32 -> STM ()
132markButNotDequeue PacketQueue { pktq, seqno, qsize } = do 138dropPacketsBefore PacketQueue{ pktq, seqno, qsize, buffend } no0 = do
133 i0 <- readTVar seqno 139 low <- readTVar seqno
134 let i = i0 `mod` qsize 140 let no = no0 - 1 -- possibly negative
135 (b,x) <- maybe retry return =<< readArray pktq i 141 proj = no - low -- possibly negative
136 writeArray pktq i (Just (True,x)) 142 if (proj < qsize)
137 modifyTVar' seqno succ 143 then do
138 return x 144 -- Clear some, but not all, slots.
145 let ilow = low `mod` qsize
146 i = no `mod` qsize
147 ranges = if ilow <= i then [[ilow .. i]]
148 else [[0 .. i],[ilow .. qsize-1]]
149 writeTVar seqno no
150 forM_ ranges $ mapM_ $ \i -> writeArray pktq i Nothing
151 else do
152 -- Reset to empty queue.
153 writeTVar seqno no
154 writeTVar buffend no
155 (z,n) <- getBounds pktq
156 forM_ [z .. n] $ \i -> writeArray pktq i Nothing
157
158-- -- | Like dequeue, but marks as viewed rather than removing
159-- markButNotDequeue :: PacketQueue (Bool,a) -> STM a
160-- markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
161-- i0 <- readTVar seqno
162-- let i = i0 `mod` qsize
163-- (b,x) <- maybe retry return =<< readArray pktq i
164-- writeArray pktq i (Just (True,x))
165-- modifyTVar' seqno succ
166-- return x
139 167
140-- | Enqueue a packet. Packets need not be enqueued in order as long as there 168-- | Enqueue a packet. Packets need not be enqueued in order as long as there
141-- is spare capacity in the queue. If there is not, the packet will be 169-- is spare capacity in the queue. If there is not, the packet will be
142-- silently discarded without blocking. (Unless this is an Overwrite-queue, 170-- silently discarded without blocking. (Unless this is an Overwrite-queue, in
143-- in which case, the packets will simply wrap around overwriting the old ones.) 171-- which case, the packets will simply wrap around overwriting the old ones.)
172--
173-- If the packet was enqueued, (0,i) will be retuned where /i/ is the index at
174-- which the new packet was stored in the buffer. If the queue was full, the
175-- first of the returned pair will be non-zero.
144enqueue :: PacketQueue a -- ^ The packet queue. 176enqueue :: PacketQueue a -- ^ The packet queue.
145 -> Word32 -- ^ Sequence number of the packet. 177 -> Word32 -- ^ Sequence number of the packet.
146 -> a -- ^ The packet. 178 -> a -- ^ The packet.
@@ -155,167 +187,21 @@ enqueue PacketQueue{ pktq, seqno, qsize, buffend} no x = do
155 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) 187 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
156 return (proj `divMod` qsize) 188 return (proj `divMod` qsize)
157 189
158-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) 190-- | Obtain the packet with the given sequence number if it is stored in the
159-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo 191-- queue, otherwise /Nothing/ is returned without blocking.
160 192lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
161----------------------------------------------------- 193lookup PacketQueue{ pktq, seqno, qsize } no = do
162-- * PacketOutQueue 194 low <- readTVar seqno
163-- 195 let proj = no - low
164 196 if proj < qsize
165data PacketOutQueue extra msg toWire fromWire = PacketOutQueue 197 then let i = no `mod` qsize
166 { pktoInPQ :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue' 198 in readArray pktq i
167 , pktoOutPQ :: PacketQueue (Word32,toWire) 199 else return Nothing
168 , pktoPacketNo :: TVar Word32 200
169 , pktoToWireIO :: IO (STM extra) 201-- -- | For each item in the queue, modify or delete it.
170 , pktoToWire :: STM extra 202-- mapQ :: (a -> Maybe a) -> PacketQueue a -> STM ()
171 -> Word32{-packet number we expect to recieve-} 203-- mapQ f PacketQueue{pktq} = do
172 -> Word32{- buffer_end -} 204-- (z,n) <- getBounds pktq
173 -> Word32{- packet number -} 205-- forM_ [z .. n] $ \i -> do
174 -> msg 206-- e <- readArray pktq i
175 -> STM (Maybe (toWire,Word32{-next packet no-})) 207-- writeArray pktq i (e>>=f)
176 }
177
178mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM ()
179mapOutGoing f q@(PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do
180 (z,n) <- getBounds pktq
181 let ff i = do
182 e <- readArray pktq i
183 writeArray pktq i (e>>=f)
184 mapM_ ff [z .. n]
185
186newOutGoing :: PacketQueue fromwire
187 -- ^ Incoming queue
188 -> (STM io -> Word32 {-packet number we expect to recieve-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-})))
189 -- ^ toWire callback
190 -> IO (STM io)
191 -- ^ io action to get extra parameter
192 -> Word32 -- ^ packet number of first outgoing packet
193 -> Word32 -- ^ Capacity of queue.
194 -> Word32 -- ^ Initial sequence number.
195 -> STM (PacketOutQueue io msg wire fromwire)
196newOutGoing inq towire toWireIO num capacity seqstart = do
197 outq <- new capacity seqstart
198 numVar <- newTVar num
199 return $ PacketOutQueue
200 { pktoInPQ = inq
201 , pktoOutPQ = outq
202 , pktoPacketNo = numVar
203 , pktoToWireIO = toWireIO
204 , pktoToWire = towire
205 }
206
207data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail
208 deriving (Show)
209
210instance Eq (OutGoingResult a) where
211 OGSuccess _ == OGSuccess _ = True
212 OGFull == OGFull = True
213 OGEncodeFail == OGEncodeFail = True
214 _ == _ = False
215
216
217-- | do something in IO before appending to the queue
218readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra)
219readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO
220
221getRequested :: Show wire => STM extra -> PacketOutQueue extra msg wire fromwire -> Word32 -> [Word8] -> STM [Maybe (Word32,wire)]
222getRequested _ _ _ [] = return []
223getRequested getExtra pktoq snum ns = do
224 let pnums = toPNums snum ns
225 indices = map toIndex pnums
226 tput XNetCrypto $ "getRequested: snum = " ++ show snum
227 tput XNetCrypto $ "getRequested: pnums = " ++ show pnums ++ " indices = " ++ show indices
228 xs <- packetQueueViewList (pktoOutPQ pktoq)
229 tput XNetCrypto $ "getRequested viewList -> "
230 mapM_ (tput XNetCrypto . show) xs
231 forM indices $ \i -> readArray (pktq $ pktoOutPQ pktoq) i
232 where
233 toIndex :: Word32 -> Word32
234 toIndex = (`mod` qsize (pktoOutPQ pktoq))
235
236toPNums :: Word32 -> [Word8] -> [Word32]
237toPNums snum ns = reverse . snd $ foldl doOne (snum-1,[]) ns
238 where
239 doOne :: (Word32,[Word32]) -> Word8 -> (Word32,[Word32])
240 doOne (addend,as) 0 = (addend+255,as)
241 doOne (addend,as) x = let y = fromIntegral x + addend
242 in (y,y:as)
243
244-- | Makes no modifications to the PacketOutQueue state, useful for lossy packets
245peekPacket :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (Maybe (wire,Word32))
246peekPacket getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg
247 = do
248 be <- readTVar (buffend pktoOutPQ)
249 let i = be `mod` (qsize pktoOutPQ)
250 let arrayEmpty :: MArray a e m => a Word32 e -> m Bool
251 arrayEmpty ar = do (lowB,highB) <- getBounds ar
252 let result= lowB > highB
253 tput XNetCrypto
254 ("arrayEmpty result=" ++ show result
255 ++ " lowB=" ++ show lowB
256 ++ " highB = " ++ show highB
257 ++ " i = " ++ show i)
258 return result
259 mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ)
260 if emp then tput XNetCrypto "(peekPacket empty)" >> return Nothing
261 else do tput XNetCrypto "(peekPacket nonempty)"
262 result <- readArray (pktq pktoOutPQ) i
263 tput XNetCrypto ("readArray (isJust result)==" ++ show (isJust result))
264 return result
265 pktno <- readTVar pktoPacketNo
266 nextno <- readTVar (seqno pktoInPQ)
267 pktoToWire getExtra nextno be pktno msg
268
269-- | Convert a message to packet format and append it to the front of a queue
270-- used for outgoing messages. (Note that ‘front‛ usually means the higher
271-- index in this implementation.)
272tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (OutGoingResult wire)
273tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg
274 = do
275 be <- readTVar (buffend pktoOutPQ)
276 let i = be `mod` (qsize pktoOutPQ)
277 let arrayEmpty :: MArray a e m => a Word32 e -> m Bool
278 arrayEmpty ar = do (lowB,highB) <- getBounds ar
279 return $ lowB > highB
280 mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ)
281 if emp then return Nothing
282 else do readArray (pktq pktoOutPQ) i
283 pktno <- readTVar pktoPacketNo
284 nextno <- readTVar (seqno pktoInPQ)
285 mbWire <- pktoToWire getExtra nextno be pktno msg
286 -- TODO all the above lines ^^ can be replaced with call to peekPacket
287 case mbWire of
288 Just (pkt,pktno')
289 -> case mbPkt of
290 -- slot is free, insert element
291 Nothing -> do
292 modifyTVar' (buffend pktoOutPQ) (+1)
293 writeTVar pktoPacketNo $! pktno'
294 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt))
295 return (OGSuccess pkt)
296 -- queue is full
297 Just (n,_) -> do
298 nn <- getHighestHandledPacketPlus1 q
299 if (n < nn)
300 -- but we can overwrite an old packet
301 then do
302 modifyTVar' (buffend pktoOutPQ) (+1)
303 writeTVar pktoPacketNo $! pktno'
304 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt))
305 return (OGSuccess pkt)
306 -- uh oh this packet is still needed...
307 else return OGFull
308 -- don't know how to send this message
309 Nothing -> return OGEncodeFail
310
311dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire)
312dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do
313 i0 <- readTVar seqno
314 let i = i0 `mod` qsize
315 x <- maybe retry return =<< readArray pktq i
316 -- writeArray pktq i Nothing -- not cleaning
317 modifyTVar' seqno succ
318 return x
319
320getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32
321getHighestHandledPacketPlus1 (PacketOutQueue { pktoInPQ }) = readTVar (buffend pktoInPQ)
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs
index 6f440ea5..db7543f7 100644
--- a/src/Network/Tox/Crypto/Handlers.hs
+++ b/src/Network/Tox/Crypto/Handlers.hs
@@ -22,8 +22,7 @@ import qualified Data.ByteString as B
22import Data.ByteString (ByteString) 22import Data.ByteString (ByteString)
23import Control.Lens 23import Control.Lens
24import Data.Function 24import Data.Function
25import qualified Data.PacketQueue as PQ 25import Data.PacketBuffer as PB
26 ;import Data.PacketQueue (PacketQueue)
27import qualified Data.CyclicBuffer as CB 26import qualified Data.CyclicBuffer as CB
28 ;import Data.CyclicBuffer (CyclicBuffer) 27 ;import Data.CyclicBuffer (CyclicBuffer)
29import Data.Serialize as S 28import Data.Serialize as S
@@ -354,7 +353,7 @@ data NetCryptoSession = NCrypto
354 -- the case in group chats 353 -- the case in group chats
355 , ncView :: TVar SessionView 354 , ncView :: TVar SessionView
356 -- ^ contains your nick, status etc 355 -- ^ contains your nick, status etc
357 , ncPacketQueue :: PacketQueue CryptoData 356 , ncPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted)
358 -- ^ a buffer in which incoming packets may be stored out of order 357 -- ^ a buffer in which incoming packets may be stored out of order
359 -- but from which they may be extracted in sequence, 358 -- but from which they may be extracted in sequence,
360 -- helps ensure lossless packets are processed in order 359 -- helps ensure lossless packets are processed in order
@@ -383,13 +382,13 @@ data NetCryptoSession = NCrypto
383 , ncPingThread :: TVar (Maybe ThreadId) 382 , ncPingThread :: TVar (Maybe ThreadId)
384 -- ^ thread which actually queues outgoing pings 383 -- ^ thread which actually queues outgoing pings
385 , ncIdleEventThread :: TVar (Maybe ThreadId) 384 , ncIdleEventThread :: TVar (Maybe ThreadId)
386 , ncOutgoingQueue :: TVar 385 , ncOutgoingQueue :: TVar (UponHandshake NetCryptoOutQueue)
387 (UponHandshake 386 {-
388 (PQ.PacketOutQueue 387 (PQ.PacketOutQueue
389 (State,Nonce24,U.RangeMap TArray Word8 TVar) 388 (State,Nonce24,U.RangeMap TArray Word8 TVar)
390 CryptoMessage 389 CryptoMessage
391 (CryptoPacket Encrypted) 390 (CryptoPacket Encrypted)
392 CryptoData)) 391 CryptoData)) -}
393 -- ^ To send a message add it to this queue, by calling 'tryAppendQueueOutgoing' 392 -- ^ To send a message add it to this queue, by calling 'tryAppendQueueOutgoing'
394 -- but remember to call 'readyOutGoing' first, because the shared secret cache 393 -- but remember to call 'readyOutGoing' first, because the shared secret cache
395 -- presently requires the IO monad. 394 -- presently requires the IO monad.
@@ -643,7 +642,7 @@ freshCryptoSession sessions
643 insertArrayAt outHooks 0 (A.array (0,64) (map assignHook [0..64])) 642 insertArrayAt outHooks 0 (A.array (0,64) (map assignHook [0..64]))
644 return (idmap,lossyEsc,losslessEsc,outHooks) 643 return (idmap,lossyEsc,losslessEsc,outHooks)
645 ncView0 <- newTVar (sessionView sessions) 644 ncView0 <- newTVar (sessionView sessions)
646 pktq <- PQ.new (inboundQueueCapacity sessions) 0 645 pktq <- PB.newPacketBuffer
647 bufstart <- newTVar 0 646 bufstart <- newTVar 0
648 mbpktoq 647 mbpktoq
649 <- case mbtheirSessionKey of 648 <- case mbtheirSessionKey of
@@ -696,7 +695,7 @@ freshCryptoSession sessions
696 , ncOutgoingIdMapEscapedLossy = lossyEscapeIdMap 695 , ncOutgoingIdMapEscapedLossy = lossyEscapeIdMap
697 , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap 696 , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap
698 , ncView = ncView0 697 , ncView = ncView0
699 , ncPacketQueue = pktq 698 , ncPacketBuffer = pktq
700 , ncStoredRequests = ncStoredRequests0 699 , ncStoredRequests = ncStoredRequests0
701 , ncRequestInterval = ncRequestInterval0 700 , ncRequestInterval = ncRequestInterval0
702 , ncAliveInterval = ncAliveInterval0 701 , ncAliveInterval = ncAliveInterval0
@@ -720,12 +719,25 @@ freshCryptoSession sessions
720 HaveHandshake pktoq -> return (runUponHandshake netCryptoSession0 addr pktoq) 719 HaveHandshake pktoq -> return (runUponHandshake netCryptoSession0 addr pktoq)
721 return (myhandshake,maybeLaunchMissles) 720 return (myhandshake,maybeLaunchMissles)
722 721
722{-
723type NetCryptoOutQueue = PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar) 723type NetCryptoOutQueue = PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar)
724 CryptoMessage 724 CryptoMessage
725 (CryptoPacket Encrypted) 725 (CryptoPacket Encrypted)
726 CryptoData 726 CryptoData
727-}
728data NetCryptoOutQueue = NetCryptoOutQueue
729 { nqPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted)
730 , nqToWire :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar)
731 -> Word32
732 -> Word32
733 -> Word32
734 -> XMessage
735 -> STM (Maybe (CryptoPacket Encrypted, Word32))
736 , nqToWireIO :: IO (STM (State, Nonce24, U.RangeMap TArray Word8 TVar))
737 , nqPacketNo :: TVar Word32
738 }
727 739
728createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketQueue CryptoData 740createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketBuffer CryptoData (CryptoPacket Encrypted)
729 -> TVar Nonce24 -> U.RangeMap TArray Word8 TVar -> STM (UponHandshake NetCryptoOutQueue) 741 -> TVar Nonce24 -> U.RangeMap TArray Word8 TVar -> STM (UponHandshake NetCryptoOutQueue)
730createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 = do 742createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 = do
731 let crypto = transportCrypto sessions 743 let crypto = transportCrypto sessions
@@ -739,8 +751,13 @@ createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce
739 ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) 751 ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession))
740 ) $ writeTVar ncMyPacketNonce0 n24plus1 752 ) $ writeTVar ncMyPacketNonce0 n24plus1
741 return (return (f n24, n24, ncOutgoingIdMap0)) 753 return (return (f n24, n24, ncOutgoingIdMap0))
742 pktoq <- PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 754 seqnoVar <- newTVar 0
743 return (HaveHandshake pktoq) 755 return (HaveHandshake NetCryptoOutQueue
756 { nqPacketBuffer = pktq
757 , nqToWire = ncToWire
758 , nqToWireIO = toWireIO
759 , nqPacketNo = seqnoVar
760 })
744 761
745-- | add new session to the lookup maps 762-- | add new session to the lookup maps
746addSessionToMap :: NetCryptoSessions -> SockAddr -> NetCryptoSession -> STM () 763addSessionToMap :: NetCryptoSessions -> SockAddr -> NetCryptoSession -> STM ()
@@ -788,11 +805,39 @@ addSessionToMapIfNotThere sessions addrRaw netCryptoSession = do
788 -- in case we're using the same long term key on different IPs ... 805 -- in case we're using the same long term key on different IPs ...
789 modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) 806 modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs))
790 807
808data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail
809 deriving (Show)
810
811-- | Convert a message to packet format and append it to the front of a queue
812-- used for outgoing messages. (Note that ‘front‛ usually means the higher
813-- index in this implementation.)
814--
815-- Called from 'runUponHandshake' and 'sendCrypto'.
816--
817-- Whenever this is called, you should also send the resulting packet out on
818-- the network.
819
820tryAppendQueueOutgoing :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar)
821 -> NetCryptoOutQueue
822 -> CryptoMessage
823 -> STM (OutGoingResult (CryptoPacket Encrypted))
824tryAppendQueueOutgoing getExtra outq msg = do
825 pktno <- readTVar (nqPacketNo outq)
826 nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq)
827 be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq)
828 mbWire <- nqToWire outq getExtra nextno be pktno msg
829 case mbWire of
830 Just (payload,seqno) -> do
831 PB.grokPacket (nqPacketBuffer outq) (PacketSent seqno payload)
832 return $ OGSuccess payload
833 Nothing -> return OGEncodeFail
834
835
791runUponHandshake :: NetCryptoSession -> SockAddr -> NetCryptoOutQueue -> IO () 836runUponHandshake :: NetCryptoSession -> SockAddr -> NetCryptoOutQueue -> IO ()
792runUponHandshake netCryptoSession0 addr pktoq = do 837runUponHandshake netCryptoSession0 addr pktoq = do
793 dput XNetCrypto "(((((((runUponHandshake))))))) Launching threads" 838 dput XNetCrypto "(((((((runUponHandshake))))))) Launching threads"
794 let sessions = ncAllSessions netCryptoSession0 839 let sessions = ncAllSessions netCryptoSession0
795 pktq = ncPacketQueue netCryptoSession0 840 pktq = ncPacketBuffer netCryptoSession0
796 remotePublicKey = ncTheirPublicKey netCryptoSession0 841 remotePublicKey = ncTheirPublicKey netCryptoSession0
797 crypto = transportCrypto sessions 842 crypto = transportCrypto sessions
798 allsessions = netCryptoSessions sessions 843 allsessions = netCryptoSessions sessions
@@ -805,7 +850,7 @@ runUponHandshake netCryptoSession0 addr pktoq = do
805 atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) 850 atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid)
806 labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey) ++ sidStr) 851 labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey) ++ sidStr)
807 fix $ \loop -> do 852 fix $ \loop -> do
808 cd <- atomically $ PQ.dequeue pktq 853 cd <- atomically $ PB.awaitReadyPacket pktq
809 if msgID (bufferData cd) == PacketRequest 854 if msgID (bufferData cd) == PacketRequest
810 then do 855 then do
811 dput XNetCrypto $ "Dequeued::PacketRequest seqno=" ++ show (bufferStart cd) ++ " " ++ show (bufferData cd) 856 dput XNetCrypto $ "Dequeued::PacketRequest seqno=" ++ show (bufferStart cd) ++ " " ++ show (bufferData cd)
@@ -824,19 +869,21 @@ runUponHandshake netCryptoSession0 addr pktoq = do
824 labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) 869 labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr)
825 fix $ \loop -> do 870 fix $ \loop -> do
826 atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) 871 atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000)
827 nums <- atomically $ PQ.getMissing pktq 872 (nums,seqno) <- atomically $ PB.packetNumbersToRequest pktq
828 dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums 873 dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums
829 getOutGoingParam <- PQ.readyOutGoing pktoq 874 getOutGoingParam <- nqToWireIO pktoq
830 atomically $ do 875 x <- atomically $ do
831 seqno <- PQ.getLastDequeuedPlus1 pktq 876 ogresult <- tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums)
832 ogresult <- PQ.tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums)
833 case ogresult of 877 case ogresult of
834 PQ.OGSuccess _ -> return () 878 OGSuccess x -> return x
835 _ -> retry 879 _ -> retry
880 sendSessionPacket sessions addr x
836 loop 881 loop
837 dput XNetCrypto $ "runUponHandshake: " ++ show threadid ++ " = NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr 882 dput XNetCrypto $ "runUponHandshake: " ++ show threadid ++ " = NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr
838 883
839 -- launch dequeueOutgoing thread 884 -- launch dequeueOutgoing thread
885 {-
886 -- TODO
840 threadidOutgoing <- forkIO $ do 887 threadidOutgoing <- forkIO $ do
841 tid <- myThreadId 888 tid <- myThreadId
842 atomically $ writeTVar (ncDequeueOutGoingThread netCryptoSession0) (Just tid) 889 atomically $ writeTVar (ncDequeueOutGoingThread netCryptoSession0) (Just tid)
@@ -846,7 +893,8 @@ runUponHandshake netCryptoSession0 addr pktoq = do
846 dput XNetCrypto "NetCryptoDequeueOutgoing thread... Sending encrypted Packet" 893 dput XNetCrypto "NetCryptoDequeueOutgoing thread... Sending encrypted Packet"
847 sendSessionPacket sessions addr pkt 894 sendSessionPacket sessions addr pkt
848 loop 895 loop
849 dput XNetCrypto $ "runUponHandshake: " ++ show threadidOutgoing ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr 896 -}
897 dput XNetCrypto $ "runUponHandshake: " ++ show "threadidOutgoing" ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr
850 898
851 -- launch ping Machine thread 899 -- launch ping Machine thread
852 pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) 900 pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0)
@@ -859,10 +907,10 @@ runUponHandshake netCryptoSession0 addr pktoq = do
859 labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) 907 labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr)
860 fix $ \loop -> do 908 fix $ \loop -> do
861 atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) 909 atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000)
862 dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" 910 dput XNetCryptoOut $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet"
863 lr <- sendPing crypto netCryptoSession0 911 lr <- sendPing crypto netCryptoSession0
864 case lr of 912 case lr of
865 Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s 913 Left s -> dput XNetCryptoOut $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s
866 Right _ -> return () 914 Right _ -> return ()
867 loop 915 loop
868 916
@@ -972,7 +1020,7 @@ updateCryptoSession sessions addr newsession timestamp hp session handshake = do
972 sessions 1020 sessions
973 newsession 1021 newsession
974 theirSessionPublic 1022 theirSessionPublic
975 (ncPacketQueue session) 1023 (ncPacketBuffer session)
976 (ncMyPacketNonce session) 1024 (ncMyPacketNonce session)
977 (ncOutgoingIdMap session) 1025 (ncOutgoingIdMap session)
978 writeTVar (ncOutgoingQueue session) mbpktoq 1026 writeTVar (ncOutgoingQueue session) mbpktoq
@@ -1055,7 +1103,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do
1055 Nothing -> do 1103 Nothing -> do
1056 dput XNetCrypto "Dropping packet.. no session" 1104 dput XNetCrypto "Dropping packet.. no session"
1057 return Nothing -- drop packet, we have no session 1105 return Nothing -- drop packet, we have no session
1058 Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketQueue, ncHooks, 1106 Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketBuffer, ncHooks,
1059 ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, 1107 ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce,
1060 ncPingMachine, ncSessionId, ncStoredRequests}) -> do 1108 ncPingMachine, ncSessionId, ncStoredRequests}) -> do
1061 -- Unrecognized packets, try them thrice so as to give 1109 -- Unrecognized packets, try them thrice so as to give
@@ -1130,9 +1178,10 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do
1130 isLossy (GrpMsg KnownLossy _) = True 1178 isLossy (GrpMsg KnownLossy _) = True
1131 isLossy (Msg mid) | lossyness mid == Lossy = True 1179 isLossy (Msg mid) | lossyness mid == Lossy = True
1132 isLossy _ = False 1180 isLossy _ = False
1181 ack = bufferStart -- Earliest sequence number they've seen from us.
1133 if isLossy msgTypMapped 1182 if isLossy msgTypMapped
1134 then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm 1183 then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm
1135 atomically $ PQ.observeOutOfBand ncPacketQueue bufferEnd 1184 atomically $ PB.grokPacket ncPacketBuffer (PacketReceivedLossy bufferEnd ack)
1136 runCryptoHook session (bufferData cd) 1185 runCryptoHook session (bufferData cd)
1137 else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm 1186 else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm
1138 when (msgID cm == PING) $ 1187 when (msgID cm == PING) $
@@ -1143,7 +1192,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do
1143 -- num <- CB.getNextSequenceNum ncStoredRequests 1192 -- num <- CB.getNextSequenceNum ncStoredRequests
1144 -- CB.enqueue ncStoredRequests num cd 1193 -- CB.enqueue ncStoredRequests num cd
1145 handlePacketRequest session cd 1194 handlePacketRequest session cd
1146 atomically $ PQ.enqueue ncPacketQueue bufferEnd cd 1195 atomically $ PB.grokPacket ncPacketBuffer (PacketReceived bufferEnd cd ack)
1147 return Nothing 1196 return Nothing
1148 where 1197 where
1149 last2Bytes :: Nonce24 -> Word16 1198 last2Bytes :: Nonce24 -> Word16
@@ -1262,14 +1311,18 @@ sendCrypto crypto session updateLocal cm = do
1262 HaveHandshake outq <- atomically $ readTVar (ncOutgoingQueue session) 1311 HaveHandshake outq <- atomically $ readTVar (ncOutgoingQueue session)
1263 -- XXX: potential race? if shared secret comes out of sync with cache? 1312 -- XXX: potential race? if shared secret comes out of sync with cache?
1264 dput XNetCrypto "sendCrypto: enter " 1313 dput XNetCrypto "sendCrypto: enter "
1265 getOutGoingParam <- PQ.readyOutGoing outq 1314 getOutGoingParam <- nqToWireIO outq
1266 dput XNetCrypto "sendCrypto: got the io extra stuff" 1315 dput XNetCrypto "sendCrypto: got the io extra stuff"
1267 atomically $ do 1316 r <- atomically $ do
1268 result <- PQ.tryAppendQueueOutgoing getOutGoingParam outq cm 1317 result <- tryAppendQueueOutgoing getOutGoingParam outq cm
1269 case result of 1318 case result of
1270 PQ.OGSuccess x -> updateLocal >> return (Right x) 1319 OGSuccess x -> updateLocal >> return (Right x)
1271 PQ.OGFull -> return (Left "Outgoing packet buffer is full") 1320 OGFull -> return (Left "Outgoing packet buffer is full")
1272 PQ.OGEncodeFail -> return (Left "Failed to encode outgoing packet") 1321 OGEncodeFail -> return (Left "Failed to encode outgoing packet")
1322 case ncSockAddr session of
1323 HaveDHTKey saddr -> mapM_ (sendSessionPacket (ncAllSessions session) saddr) r
1324 _ -> return ()
1325 return r
1273 1326
1274sendPing :: TransportCrypto -> NetCryptoSession -> IO (Either String (CryptoPacket Encrypted)) 1327sendPing :: TransportCrypto -> NetCryptoSession -> IO (Either String (CryptoPacket Encrypted))
1275sendPing crypto session = do 1328sendPing crypto session = do
@@ -1327,8 +1380,16 @@ sendCryptoLossy crypto session updateLocal cm = do
1327 updateLocal 1380 updateLocal
1328 return (Left errmsg) 1381 return (Left errmsg)
1329 HaveHandshake outq -> do 1382 HaveHandshake outq -> do
1330 getOutGoingParam <- PQ.readyOutGoing outq 1383 getOutGoingParam <- nqToWireIO outq
1331 mbPkt <- atomically $ PQ.peekPacket getOutGoingParam outq cm 1384 mbPkt <- atomically $ do
1385 pktno <- readTVar (nqPacketNo outq)
1386 nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq)
1387 be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq)
1388 nqToWire outq getOutGoingParam -- See 'ncToWire'
1389 nextno -- packet number we expect to recieve
1390 be -- buffer_end (for lossy)
1391 pktno -- packet number (for lossless)
1392 cm
1332 case mbPkt of 1393 case mbPkt of
1333 Nothing -> do 1394 Nothing -> do
1334 let errmsg = "Error sending lossy packet! (sessionid: " ++ show (ncSessionId session) ++ ") " ++ show cm 1395 let errmsg = "Error sending lossy packet! (sessionid: " ++ show (ncSessionId session) ++ ") " ++ show cm
@@ -1466,11 +1527,10 @@ handlePacketRequest session (CryptoData { bufferStart=num
1466 mbOutQ <- atomically $ readTVar (ncOutgoingQueue session) 1527 mbOutQ <- atomically $ readTVar (ncOutgoingQueue session)
1467 case mbOutQ of 1528 case mbOutQ of
1468 HaveHandshake pktoq -> do 1529 HaveHandshake pktoq -> do
1469 getOutGoingParam <-PQ.readyOutGoing pktoq 1530 getOutGoingParam <-nqToWireIO pktoq
1470 ps <- atomically $ PQ.getRequested getOutGoingParam pktoq num bs 1531 ps <- atomically $ PB.retrieveForResend (nqPacketBuffer pktoq) $ PB.decompressSequenceNumbers num bs
1471 let resend (Just (n,pkt)) = sendSessionPacket (ncAllSessions session) addr pkt 1532 let resend (n,pkt) = sendSessionPacket (ncAllSessions session) addr pkt
1472 resend _ = return () 1533 dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst ps))
1473 dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst $ catMaybes ps))
1474 mapM_ resend ps 1534 mapM_ resend ps
1475 _ -> return () 1535 _ -> return ()
1476 1536
diff --git a/src/Network/Tox/Crypto/Transport.hs b/src/Network/Tox/Crypto/Transport.hs
index a453a13c..8bb41254 100644
--- a/src/Network/Tox/Crypto/Transport.hs
+++ b/src/Network/Tox/Crypto/Transport.hs
@@ -82,12 +82,13 @@ import Data.Text.Encoding as T
82import Data.Serialize as S 82import Data.Serialize as S
83import Control.Arrow 83import Control.Arrow
84import DPut 84import DPut
85import Data.PacketQueue (toPNums) 85import Data.PacketBuffer as PB
86import Data.Function 86import Data.Function
87 87
88showCryptoMsg :: Word32 -> CryptoMessage -> [Char] 88showCryptoMsg :: Word32 -> CryptoMessage -> [Char]
89showCryptoMsg seqno (UpToN PacketRequest bytes) = "UpToN PacketRequest --> " ++ show (toPNums seqno $ B.unpack bytes) 89showCryptoMsg seqno (UpToN PacketRequest bytes) = "UpToN PacketRequest --> "
90showCryptoMsg _ msg = show msg 90 ++ show (PB.decompressSequenceNumbers seqno $ B.unpack bytes)
91showCryptoMsg _ msg = show msg
91 92
92parseCrypto :: (ByteString, SockAddr) -> Either (CryptoPacket Encrypted, SockAddr) (ByteString, SockAddr) 93parseCrypto :: (ByteString, SockAddr) -> Either (CryptoPacket Encrypted, SockAddr) (ByteString, SockAddr)
93parseCrypto ((B.uncons -> Just (0x1b,pkt)),saddr) = either (\_ -> Right (pkt,saddr)) 94parseCrypto ((B.uncons -> Just (0x1b,pkt)),saddr) = either (\_ -> Right (pkt,saddr))