{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE FlexibleContexts #-}
-- | This module is useful for implementing a lossless protocol on top of a
-- lossy datagram style protocol.  It implements a buffer in which packets may
-- be stored out of order, but from which they are extracted in the proper
-- sequence.
module Data.PacketQueue
    ( PacketQueue
    , getCapacity
    , new
    , newOverwrite
    , dequeue
    , markButNotDequeue
    , enqueue
    , observeOutOfBand
    , PacketOutQueue
    , packetQueueViewList
    , newOutGoing
    , readyOutGoing
    , tryAppendQueueOutgoing
    , dequeueOutgoing
    , getHighestHandledPacketPlus1
    , mapOutGoing
    , OutGoingResult(..)
    ) where

import Control.Concurrent.STM
import Control.Concurrent.STM.TArray
import Control.Monad
import Control.Applicative
import Data.Word
import Data.Array.MArray
import Data.Maybe
import Debug.Trace

-- | A fixed-capacity buffer of packets indexed by sequence number modulo the
-- capacity.
data PacketQueue a = PacketQueue
    { pktq           :: TArray Word32 (Maybe a) -- slot storage; 'Nothing' marks an empty slot
    , seqno          :: TVar Word32             -- sequence number of the next packet to dequeue
    , qsize          :: Word32                  -- number of slots in 'pktq'
    , buffend        :: TVar Word32             -- on incoming, highest packet number handled + 1
    , qOverWriteMode :: Bool                    -- TODO: Remove me, use Data.CyclicBuffer instead.
    }

-- | List every occupied slot as an (array index, packet) pair.  The first
-- component is the slot index inside the backing array, not the packet's
-- sequence number.
packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
packetQueueViewList p = do
    slots <- getAssocs (pktq p)
    return [ (n, x) | (n, Just x) <- slots ]

-- | Number of slots in the queue (after rounding; see 'new').
getCapacity :: Applicative m => PacketQueue t -> m Word32
getCapacity (PacketQueue { qsize }) = pure qsize

-- | Round a requested capacity to the even, non-zero slot count actually
-- allocated.  Odd requests are rounded up.  A request of 0 becomes 2 so that
-- the array bound @cap - 1@ cannot underflow and @`mod` qsize@ never divides
-- by zero; an odd 'maxBound' is rounded down because rounding up would wrap
-- to 0.
normalizeCapacity :: Word32 -> Word32
normalizeCapacity capacity
    | capacity == 0        = 2
    | even capacity        = capacity
    | capacity == maxBound = capacity - 1
    | otherwise            = capacity + 1

-- | Shared constructor behind 'new' and 'newOverwrite'.
--
-- NOTE(review): 'buffend' starts at 0 regardless of the initial sequence
-- number; confirm that this is intended for non-zero start values.
newWithMode :: Bool   -- ^ Whether out-of-range packets overwrite old slots.
            -> Word32 -- ^ Capacity of queue.
            -> Word32 -- ^ Initial sequence number.
            -> STM (PacketQueue a)
newWithMode overwrite capacity seqstart = do
    let cap = normalizeCapacity capacity
    q <- newArray (0, cap - 1) Nothing
    seqv <- newTVar seqstart
    bufe <- newTVar 0
    return PacketQueue
        { pktq = q
        , seqno = seqv
        , qsize = cap
        , buffend = bufe
        , qOverWriteMode = overwrite
        }

-- | Create a new PacketQueue.  The capacity is rounded up to an even number
-- of at least 2.
new :: Word32 -- ^ Capacity of queue.
    -> Word32 -- ^ Initial sequence number.
    -> STM (PacketQueue a)
new = newWithMode False

-- | Create a new PacketQueue with Overwrite on Wrap.  The capacity is rounded
-- exactly as in 'new'.
newOverwrite :: Word32 -- ^ Capacity of queue.
             -> Word32 -- ^ Initial sequence number.
             -> STM (PacketQueue a)
newOverwrite = newWithMode True

-- | Advance a sequence counter by one, wrapping from 'maxBound' to 0.
-- ('succ' must not be used here: on 'Word32' it raises an error at
-- 'maxBound', while the rest of this module relies on wrapping arithmetic.)
advanceSeq :: TVar Word32 -> STM ()
advanceSeq v = modifyTVar' v (+ 1)

-- | Move 'buffend' to @no + 1@ when packet @no@ (at offset @proj@ from @low@)
-- lies at or beyond the current buffer end, measured relative to @low@.
bumpBuffEnd :: TVar Word32 -- ^ The 'buffend' variable to update.
            -> Word32      -- ^ Current value of 'seqno' (@low@).
            -> Word32      -- ^ Offset of the packet from @low@ (@proj@).
            -> Word32      -- ^ Packet number.
            -> STM ()
bumpBuffEnd endVar low proj no =
    modifyTVar' endVar (\be -> if be - low <= proj then no + 1 else be)

-- | Note that packet number @no@ has been handled without storing a payload
-- for it.  Only 'buffend' is touched, and only when @no@ lies within
-- 'qsize' of the next expected sequence number.
observeOutOfBand :: PacketQueue a -> Word32-> STM ()
observeOutOfBand PacketQueue { seqno, qsize, buffend } no = do
    low <- readTVar seqno
    let proj = no - low
    -- Ignore packet if out of range.
    when (proj < qsize) $ bumpBuffEnd buffend low proj no

-- | Retry until the next expected packet is enqueued.  Then return it.
-- The slot is cleared and the expected sequence number advances by one
-- (wrapping at 'maxBound').
dequeue :: PacketQueue a -> STM a
dequeue PacketQueue { pktq, seqno, qsize } = do
    i0 <- readTVar seqno
    let i = i0 `mod` qsize
    x <- maybe retry return =<< readArray pktq i
    writeArray pktq i Nothing
    advanceSeq seqno
    return x

-- | Like dequeue, but marks as viewed rather than removing: the slot keeps
-- its payload with the flag set to 'True'.
markButNotDequeue :: PacketQueue (Bool,a) -> STM a
markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
    i0 <- readTVar seqno
    let i = i0 `mod` qsize
    (_, x) <- maybe retry return =<< readArray pktq i
    writeArray pktq i (Just (True,x))
    advanceSeq seqno
    return x

-- | Enqueue a packet.  Packets need not be enqueued in order as long as there
-- is spare capacity in the queue.  If there is not, the packet will be
-- silently discarded without blocking.  (Unless this is an Overwrite-queue,
-- in which case, the packets will simply wrap around overwriting the old ones.)
--
-- The result is the packet's offset from the next expected sequence number,
-- split by 'divMod' against the capacity: a quotient of 0 means the packet
-- fell inside the current window.
enqueue :: PacketQueue a -- ^ The packet queue.
        -> Word32        -- ^ Sequence number of the packet.
        -> a             -- ^ The packet.
        -> STM (Word32,Word32)
enqueue PacketQueue{ pktq, seqno, qsize, buffend, qOverWriteMode } no x = do
    low <- readTVar seqno
    let proj = no - low
    -- Ignore packet if out of range.
    when (proj < qsize || qOverWriteMode) $ do
        writeArray pktq (no `mod` qsize) (Just x)
        bumpBuffEnd buffend low proj no
    return (proj `divMod` qsize)

-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo

-----------------------------------------------------
-- * PacketOutQueue
--

-- | Queue of outgoing packets, paired with the incoming queue whose state is
-- passed to the encoder.
data PacketOutQueue extra msg toWire fromWire = PacketOutQueue
    { pktoInPQ     :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue'
    , pktoOutPQ    :: PacketQueue (Word32,toWire) -- ^ outgoing packets tagged with their packet number
    , pktoPacketNo :: TVar Word32 -- ^ packet number handed to the encoder for the next message
    , pktoToWireIO :: IO (STM extra) -- ^ io action to get extra parameter
    , pktoToWire   :: STM extra
                   -> Word32{-packet number we expect to receive-}
                   -> Word32{- buffer_end -}
                   -> Word32{- packet number -}
                   -> msg
                   -> STM (Maybe (toWire,Word32{-next packet no-}))
    }

-- | Apply a function to every occupied slot of the outgoing queue.  A slot
-- for which the function returns 'Nothing' becomes empty; empty slots stay
-- empty.
mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM ()
mapOutGoing f (PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do
    (z,n) <- getBounds pktq
    forM_ [z .. n] $ \i -> do
        e <- readArray pktq i
        writeArray pktq i (e >>= f)

-- | Create an outgoing queue attached to an existing incoming queue.
newOutGoing :: PacketQueue fromwire
            -- ^ Incoming queue
            -> (STM io -> Word32 {-packet number we expect to receive-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-})))
            -- ^ toWire callback
            -> IO (STM io)
            -- ^ io action to get extra parameter
            -> Word32 -- ^ packet number of first outgoing packet
            -> Word32 -- ^ Capacity of queue.
            -> Word32 -- ^ Initial sequence number.
            -> STM (PacketOutQueue io msg wire fromwire)
