From 8d9abc1df036a8184bc2fd88ddf6f1d621e7e4c1 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sun, 19 Nov 2017 22:52:07 +0000 Subject: Outgoing queue and related --- src/Data/PacketQueue.hs | 93 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 1 deletion(-) (limited to 'src/Data/PacketQueue.hs') diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index b349bf4b..927d6c53 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs @@ -3,12 +3,19 @@ -- be stored out of order, but from which they are extracted in the proper -- sequence. {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE FlexibleContexts #-} module Data.PacketQueue ( PacketQueue , new , dequeue , enqueue , observeOutOfBand + , PacketOutQueue + , newOutGoing + , tryAppendQueueOutgoing + , dequeueOutgoing + , mapOutGoing + , OutGoingResult(..) ) where import Control.Concurrent.STM @@ -21,7 +28,7 @@ data PacketQueue a = PacketQueue { pktq :: TArray Word32 (Maybe a) , seqno :: TVar Word32 , qsize :: Word32 - , buffend :: TVar Word32 + , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 } -- | Create a new PacketQueue. @@ -77,3 +84,87 @@ enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do -- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) -- lookup PacketQueue{ pktq, seqno, qsize } no = _todo + +----------------------------------------------------- +-- * PacketOutQueue +-- + +data PacketOutQueue extra msg toWire fromWire = PacketOutQueue + { pktoInPQ :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue' + , pktoOutPQ :: PacketQueue (Word32,toWire) + , pktoPacketNo :: TVar Word32 + , pktoToWireIO :: IO (STM extra) + , pktoToWire :: STM extra + -> Word32{-packet number we expect to recieve-} + -> Word32{- buffer_end -} + -> Word32{- packet number -} + -> msg + -> STM (Maybe (toWire,Word32{-next packet no-})) + } + +mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM () +mapOutGoing f q@(PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do + (z,n) <- getBounds pktq + let ff i = do + e <- readArray pktq i + writeArray pktq i (e>>=f) + mapM_ ff [z .. n] + +newOutGoing :: PacketQueue fromwire + -- ^ Incoming queue + -> (STM io -> Word32 {-packet number we expect to recieve-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-}))) + -- ^ toWire callback + -> IO (STM io) + -- ^ io action to get extra parameter + -> Word32 -- ^ packet number of first outgoing packet + -> Word32 -- ^ Capacity of queue. + -> Word32 -- ^ Initial sequence number. + -> STM (PacketOutQueue io msg wire fromwire) +newOutGoing inq towire toWireIO num capacity seqstart = do + outq <- new capacity seqstart + numVar <- newTVar num + return $ PacketOutQueue + { pktoInPQ = inq + , pktoOutPQ = outq + , pktoPacketNo = numVar + , pktoToWireIO = toWireIO + , pktoToWire = towire + } + +data OutGoingResult = OGSuccess | OGFull | OGEncodeFail + deriving (Eq,Show) + +-- | Convert a message to packet format and append it to the front of a queue +-- used for outgoing messages. (Note that ‘front‛ usually means the higher +-- index in this implementation.) +tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM OutGoingResult +tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg = do + be <- readTVar (buffend pktoOutPQ) + let i = be `mod` (qsize pktoOutPQ) + mbPkt <- readArray (pktq pktoOutPQ) i + pktno <- readTVar pktoPacketNo + nextno <- readTVar (seqno pktoInPQ) + mbWire <- pktoToWire getExtra nextno be pktno msg + case mbWire of + Just (pkt,pktno') + -> case mbPkt of + -- slot is free, insert element + Nothing -> do + modifyTVar' (buffend pktoOutPQ) (+1) + writeTVar pktoPacketNo $! pktno' + writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) + return OGSuccess + -- queue is full, block until its not + _ -> return OGFull + -- don't know how to send this message + Nothing -> return OGEncodeFail + +dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire) +dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do + i0 <- readTVar seqno + let i = i0 `mod` qsize + x <- maybe retry return =<< readArray pktq i + -- writeArray pktq i Nothing -- not cleaning + modifyTVar' seqno succ + return x + -- cgit v1.2.3