{-# LANGUAGE TupleSections #-} {-# LANGUAGE DeriveFunctor #-} module Data.PacketBuffer ( PacketBuffer , newPacketBuffer , PacketOutboundEvent(..) , PacketInboundEvent(..) , grokOutboundPacket , grokInboundPacket , awaitReadyPacket , packetNumbersToRequest , expectingSequenceNumber , nextToSendSequenceNumber , retrieveForResend , decompressSequenceNumbers , compressSequenceNumbers , pbReport ) where import Data.PacketQueue as Q import DPut import DebugTag import Control.Concurrent.STM import Control.Monad import Data.Maybe import Data.Word data PacketBuffer a b = PacketBuffer { inQueue :: PacketQueue a , outBuffer :: PacketQueue b } -- | Initialize the packet buffers. Note, the capacity of the inbound packet -- queue is currently hardcoded to 200 packets and the capacity of the outbound -- buffer is hardcoded to 400 packets. newPacketBuffer :: STM (PacketBuffer a b) newPacketBuffer = PacketBuffer <$> Q.new 200 0 <*> Q.new 400 0 -- | Input for 'grokPacket'. data PacketOutboundEvent b = PacketSent { poSeqNum :: Word32 -- ^ Sequence number for payload. , poSentPayload :: b -- ^ Payload packet we sent to them. } deriving Functor data PacketInboundEvent a = PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. , peReceivedPayload :: a -- ^ Payload packet they sent to us. , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. } | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet. , peReceivedPayload :: a -- ^ Payload packet they sent to us (ignored). , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. } deriving Functor -- | Whenever a packet is received or sent (but not resent) from the network, -- this function should be called to update the relevant buffers. -- -- On outgoing packets, if the outbound buffer is full, this will return -- True. In this case, the caller may retry to enable blocking until -- 'grokInboundPacket' is called in another thread. grokOutboundPacket :: PacketBuffer a b -> PacketOutboundEvent b -> STM (Bool,(Word32,Word32)) grokOutboundPacket (PacketBuffer _ outb) (PacketSent seqno a) = do (n,r) <- Q.enqueue outb seqno a return (n/=0,(n,r)) grokInboundPacket :: PacketBuffer a b -> PacketInboundEvent a -> STM () grokInboundPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) = do Q.enqueue inb seqno a Q.dropPacketsBefore outb ack grokInboundPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno _ ack) = do Q.observeOutOfBand inb seqno Q.dropPacketsBefore outb ack -- | Wait until an inbound packet is ready to process. Any necessary -- re-ordering will have been done. awaitReadyPacket :: PacketBuffer a b -> STM a awaitReadyPacket (PacketBuffer inb _) = Q.dequeue inb -- | Obtain a list of sequence numbers that may have been dropped. This would -- be any number not yet received that is prior to the maxium sequence number -- we have received. For convenience, a lowerbound for the missing squence numbers -- is also returned as the second item of the pair. packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32) packetNumbersToRequest (PacketBuffer inb _) = do ns <- Q.getMissing inb lb <- Q.getLastDequeuedPlus1 inb return (ns,lb) expectingSequenceNumber :: PacketBuffer a b -> STM Word32 expectingSequenceNumber (PacketBuffer inb _ ) = Q.getLastDequeuedPlus1 inb nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32 nextToSendSequenceNumber (PacketBuffer _ outb) = Q.getLastEnqueuedPlus1 outb -- | Retrieve already-sent packets by their sequence numbers. See -- 'decompressSequenceNumbers' to obtain the sequence number list from a -- compressed encoding. There is no need to call 'grokPacket' when sending the -- packets returned from this call. retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)] retrieveForResend (PacketBuffer _ outb) seqnos = catMaybes <$> forM seqnos (\no -> fmap (no,) <$> Q.lookup outb no) -- | Expand a compressed set of sequence numbers. The first sequence number is -- given directly and the rest are computed using 8-bit offsets. This is -- normally used to obtain input for 'retrieveForResend'. decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32] decompressSequenceNumbers baseno ns = foldr doOne (const []) ns (baseno-1) where doOne :: Word8 -> (Word32 -> [Word32]) -> Word32 -> [Word32] doOne 0 f addend = f (addend + 255) doOne x f addend = let y = fromIntegral x + addend in y : f y compressSequenceNumbers :: Word32 -> [Word32] -> [Word8] compressSequenceNumbers baseno xs = foldr doOne (const []) xs (baseno-1) where doOne :: Word32 -> (Word32 -> [Word8]) -> Word32 -> [Word8] doOne y f addend = case y - addend of x | x < 255 -> fromIntegral x : f y | otherwise -> 0 : doOne y f (addend + 255) {- compressSequenceNumbers :: Word32 -> [Word32] -> [Word8] compressSequenceNumbers seqno xs = let r = map fromIntegral (reduceToSums ys >>= makeZeroes) in dtrace XNetCrypto ("compressSequenceNumbers " ++ show seqno ++ show xs ++ " --> "++show r) r where ys = Prelude.map (subtract (seqno - 1)) xs reduceToSums [] = [] reduceToSums (x:xs) = x:(reduceToSums $ Prelude.map (subtract x) xs) makeZeroes :: Word32 -> [Word32] -- makeZeroes 0 = [] makeZeroes x = let (d,m)= x `divMod` 255 zeros= Prelude.replicate (fromIntegral d) 0 in zeros ++ [m] -} pbReport :: String -> PacketBuffer a b -> STM String pbReport what (PacketBuffer inb outb) = do inb_seqno <- getLastDequeuedPlus1 inb inb_buffend <- getLastEnqueuedPlus1 inb outb_seqno <- getLastDequeuedPlus1 outb outb_bufend <- getLastEnqueuedPlus1 outb return $ "PacketBuffer<"++what++"> Inbound" ++ show (inb_seqno, inb_buffend) ++" Outbound" ++ show (outb_seqno, outb_bufend)