newOutGoing inq towire toWireIO num capacity seqstart = do
    outq <- new capacity seqstart
    numVar <- newTVar num
    return $ PacketOutQueue
        { pktoInPQ = inq
        , pktoOutPQ = outq
        , pktoPacketNo = numVar
        , pktoToWireIO = toWireIO
        , pktoToWire = towire
        }

-- | Outcome of 'tryAppendQueueOutgoing'.
data OutGoingResult
    = OGSuccess     -- ^ the packet was stored in the queue
    | OGFull        -- ^ the target slot holds a packet that is still needed
    | OGEncodeFail  -- ^ the encoder callback returned 'Nothing'
    deriving (Eq,Show)

-- | do something in IO before appending to the queue
readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra)
readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO

-- | Convert a message to packet format and append it to the front of a queue
-- used for outgoing messages.  (Note that "front" usually means the higher
-- index in this implementation.)
--
-- The target slot is the outgoing 'buffend' modulo the capacity.  An occupied
-- slot is overwritten only when its packet number is below
-- 'getHighestHandledPacketPlus1'; otherwise the result is 'OGFull'.
--
-- The 'trace' calls are debugging output inherited from the original code and
-- are left in place unchanged.
tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM OutGoingResult
tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWire }) msg = trace "(tryAppendQueueOutgoing)" $ do
    be <- readTVar (buffend pktoOutPQ)
    let slots = pktq pktoOutPQ
        i = be `mod` qsize pktoOutPQ
        arrayEmpty :: MArray a e m => a Word32 e -> m Bool
        arrayEmpty ar = do
            (lowB,highB) <- getBounds ar
            let result = lowB > highB
            return $ trace ("arrayEmpty result=" ++ show result
                            ++ " lowB=" ++ show lowB
                            ++ " highB = " ++ show highB
                            ++ " i = " ++ show i) result
    mbPkt <- do
        emp <- arrayEmpty slots
        if emp
            then trace "(tryAppendQueueOutgoing empty)" $ return Nothing
            else trace "(tryAppendQueueOutgoing nonempty)" $ do
                result <- readArray slots i
                return $ trace ("readArray (isJust result)==" ++ show (isJust result)) result
    pktno <- readTVar pktoPacketNo
    nextno <- readTVar (seqno pktoInPQ)
    mbWire <- pktoToWire getExtra nextno be pktno msg
    case trace "(tryAppendQueueOutgoing mbWire)" mbWire of
        -- don't know how to send this message
        Nothing -> return OGEncodeFail
        Just (pkt,pktno') -> trace "(tryAppendQueueOutgoing A)" $ do
            -- Shared by both "insert" branches below: advance the buffer
            -- end, remember the next packet number, and fill the slot.
            let store = do
                    modifyTVar' (buffend pktoOutPQ) (+1)
                    writeTVar pktoPacketNo $! pktno'
                    writeArray slots i (Just (pktno,pkt))
                    return OGSuccess
            case mbPkt of
                -- slot is free, insert element
                Nothing -> trace "(tryAppendQueueOutgoing Nothing case)" store
                -- queue is full
                Just (n,_) -> trace "tryAppendQueueOutgoing Just case)" $ do
                    nn <- getHighestHandledPacketPlus1 q
                    if n < nn
                        -- but we can overwrite an old packet
                        then store
                        -- uh oh this packet is still needed...
                        else return OGFull

-- | Retry until the next outgoing packet is available, then return it with
-- its packet number.  Unlike 'dequeue', the slot is left populated.  The
-- read position advances by one, wrapping at 'maxBound'.
dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire)
dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do
    i0 <- readTVar seqno
    let i = i0 `mod` qsize
    x <- maybe retry return =<< readArray pktq i
    -- writeArray pktq i Nothing -- not cleaning
    advanceSeq seqno
    return x

-- | Read 'buffend' of the incoming queue (on incoming, highest packet number
-- handled + 1).
getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32
getHighestHandledPacketPlus1 (PacketOutQueue { pktoInPQ }) = readTVar (buffend pktoInPQ)