{-# LANGUAGE NamedFieldPuns #-}
-- | This module is useful for implementing a lossless protocol on top of a
-- lossy datagram style protocol.  It implements a buffer in which packets may
-- be stored out of order, but from which they are extracted in the proper
-- sequence.
module Data.PacketQueue
    ( PacketQueue
    , getCapacity
    , getLastDequeuedPlus1
    , getLastEnqueuedPlus1
    , new
    , dequeue
    , dropPacketsLogic
    , dropPacketsBefore
    , getMissing
    -- , dequeueOrGetMissing
    -- , markButNotDequeue
    , enqueue
    , observeOutOfBand
    , packetQueueViewList
    -- , mapQ
    , Data.PacketQueue.lookup
    ) where

import Control.Concurrent.STM
import Control.Monad
import Data.Word
import Data.Array.MArray
import Data.Maybe

-- | A fixed-capacity reorder buffer.  Packets are stored in 'pktq' at the
-- slot @sequenceNumber `mod` qsize@.
data PacketQueue a = PacketQueue
    { pktq    :: TArray Word32 (Maybe a)
      -- ^ Slot storage; 'Nothing' marks an empty slot.
    , seqno   :: TVar Word32
      -- ^ (buffer_start) Sequence number of the next packet to dequeue.
    , qsize   :: Word32
      -- ^ Number of slots in 'pktq'.
    , buffend :: TVar Word32
      -- ^ On incoming, next packet they'll send + 1,
      -- i.e. one more than the largest seen sequence number.
      --
      -- Written by:
      --
      --   * observeOutOfBand
      --
      --   * dropPacketsBefore
      --
      --   * enqueue
    }

-- | Obtain a list of non-empty slots in the 'PacketQueue'.  The numeric value
-- is an index into the underlying array, not a sequence number.
packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
packetQueueViewList p = mapMaybe occupied <$> getAssocs (pktq p)
  where
    -- Keep the array index alongside the payload; empty slots vanish.
    occupied (ix, slot) = (,) ix <$> slot

-- | This returns the earliest sequence number with a slot in the queue.
getLastDequeuedPlus1 :: PacketQueue a -> STM Word32
getLastDequeuedPlus1 = readTVar . seqno

-- | This returns the least upper bound of sequence numbers that have been
-- enqueued.
getLastEnqueuedPlus1 :: PacketQueue a -> STM Word32
getLastEnqueuedPlus1 = readTVar . buffend

-- | This is the number of consecutive sequence numbers, starting at
-- 'getLastDequeuedPlus1', that can be stored in the queue.
getCapacity :: Applicative m => PacketQueue t -> m Word32
getCapacity = pure . qsize

-- | Create a new PacketQueue.
new :: Word32 -- ^ Capacity of queue.
    -> Word32 -- ^ Initial sequence number.
    -> STM (PacketQueue a)
new capacity seqstart = do
    -- The requested capacity is rounded up to an even number, as before.
    -- Two edge cases are handled explicitly because 'Word32' arithmetic
    -- wraps silently:
    --
    --   * capacity 0 would make @cap - 1@ underflow to 'maxBound' (a 2^32
    --     slot array) and every later @`mod` qsize@ divide by zero;
    --
    --   * capacity 'maxBound' is odd, and @capacity + 1@ would wrap to 0.
    let cap | capacity < 2         = 2
            | capacity == maxBound = capacity - 1
            | odd capacity         = capacity + 1
            | otherwise            = capacity
    q    <- newArray (0, cap - 1) Nothing
    seqv <- newTVar seqstart
    bufe <- newTVar seqstart
    return PacketQueue { pktq    = q
                       , seqno   = seqv
                       , qsize   = cap
                       , buffend = bufe
                       }

-- | Update the packet queue given:
--
--  * packet queue
--
--  * the number of next lossless packet they intend to send you
--
-- This behaves exactly like 'enqueue' except that no packet data is written to
-- the queue.
observeOutOfBand :: PacketQueue a -> Word32 -> STM ()
observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do
    low <- readTVar seqno
    -- Distance from the head of the queue; unsigned, so a number that lies
    -- before 'seqno' wraps to a large value and fails the range check.
    let proj = numberOfNextLosslessPacketThatTheyWillSend - low
    -- Ignore packet if out of range.
    when (proj < qsize) $
        -- Only ever move 'buffend' forward (relative to the head).
        -- NOTE(review): the stored value is the given number + 1, mirroring
        -- 'enqueue'.  If the argument really is the *next* number the peer
        -- will send, the extra + 1 may be unintended -- confirm with callers.
        modifyTVar' buffend $ \be ->
            if be - low <= proj
                then numberOfNextLosslessPacketThatTheyWillSend + 1
                else be

-- | Return the sequence numbers in the window from 'seqno' (inclusive) to
-- 'buffend' (exclusive) whose slots are empty.  If 'buffend' is not ahead of
-- 'seqno', the result is the empty list.
--
-- The window is measured with wrapping 'Word32' arithmetic, so it is handled
-- correctly when it straddles the wrap from 'maxBound' to 0, and a queue whose
-- 'seqno' and 'buffend' are both 0 reports nothing missing.  At most 'qsize'
-- sequence numbers are examined.
getMissing :: PacketQueue a -> STM [Word32]
getMissing PacketQueue { pktq, seqno, qsize, buffend } = do
    seqno0   <- readTVar seqno
    buffend0 <- readTVar buffend
    let -- Modular distance from head to end of the seen range.
        pending = buffend0 - seqno0
        -- A distance in the upper half of the range means 'buffend' is
        -- actually behind 'seqno' (e.g. after 'dropPacketsBefore').
        count | pending > maxBound `div` 2 = 0
              | otherwise                  = min pending qsize
        -- (+ 1) wraps on Word32, unlike an enumeration or 'succ'.
        wanted = take (fromIntegral count) (iterate (+ 1) seqno0)
    filterM (\i -> isNothing <$> readArray pktq (i `mod` qsize)) wanted

-- -- | If seqno < buffend then return expected packet numbers for all
-- -- the Nothings in the array between them.
-- -- Otherwise, behave as 'dequeue' would.
-- -- TODO: Do we need this function?  Delete it if not.
-- dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a)
-- dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do
--     seqno0 <- readTVar seqno
--     buffend0 <- readTVar buffend
--     if seqno0 < buffend0
--         then do
--             maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ])
--             let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes
--             return (Left nums)
--         else do
--             let i = seqno0 `mod` qsize
--             x <- maybe retry return =<< readArray pktq i
--             writeArray pktq i Nothing
--             modifyTVar' seqno succ
--             return (Right x)

