-- | This module is useful for implementing a lossless protocol on top of a
-- lossy datagram style protocol.  It implements a buffer in which packets may
-- be stored out of order, but from which they are extracted in the proper
-- sequence.
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE FlexibleContexts #-}
module Data.PacketQueue
    ( PacketQueue
    , new
    , dequeue
    , enqueue
    , observeOutOfBand
    , PacketOutQueue
    , newOutGoing
    , readyOutGoing
    , tryAppendQueueOutgoing
    , dequeueOutgoing
    , getHighestHandledPacketPlus1
    , mapOutGoing
    , OutGoingResult(..)
    ) where

import Control.Concurrent.STM
import Control.Concurrent.STM.TArray
import Control.Monad
import Data.Word
import Data.Array.MArray

-- | A fixed-size ring of optional packets addressed by sequence number.
data PacketQueue a = PacketQueue
    { pktq    :: TArray Word32 (Maybe a)
      -- ^ Packet slots; packet @no@ lives in slot @no \`mod\` qsize@.
    , seqno   :: TVar Word32
      -- ^ Sequence number of the next packet to be dequeued.
    , qsize   :: Word32
      -- ^ Number of slots in 'pktq'.
    , buffend :: TVar Word32
      -- ^ On incoming, highest packet number handled + 1.  On an outgoing
      -- queue it is the sequence number at which the next packet is stored.
    }

-- | Create a new PacketQueue.
--
-- Odd capacities are rounded up to the next even number, and the capacity is
-- never less than 2 (a capacity of 0 would otherwise underflow the array
-- bounds and make every slot computation divide by zero).
--
-- NOTE(review): slots are chosen with @no \`mod\` qsize@, which only stays
-- contiguous across 'Word32' wrap-around when the capacity divides 2^32.
new :: Word32 -- ^ Capacity of queue.
    -> Word32 -- ^ Initial sequence number.
    -> STM (PacketQueue a)
new capacity seqstart = do
    let evenCap = if even capacity then capacity else capacity + 1
        cap     = max 2 evenCap
    q    <- newArray (0, cap - 1) Nothing
    seqv <- newTVar seqstart
    -- Nothing has been handled yet, so the buffer end coincides with the
    -- first expected sequence number.  Starting it anywhere else leaves it
    -- outside the window test performed by 'advanceBuffEnd'.
    bufe <- newTVar seqstart
    return PacketQueue
        { pktq    = q
        , seqno   = seqv
        , qsize   = cap
        , buffend = bufe
        }

-- | Move 'buffend' to @no + 1@ unless it already lies beyond @no@.  Both
-- positions are compared relative to @low@ so that the comparison keeps
-- working when sequence numbers wrap around.
advanceBuffEnd :: PacketQueue a
               -> Word32 -- ^ Current value of 'seqno' (window start).
               -> Word32 -- ^ Sequence number that was just seen.
               -> STM ()
advanceBuffEnd PacketQueue { buffend } low no =
    modifyTVar' buffend $ \be -> if be - low <= no - low then no + 1 else be

-- | Retry until the slot for the next sequence number is filled, then return
-- its contents and advance 'seqno'.  Shared by 'dequeue' and
-- 'dequeueOutgoing'.
takeNext :: Bool -- ^ Whether to empty the slot after reading it.
         -> PacketQueue a
         -> STM a
takeNext clearSlot PacketQueue { pktq, seqno, qsize } = do
    slot <- (`mod` qsize) <$> readTVar seqno
    x    <- maybe retry return =<< readArray pktq slot
    when clearSlot $ writeArray pktq slot Nothing
    -- Plain addition wraps at 'maxBound'; 'succ' would throw there.
    modifyTVar' seqno (+ 1)
    return x

-- | Record that packet @no@ was handled without storing it in the queue.
-- Sequence numbers outside the current window are ignored.
observeOutOfBand :: PacketQueue a -> Word32 -> STM ()
observeOutOfBand q@PacketQueue { seqno, qsize } no = do
    low <- readTVar seqno
    -- Ignore packet if out of range.
    when (no - low < qsize) $ advanceBuffEnd q low no

-- | Retry until the next expected packet is enqueued.  Then return it.
dequeue :: PacketQueue a -> STM a
dequeue = takeNext True

-- | Enqueue a packet.  Packets need not be enqueued in order as long as there
-- is spare capacity in the queue.  If there is not, the packet will be
-- silently discarded without blocking.
enqueue :: PacketQueue a -- ^ The packet queue.
        -> Word32        -- ^ Sequence number of the packet.
        -> a             -- ^ The packet.
        -> STM ()
enqueue q@PacketQueue { pktq, seqno, qsize } no x = do
    low <- readTVar seqno
    -- Ignore packet if out of range.
    when (no - low < qsize) $ do
        writeArray pktq (no `mod` qsize) (Just x)
        advanceBuffEnd q low no

-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo

-----------------------------------------------------
-- * PacketOutQueue
--

-- | Queue of encoded outgoing packets, paired with the incoming queue whose
-- state is consulted when encoding and when deciding whether an old packet
-- may be overwritten.
data PacketOutQueue extra msg toWire fromWire = PacketOutQueue
    { pktoInPQ     :: PacketQueue fromWire
      -- ^ Reference to the incoming 'PacketQueue'.
    , pktoOutPQ    :: PacketQueue (Word32,toWire)
      -- ^ Stored outgoing packets, each tagged with its packet number.
    , pktoPacketNo :: TVar Word32
      -- ^ Packet number to hand to the encoder for the next message.
    , pktoToWireIO :: IO (STM extra)
      -- ^ IO action producing the extra parameter for 'pktoToWire'.
    , pktoToWire   :: STM extra
                      -> Word32 {- packet number we expect to receive -}
                      -> Word32 {- buffer_end -}
                      -> Word32 {- packet number -}
                      -> msg
                      -> STM (Maybe (toWire,Word32 {- next packet no -}))
      -- ^ Encoder; 'Nothing' means the message could not be encoded.
    }

-- | Apply a function to every stored outgoing packet.  Slots for which the
-- function returns 'Nothing' become empty; empty slots stay empty.
mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire))
            -> PacketOutQueue extra msg towire fromwire
            -> STM ()
mapOutGoing f PacketOutQueue { pktoOutPQ = PacketQueue { pktq } } = do
    (lo, hi) <- getBounds pktq
    forM_ [lo .. hi] $ \i ->
        readArray pktq i >>= writeArray pktq i . (>>= f)

-- | Create a new outgoing queue tied to an existing incoming queue.
newOutGoing :: PacketQueue fromwire
               -- ^ Incoming queue
            -> (STM io
                -> Word32 {- packet number we expect to receive -}
                -> Word32 {- buffer_end -}
                -> Word32 {- packet number -}
                -> msg
                -> STM (Maybe (wire,Word32 {- next packet no -})))
               -- ^ toWire callback
            -> IO (STM io) -- ^ io action to get extra parameter
            -> Word32      -- ^ packet number of first outgoing packet
            -> Word32      -- ^ Capacity of queue.
            -> Word32      -- ^ Initial sequence number.
            -> STM (PacketOutQueue io msg wire fromwire)
newOutGoing inq towire toWireIO num capacity seqstart = do
    outq   <- new capacity seqstart
    numVar <- newTVar num
    return PacketOutQueue
        { pktoInPQ     = inq
        , pktoOutPQ    = outq
        , pktoPacketNo = numVar
        , pktoToWireIO = toWireIO
        , pktoToWire   = towire
        }

-- | Outcome of 'tryAppendQueueOutgoing'.
data OutGoingResult
    = OGSuccess    -- ^ The packet was encoded and stored.
    | OGFull       -- ^ The target slot holds a packet that is still needed.
    | OGEncodeFail -- ^ The encoder returned 'Nothing'.
    deriving (Eq,Show)

-- | Do something in IO before appending to the queue.
readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra)
readyOutGoing = pktoToWireIO

-- | Convert a message to packet format and append it to the front of a queue
-- used for outgoing messages.  (Note that "front" usually means the higher
-- index in this implementation.)
--
-- The encoder runs before the slot is examined, so it is invoked even when
-- the result turns out to be 'OGFull'.
tryAppendQueueOutgoing :: STM extra
                       -> PacketOutQueue extra msg wire fromwire
                       -> msg
                       -> STM OutGoingResult
tryAppendQueueOutgoing getExtra
                       q@PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWire }
                       msg = do
    be <- readTVar (buffend pktoOutPQ)
    let slot = be `mod` qsize pktoOutPQ
    occupant <- readArray (pktq pktoOutPQ) slot
    pktno    <- readTVar pktoPacketNo
    nextno   <- readTVar (seqno pktoInPQ)
    encoded  <- pktoToWire getExtra nextno be pktno msg
    case encoded of
        -- don't know how to send this message
        Nothing -> return OGEncodeFail
        Just (pkt, pktno') -> do
            let store = do
                    modifyTVar' (buffend pktoOutPQ) (+ 1)
                    writeTVar pktoPacketNo $! pktno'
                    writeArray (pktq pktoOutPQ) slot (Just (pktno, pkt))
                    return OGSuccess
            case occupant of
                -- slot is free, insert element
                Nothing -> store
                -- queue is full
                Just (n, _) -> do
                    nn <- getHighestHandledPacketPlus1 q
                    -- NOTE(review): plain '<' does not account for packet
                    -- numbers wrapping around.
                    if n < nn
                        then store         -- but we can overwrite an old packet
                        else return OGFull -- uh oh this packet is still needed...

-- | Retry until the next outgoing packet in sequence is available, then
-- return it together with its packet number.  The slot is deliberately not
-- cleared; 'tryAppendQueueOutgoing' decides later whether it may be reused.
dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire)
dequeueOutgoing PacketOutQueue { pktoOutPQ } = takeNext False pktoOutPQ

-- | Read the 'buffend' of the incoming queue, i.e. the highest handled
-- packet number + 1.
getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32
getHighestHandledPacketPlus1 PacketOutQueue { pktoInPQ } = readTVar (buffend pktoInPQ)