summaryrefslogtreecommitdiff
path: root/src/Data/PacketQueue.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2017-11-19 22:52:07 +0000
committerJames Crayne <jim.crayne@gmail.com>2017-11-19 23:40:18 +0000
commit8d9abc1df036a8184bc2fd88ddf6f1d621e7e4c1 (patch)
tree920d4c424e1d1358df0f066e4b5e6a256cd96da2 /src/Data/PacketQueue.hs
parent5c34b3bffc286b6cc5010a30c1016355c86359a5 (diff)
Outgoing queue and related
Diffstat (limited to 'src/Data/PacketQueue.hs')
-rw-r--r--src/Data/PacketQueue.hs93
1 files changed, 92 insertions, 1 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index b349bf4b..927d6c53 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -3,12 +3,19 @@
3-- be stored out of order, but from which they are extracted in the proper 3-- be stored out of order, but from which they are extracted in the proper
4-- sequence. 4-- sequence.
5{-# LANGUAGE NamedFieldPuns #-} 5{-# LANGUAGE NamedFieldPuns #-}
6{-# LANGUAGE FlexibleContexts #-}
6module Data.PacketQueue 7module Data.PacketQueue
7 ( PacketQueue 8 ( PacketQueue
8 , new 9 , new
9 , dequeue 10 , dequeue
10 , enqueue 11 , enqueue
11 , observeOutOfBand 12 , observeOutOfBand
13 , PacketOutQueue
14 , newOutGoing
15 , tryAppendQueueOutgoing
16 , dequeueOutgoing
17 , mapOutGoing
18 , OutGoingResult(..)
12 ) where 19 ) where
13 20
14import Control.Concurrent.STM 21import Control.Concurrent.STM
@@ -21,7 +28,7 @@ data PacketQueue a = PacketQueue
21 { pktq :: TArray Word32 (Maybe a) 28 { pktq :: TArray Word32 (Maybe a)
22 , seqno :: TVar Word32 29 , seqno :: TVar Word32
23 , qsize :: Word32 30 , qsize :: Word32
24 , buffend :: TVar Word32 31 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1
25 } 32 }
26 33
27-- | Create a new PacketQueue. 34-- | Create a new PacketQueue.
@@ -77,3 +84,87 @@ enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do
77 84
78-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) 85-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
79-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo 86-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo
87
88-----------------------------------------------------
89-- * PacketOutQueue
90--
91
92data PacketOutQueue extra msg toWire fromWire = PacketOutQueue
93 { pktoInPQ :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue'
94 , pktoOutPQ :: PacketQueue (Word32,toWire)
95 , pktoPacketNo :: TVar Word32
96 , pktoToWireIO :: IO (STM extra)
97 , pktoToWire :: STM extra
98 -> Word32{-packet number we expect to recieve-}
99 -> Word32{- buffer_end -}
100 -> Word32{- packet number -}
101 -> msg
102 -> STM (Maybe (toWire,Word32{-next packet no-}))
103 }
104
105mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM ()
106mapOutGoing f q@(PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do
107 (z,n) <- getBounds pktq
108 let ff i = do
109 e <- readArray pktq i
110 writeArray pktq i (e>>=f)
111 mapM_ ff [z .. n]
112
113newOutGoing :: PacketQueue fromwire
114 -- ^ Incoming queue
115 -> (STM io -> Word32 {-packet number we expect to recieve-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-})))
116 -- ^ toWire callback
117 -> IO (STM io)
118 -- ^ io action to get extra parameter
119 -> Word32 -- ^ packet number of first outgoing packet
120 -> Word32 -- ^ Capacity of queue.
121 -> Word32 -- ^ Initial sequence number.
122 -> STM (PacketOutQueue io msg wire fromwire)
123newOutGoing inq towire toWireIO num capacity seqstart = do
124 outq <- new capacity seqstart
125 numVar <- newTVar num
126 return $ PacketOutQueue
127 { pktoInPQ = inq
128 , pktoOutPQ = outq
129 , pktoPacketNo = numVar
130 , pktoToWireIO = toWireIO
131 , pktoToWire = towire
132 }
133
134data OutGoingResult = OGSuccess | OGFull | OGEncodeFail
135 deriving (Eq,Show)
136
137-- | Convert a message to packet format and append it to the front of a queue
138-- used for outgoing messages. (Note that ‘front‛ usually means the higher
139-- index in this implementation.)
140tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM OutGoingResult
141tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg = do
142 be <- readTVar (buffend pktoOutPQ)
143 let i = be `mod` (qsize pktoOutPQ)
144 mbPkt <- readArray (pktq pktoOutPQ) i
145 pktno <- readTVar pktoPacketNo
146 nextno <- readTVar (seqno pktoInPQ)
147 mbWire <- pktoToWire getExtra nextno be pktno msg
148 case mbWire of
149 Just (pkt,pktno')
150 -> case mbPkt of
151 -- slot is free, insert element
152 Nothing -> do
153 modifyTVar' (buffend pktoOutPQ) (+1)
154 writeTVar pktoPacketNo $! pktno'
155 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt))
156 return OGSuccess
157 -- queue is full, block until its not
158 _ -> return OGFull
159 -- don't know how to send this message
160 Nothing -> return OGEncodeFail
161
162dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire)
163dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do
164 i0 <- readTVar seqno
165 let i = i0 `mod` qsize
166 x <- maybe retry return =<< readArray pktq i
167 -- writeArray pktq i Nothing -- not cleaning
168 modifyTVar' seqno succ
169 return x
170