diff options
author | James Crayne <jim.crayne@gmail.com> | 2017-11-19 22:52:07 +0000 |
---|---|---|
committer | James Crayne <jim.crayne@gmail.com> | 2017-11-19 23:40:18 +0000 |
commit | 8d9abc1df036a8184bc2fd88ddf6f1d621e7e4c1 (patch) | |
tree | 920d4c424e1d1358df0f066e4b5e6a256cd96da2 /src/Data | |
parent | 5c34b3bffc286b6cc5010a30c1016355c86359a5 (diff) |
Outgoing queue and related
Diffstat (limited to 'src/Data')
-rw-r--r-- | src/Data/PacketQueue.hs | 93 |
1 files changed, 92 insertions, 1 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index b349bf4b..927d6c53 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -3,12 +3,19 @@ | |||
3 | -- be stored out of order, but from which they are extracted in the proper | 3 | -- be stored out of order, but from which they are extracted in the proper |
4 | -- sequence. | 4 | -- sequence. |
5 | {-# LANGUAGE NamedFieldPuns #-} | 5 | {-# LANGUAGE NamedFieldPuns #-} |
6 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | module Data.PacketQueue | 7 | module Data.PacketQueue |
7 | ( PacketQueue | 8 | ( PacketQueue |
8 | , new | 9 | , new |
9 | , dequeue | 10 | , dequeue |
10 | , enqueue | 11 | , enqueue |
11 | , observeOutOfBand | 12 | , observeOutOfBand |
13 | , PacketOutQueue | ||
14 | , newOutGoing | ||
15 | , tryAppendQueueOutgoing | ||
16 | , dequeueOutgoing | ||
17 | , mapOutGoing | ||
18 | , OutGoingResult(..) | ||
12 | ) where | 19 | ) where |
13 | 20 | ||
14 | import Control.Concurrent.STM | 21 | import Control.Concurrent.STM |
@@ -21,7 +28,7 @@ data PacketQueue a = PacketQueue | |||
21 | { pktq :: TArray Word32 (Maybe a) | 28 | { pktq :: TArray Word32 (Maybe a) |
22 | , seqno :: TVar Word32 | 29 | , seqno :: TVar Word32 |
23 | , qsize :: Word32 | 30 | , qsize :: Word32 |
24 | , buffend :: TVar Word32 | 31 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 |
25 | } | 32 | } |
26 | 33 | ||
27 | -- | Create a new PacketQueue. | 34 | -- | Create a new PacketQueue. |
@@ -77,3 +84,87 @@ enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do | |||
77 | 84 | ||
78 | -- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) | 85 | -- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) |
79 | -- lookup PacketQueue{ pktq, seqno, qsize } no = _todo | 86 | -- lookup PacketQueue{ pktq, seqno, qsize } no = _todo |
87 | |||
88 | ----------------------------------------------------- | ||
89 | -- * PacketOutQueue | ||
90 | -- | ||
91 | |||
92 | data PacketOutQueue extra msg toWire fromWire = PacketOutQueue | ||
93 | { pktoInPQ :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue' | ||
94 | , pktoOutPQ :: PacketQueue (Word32,toWire) | ||
95 | , pktoPacketNo :: TVar Word32 | ||
96 | , pktoToWireIO :: IO (STM extra) | ||
97 | , pktoToWire :: STM extra | ||
98 | -> Word32{-packet number we expect to recieve-} | ||
99 | -> Word32{- buffer_end -} | ||
100 | -> Word32{- packet number -} | ||
101 | -> msg | ||
102 | -> STM (Maybe (toWire,Word32{-next packet no-})) | ||
103 | } | ||
104 | |||
105 | mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM () | ||
106 | mapOutGoing f q@(PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do | ||
107 | (z,n) <- getBounds pktq | ||
108 | let ff i = do | ||
109 | e <- readArray pktq i | ||
110 | writeArray pktq i (e>>=f) | ||
111 | mapM_ ff [z .. n] | ||
112 | |||
113 | newOutGoing :: PacketQueue fromwire | ||
114 | -- ^ Incoming queue | ||
115 | -> (STM io -> Word32 {-packet number we expect to recieve-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-}))) | ||
116 | -- ^ toWire callback | ||
117 | -> IO (STM io) | ||
118 | -- ^ io action to get extra parameter | ||
119 | -> Word32 -- ^ packet number of first outgoing packet | ||
120 | -> Word32 -- ^ Capacity of queue. | ||
121 | -> Word32 -- ^ Initial sequence number. | ||
122 | -> STM (PacketOutQueue io msg wire fromwire) | ||
123 | newOutGoing inq towire toWireIO num capacity seqstart = do | ||
124 | outq <- new capacity seqstart | ||
125 | numVar <- newTVar num | ||
126 | return $ PacketOutQueue | ||
127 | { pktoInPQ = inq | ||
128 | , pktoOutPQ = outq | ||
129 | , pktoPacketNo = numVar | ||
130 | , pktoToWireIO = toWireIO | ||
131 | , pktoToWire = towire | ||
132 | } | ||
133 | |||
134 | data OutGoingResult = OGSuccess | OGFull | OGEncodeFail | ||
135 | deriving (Eq,Show) | ||
136 | |||
137 | -- | Convert a message to packet format and append it to the front of a queue | ||
138 | -- used for outgoing messages. (Note that ‘front‛ usually means the higher | ||
139 | -- index in this implementation.) | ||
140 | tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM OutGoingResult | ||
141 | tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg = do | ||
142 | be <- readTVar (buffend pktoOutPQ) | ||
143 | let i = be `mod` (qsize pktoOutPQ) | ||
144 | mbPkt <- readArray (pktq pktoOutPQ) i | ||
145 | pktno <- readTVar pktoPacketNo | ||
146 | nextno <- readTVar (seqno pktoInPQ) | ||
147 | mbWire <- pktoToWire getExtra nextno be pktno msg | ||
148 | case mbWire of | ||
149 | Just (pkt,pktno') | ||
150 | -> case mbPkt of | ||
151 | -- slot is free, insert element | ||
152 | Nothing -> do | ||
153 | modifyTVar' (buffend pktoOutPQ) (+1) | ||
154 | writeTVar pktoPacketNo $! pktno' | ||
155 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) | ||
156 | return OGSuccess | ||
157 | -- queue is full, block until its not | ||
158 | _ -> return OGFull | ||
159 | -- don't know how to send this message | ||
160 | Nothing -> return OGEncodeFail | ||
161 | |||
162 | dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire) | ||
163 | dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do | ||
164 | i0 <- readTVar seqno | ||
165 | let i = i0 `mod` qsize | ||
166 | x <- maybe retry return =<< readArray pktq i | ||
167 | -- writeArray pktq i Nothing -- not cleaning | ||
168 | modifyTVar' seqno succ | ||
169 | return x | ||
170 | |||