-- | Retry until the next expected packet is enqueued.  Then return it,
-- clearing its slot and advancing the head sequence number by one.
dequeue :: PacketQueue a -> STM a
dequeue PacketQueue { pktq, seqno, qsize } = do
    expected <- readTVar seqno
    let slot = expected `mod` qsize
    -- Block (via 'retry') while the head slot is still empty.
    x <- readArray pktq slot >>= maybe retry return
    writeArray pktq slot Nothing
    -- Use wrapping addition: 'succ' throws on @maxBound :: Word32@, which
    -- would crash the transaction when the sequence number wraps around.
    modifyTVar' seqno (+ 1)
    return x

-- | Helper to 'dropPacketsBefore'.
--
-- Arguments are the queue size, the current head sequence number, and the
-- sequence number before which everything is to be dropped.  The result is
-- @(new buffend if it should change, new head sequence number, inclusive
-- ranges of array indices to clear)@.
dropPacketsLogic :: Word32 -> Word32 -> Word32 -> (Maybe Word32, Word32, [(Word32,Word32)])
dropPacketsLogic qsize low no0
    -- Out of bounds, do nothing.
    | proj >= qsize = (Nothing, low, [])
    -- (Just no0, no0, [(0,qsize - 1)]) -- Reset to empty queue.
    -- Clear some, but not all, slots.
    | ilow <= i     = (Nothing, no0, [(ilow, i)])
    -- The span to clear wraps around the end of the array.
    | otherwise     = (Nothing, no0, [(0, i), (ilow, qsize - 1)])
  where
    no   = no0 - 1   -- Unsigned: could overflow
    proj = no - low  -- Unsigned: could overflow
    ilow = low `mod` qsize
    i    = no `mod` qsize

-- | Drop all packets preceding the given packet number.
dropPacketsBefore :: PacketQueue a -> Word32 -> STM ()
dropPacketsBefore PacketQueue{ pktq, seqno, qsize, buffend } no0 = do
    low <- readTVar seqno
    let (mbuffend, no, ranges) = dropPacketsLogic qsize low no0
    -- Only writes 'buffend' when the helper asks for it.
    mapM_ (writeTVar buffend) mbuffend
    writeTVar seqno no
    forM_ ranges $ \(lo,hi) ->
        forM_ [lo .. hi] $ \i ->
            writeArray pktq i Nothing

-- -- | Like dequeue, but marks as viewed rather than removing
-- markButNotDequeue :: PacketQueue (Bool,a) -> STM a
-- markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
--     i0 <- readTVar seqno
--     let i = i0 `mod` qsize
--     (b,x) <- maybe retry return =<< readArray pktq i
--     writeArray pktq i (Just (True,x))
--     modifyTVar' seqno succ
--     return x

-- | Enqueue a packet.  Packets need not be enqueued in order as long as there
-- is spare capacity in the queue.  If there is not, the packet will be
-- silently discarded without blocking.  (An earlier version of this comment
-- mentioned an overwrite mode; none is implemented here.)
--
-- The result is @(no - low) `divMod` qsize@, where /low/ is the current head
-- sequence number.  If the packet was enqueued, @(0,k)@ is returned where /k/
-- is the packet's offset from the head of the queue.  If the packet was out of
-- range and therefore discarded, the first of the returned pair is non-zero.
enqueue :: PacketQueue a -- ^ The packet queue.
        -> Word32        -- ^ Sequence number of the packet.
        -> a             -- ^ The packet.
        -> STM (Word32,Word32)
enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do
    low <- readTVar seqno
    -- Unsigned distance from the head; numbers before the head wrap to a
    -- large value and are rejected by the range check.
    let proj = no - low
    -- Ignore packet if out of range.
    when (proj < qsize) $ do
        writeArray pktq (no `mod` qsize) (Just x)
        -- Advance 'buffend' only if this packet is at or beyond it.
        modifyTVar' buffend $ \be ->
            if be - low <= proj then no + 1 else be
    return (proj `divMod` qsize)

-- | Obtain the packet with the given sequence number if it is stored in the
-- queue, otherwise /Nothing/ is returned without blocking.
lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
lookup PacketQueue{ pktq, seqno, qsize } no = do
    low <- readTVar seqno
    if no - low < qsize
        then readArray pktq (no `mod` qsize)
        else return Nothing

-- -- | For each item in the queue, modify or delete it.
-- mapQ :: (a -> Maybe a) -> PacketQueue a -> STM ()
-- mapQ f PacketQueue{pktq} = do
--     (z,n) <- getBounds pktq
--     forM_ [z .. n] $ \i -> do
--         e <- readArray pktq i
--         writeArray pktq i (e>>=f)