1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
-- | This module is useful for implementing a lossless protocol on top of a
-- lossy datagram-style protocol. It implements a buffer in which packets may
-- be stored out of order, but from which they are extracted in the proper
-- sequence.
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE FlexibleContexts #-}
module Data.PacketQueue
( PacketQueue
, new
, dequeue
, enqueue
, observeOutOfBand
, PacketOutQueue
, newOutGoing
, tryAppendQueueOutgoing
, dequeueOutgoing
, mapOutGoing
, OutGoingResult(..)
) where
import Control.Concurrent.STM
import Control.Concurrent.STM.TArray
import Control.Monad
import Data.Word
import Data.Array.MArray
-- | A fixed-size buffer of packets addressed by sequence number.
--
-- The packet numbered @n@ is kept in slot @n mod qsize@ of 'pktq'.
data PacketQueue a = PacketQueue
    { pktq    :: TArray Word32 (Maybe a)
      -- ^ Slot storage; 'Nothing' marks an empty slot.
    , seqno   :: TVar Word32
      -- ^ Sequence number of the next packet to be dequeued.
    , qsize   :: Word32
      -- ^ Number of slots in 'pktq'.
    , buffend :: TVar Word32
      -- ^ On incoming, highest packet number handled + 1.  The outgoing
      -- code in this module reads it as the number of the next slot to
      -- fill.
    }
-- | Create a new, empty 'PacketQueue'.
--
-- The requested capacity is rounded up to an even number.  A capacity of
-- zero is raised to 2: with zero slots the array bounds @(0, cap - 1)@
-- would underflow to @(0, maxBound)@ and every later @mod qsize@ would
-- divide by zero.  A capacity of 'maxBound' (odd) is rounded down instead
-- of up, because @capacity + 1@ would wrap around to zero.
--
-- NOTE(review): slot indices (@n mod qsize@) stay contiguous across the
-- 'Word32' wrap-around of sequence numbers only when the capacity divides
-- 2^32, i.e. is a power of two.  Rounding to an even number does not
-- guarantee that; callers expecting very long-lived queues should pass a
-- power of two.
new :: Word32 -- ^ Capacity of queue (made even, minimum 2).
    -> Word32 -- ^ Initial sequence number.
    -> STM (PacketQueue a)
new capacity seqstart = do
    let cap | capacity == 0        = 2
            | even capacity        = capacity
            | capacity == maxBound = capacity - 1  -- capacity + 1 would wrap to 0
            | otherwise            = capacity + 1
    q    <- newArray (0, cap - 1) Nothing
    seqv <- newTVar seqstart
    -- The buffer end starts at the first sequence number ("nothing handled
    -- yet").  Starting it at 0 only works when seqstart is 0: otherwise
    -- @buffend - seqno@ wraps to a huge value so 'enqueue' never advances
    -- it, and the outgoing code would fill slots starting at index 0 while
    -- readers start at @seqstart mod qsize@.
    bufe <- newTVar seqstart
    return PacketQueue
        { pktq    = q
        , seqno   = seqv
        , qsize   = cap
        , buffend = bufe
        }
-- | Record that packet number @no@ was seen, without storing any payload.
--
-- When @no@ is fewer than 'qsize' packets ahead of the current 'seqno',
-- 'buffend' is moved to @no + 1@ unless it already lies beyond @no@.
-- Numbers outside that window leave the queue untouched.
observeOutOfBand :: PacketQueue a -> Word32 -> STM ()
observeOutOfBand PacketQueue { seqno, qsize, buffend } no = do
    base <- readTVar seqno
    let offset = no - base
        advance end
            | end - base <= offset = no + 1
            | otherwise            = end
    -- Ignore packet if out of range.
    when (offset < qsize) $
        modifyTVar' buffend advance
-- | Retry until the next expected packet is enqueued.  Then clear its slot,
-- advance the expected sequence number by one and return the packet.
--
-- The sequence number wraps from 'maxBound' back to 0, matching the
-- wrapping subtraction used by 'enqueue'.
dequeue :: PacketQueue a -> STM a
dequeue PacketQueue { pktq, seqno, qsize } = do
    i0 <- readTVar seqno
    let i = i0 `mod` qsize
    -- An empty slot means the packet has not arrived yet: block (retry).
    x <- maybe retry return =<< readArray pktq i
    writeArray pktq i Nothing
    -- Use (+ 1) rather than 'succ': 'succ' raises an error at 'maxBound',
    -- whereas addition on 'Word32' wraps around.
    modifyTVar' seqno (+ 1)
    return x
-- | Store a packet under its sequence number.  Packets may arrive in any
-- order provided their number is fewer than 'qsize' ahead of the next
-- expected sequence number; anything outside that window is silently
-- discarded without blocking.  An accepted packet replaces whatever its
-- slot previously held, and 'buffend' is moved to @no + 1@ unless it is
-- already beyond @no@.
enqueue :: PacketQueue a -- ^ The packet queue.
        -> Word32        -- ^ Sequence number of the packet.
        -> a             -- ^ The packet.
        -> STM ()
enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do
    base <- readTVar seqno
    let offset   = no - base
        inWindow = offset < qsize
    -- Ignore packet if out of range.
    when inWindow $ do
        writeArray pktq (no `mod` qsize) (Just x)
        modifyTVar' buffend $ \end ->
            if end - base > offset then end else no + 1
-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo
-----------------------------------------------------
-- * PacketOutQueue
--
-- | State for the outgoing direction: a buffer of encoded packets plus the
-- callback used to encode messages into wire format.
data PacketOutQueue extra msg toWire fromWire = PacketOutQueue
    { pktoInPQ     :: PacketQueue fromWire
      -- ^ reference to the incoming 'PacketQueue'
    , pktoOutPQ    :: PacketQueue (Word32, toWire)
      -- ^ Outgoing buffer; each occupied slot holds a packet number paired
      -- with the encoded packet.
    , pktoPacketNo :: TVar Word32
      -- ^ Packet number handed to 'pktoToWire' for the next message.
    , pktoToWireIO :: IO (STM extra)
      -- ^ IO action producing the extra parameter for 'pktoToWire'.
    , pktoToWire   :: STM extra
                   -> Word32  -- packet number we expect to receive
                   -> Word32  -- buffer_end
                   -> Word32  -- packet number
                   -> msg
                   -> STM (Maybe (toWire, Word32))  -- (encoded packet, next packet no)
      -- ^ Encoder callback; 'Nothing' means the message cannot be encoded.
    }
-- | Rewrite every slot of the outgoing buffer in place.
--
-- The function is applied to the contents of each occupied slot; a
-- 'Nothing' result empties the slot.  Empty slots stay empty.
mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire))
            -> PacketOutQueue extra msg towire fromwire
            -> STM ()
mapOutGoing f PacketOutQueue { pktoOutPQ = PacketQueue { pktq } } = do
    (lo, hi) <- getBounds pktq
    forM_ [lo .. hi] $ \ix -> do
        slot <- readArray pktq ix
        writeArray pktq ix (f =<< slot)
-- | Build a 'PacketOutQueue' around an existing incoming queue.
--
-- A fresh outgoing 'PacketQueue' is allocated with 'new' from the given
-- capacity and initial sequence number, and the outgoing packet counter is
-- initialised to the given first packet number.
newOutGoing :: PacketQueue fromwire
                -- ^ Incoming queue
            -> (  STM io
               -> Word32  {- packet number we expect to receive -}
               -> Word32  {- buffer_end -}
               -> Word32  {- packet number -}
               -> msg
               -> STM (Maybe (wire, Word32 {- next packet no -})) )
                -- ^ toWire callback
            -> IO (STM io)
                -- ^ io action to get extra parameter
            -> Word32 -- ^ packet number of first outgoing packet
            -> Word32 -- ^ Capacity of queue.
            -> Word32 -- ^ Initial sequence number.
            -> STM (PacketOutQueue io msg wire fromwire)
newOutGoing inq towire toWireIO num capacity seqstart =
    assemble <$> new capacity seqstart <*> newTVar num
  where
    -- Pure record construction once both mutable pieces exist.
    assemble outq numVar = PacketOutQueue
        { pktoInPQ     = inq
        , pktoOutPQ    = outq
        , pktoPacketNo = numVar
        , pktoToWireIO = toWireIO
        , pktoToWire   = towire
        }
-- | Outcome of 'tryAppendQueueOutgoing'.
data OutGoingResult
    = OGSuccess     -- ^ The message was encoded and stored in the outgoing queue.
    | OGFull        -- ^ The next outgoing slot is still occupied; nothing was stored.
    | OGEncodeFail  -- ^ The encoder callback returned 'Nothing' for the message.
    deriving (Eq, Show)
-- | Convert a message to packet format and append it to the front of a
-- queue used for outgoing messages.  (Note that \"front\" usually means the
-- higher index in this implementation.)
--
-- The encoder callback is always run first.  If it yields 'Nothing' the
-- result is 'OGEncodeFail'; otherwise, if the slot at the current buffer
-- end is still occupied, the result is 'OGFull' and nothing is stored.
-- This function never blocks on a full queue.  On 'OGSuccess' the buffer
-- end is advanced by one, the packet counter is set to the number returned
-- by the encoder, and the packet is stored together with the number it was
-- encoded under.
tryAppendQueueOutgoing :: STM extra
                       -> PacketOutQueue extra msg wire fromwire
                       -> msg
                       -> STM OutGoingResult
tryAppendQueueOutgoing getExtra PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWire } msg = do
    bufEnd   <- readTVar (buffend pktoOutPQ)
    let slot = bufEnd `mod` qsize pktoOutPQ
    occupant <- readArray (pktq pktoOutPQ) slot
    thisNo   <- readTVar pktoPacketNo
    expected <- readTVar (seqno pktoInPQ)
    encoded  <- pktoToWire getExtra expected bufEnd thisNo msg
    case (encoded, occupant) of
        -- don't know how to send this message
        (Nothing, _) -> return OGEncodeFail
        -- slot is still occupied: report a full queue to the caller
        (Just (_, _), Just _) -> return OGFull
        -- slot is free, insert element
        (Just (wire, nextNo), Nothing) -> do
            modifyTVar' (buffend pktoOutPQ) (+1)
            writeTVar pktoPacketNo $! nextNo
            writeArray (pktq pktoOutPQ) slot (Just (thisNo, wire))
            return OGSuccess
-- | Retry until the next outgoing slot is occupied, then return its packet
-- number and encoded packet and advance the outgoing sequence number.
--
-- Unlike 'dequeue', the slot is left populated.  Presumably this keeps the
-- packet available for resending until 'mapOutGoing' removes it -- TODO
-- confirm against callers.
--
-- The sequence number wraps from 'maxBound' back to 0.
dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire)
dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do
    i0 <- readTVar seqno
    let i = i0 `mod` qsize
    x <- maybe retry return =<< readArray pktq i
    -- writeArray pktq i Nothing -- not cleaning
    -- Use (+ 1) rather than 'succ': 'succ' raises an error at 'maxBound',
    -- whereas addition on 'Word32' wraps around.
    modifyTVar' seqno (+ 1)
    return x
|