summaryrefslogtreecommitdiff
path: root/dht/src/Data/PacketBuffer.hs
blob: 177456641ada1e20011c6b109742029a1ed1f92d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE DeriveFunctor #-}
module Data.PacketBuffer
    ( PacketBuffer
    , newPacketBuffer
    , PacketOutboundEvent(..)
    , PacketInboundEvent(..)
    , grokOutboundPacket
    , grokInboundPacket
    , awaitReadyPacket
    , packetNumbersToRequest
    , expectingSequenceNumber
    , nextToSendSequenceNumber
    , retrieveForResend
    , decompressSequenceNumbers
    , compressSequenceNumbers
    , pbReport
    ) where

import Data.PacketQueue as Q
import DPut
import DebugTag

import Control.Concurrent.STM
import Control.Monad
import Data.Maybe
import Data.Word

-- | A pair of sequence-numbered packet queues: one holding payloads of type
-- @a@ received from the remote peer and one holding payloads of type @b@ that
-- were sent to it.
data PacketBuffer a b = PacketBuffer
    -- Inbound queue: filled by 'grokInboundPacket' and drained by
    -- 'awaitReadyPacket'.
    { inQueue   :: PacketQueue a
    -- Outbound buffer: filled by 'grokOutboundPacket', consulted by
    -- 'retrieveForResend' and trimmed by 'grokInboundPacket'.
    , outBuffer :: PacketQueue b }

-- | Create an empty 'PacketBuffer'.
--
-- Capacities are fixed for now: the inbound queue is created with a capacity
-- of 200 packets and the outbound buffer with a capacity of 400 packets.
-- The second argument passed to 'Q.new' (0 for both queues) is presumably the
-- initial sequence number -- confirm against "Data.PacketQueue".
newPacketBuffer :: STM (PacketBuffer a b)
newPacketBuffer = do
    inbound  <- Q.new 200 0
    outbound <- Q.new 400 0
    return PacketBuffer { inQueue = inbound, outBuffer = outbound }

-- | Input for 'grokOutboundPacket': describes a packet that was just sent to
-- the remote peer.
data PacketOutboundEvent b
    = PacketSent { poSeqNum      :: Word32 -- ^ Sequence number for payload.
                 , poSentPayload :: b      -- ^ Payload packet we sent to them.
                 }
 deriving Functor

-- | Input for 'grokInboundPacket': describes a packet received from the
-- remote peer.
--
-- For 'PacketReceived' the payload is stored in the inbound queue.  For
-- 'PacketReceivedLossy' only the sequence number is noted and the payload is
-- not stored.  In both cases the acknowledged number is used to drop packets
-- from the outbound buffer.
data PacketInboundEvent a
    = PacketReceived { peSeqNum          :: Word32 -- ^ Sequence number for payload.
                     , peReceivedPayload :: a      -- ^ Payload packet they sent to us.
                     , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
                     }
    | PacketReceivedLossy { peSeqNum          :: Word32 -- ^ Sequence number for lossy packet.
                          , peReceivedPayload :: a      -- ^ Payload packet they sent to us (ignored).
                          , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
                          }
 deriving Functor

-- | Record a packet that was just sent (not resent) to the network by storing
-- it in the outbound buffer under its sequence number.
--
-- The result pairs a flag with the two numbers returned by 'Q.enqueue'.  The
-- flag is True exactly when the first of those numbers is nonzero, which
-- signals that the outbound buffer is full.  In that case the caller may
-- retry the transaction to block until 'grokInboundPacket' is called in
-- another thread.
grokOutboundPacket :: PacketBuffer a b -> PacketOutboundEvent b -> STM (Bool,(Word32,Word32))
grokOutboundPacket (PacketBuffer _ outbound) (PacketSent seqno payload) =
    Q.enqueue outbound seqno payload >>= flagFull
  where
    -- Attach the "buffer full" flag to the raw result of the enqueue.
    flagFull (n, r) = return (n /= 0, (n, r))

-- | Update the buffers for a packet received from the network.
--
-- A 'PacketReceived' payload is enqueued on the inbound queue under its
-- sequence number.  For a 'PacketReceivedLossy' event only the sequence
-- number is reported to the inbound queue ('Q.observeOutOfBand'); the payload
-- is not stored.  In both cases, packets preceding the acknowledged sequence
-- number are then dropped from the outbound buffer.
grokInboundPacket :: PacketBuffer a b -> PacketInboundEvent a -> STM ()
grokInboundPacket (PacketBuffer inbound outbound) event = do
    case event of
        -- The values returned by the queue operations are deliberately discarded.
        PacketReceived seqno payload _ -> void $ Q.enqueue inbound seqno payload
        PacketReceivedLossy seqno _ _  -> void $ Q.observeOutOfBand inbound seqno
    Q.dropPacketsBefore outbound (peAcknowledgedNum event)

-- | Take the next inbound packet that is ready to be processed, by dequeuing
-- from the inbound queue.  Any re-ordering that was needed has already been
-- done by the time a packet is returned.
awaitReadyPacket :: PacketBuffer a b -> STM a
awaitReadyPacket buffer = case buffer of
    PacketBuffer inbound _ -> Q.dequeue inbound

-- | Obtain the sequence numbers that may have been dropped: any number not
-- yet received that is prior to the maximum sequence number we have received.
--
-- For convenience, a lower bound for the missing sequence numbers (the value
-- of 'Q.getLastDequeuedPlus1' for the inbound queue) is returned as the
-- second item of the pair.
packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32)
packetNumbersToRequest (PacketBuffer inbound _) =
    (,) <$> Q.getMissing inbound
        <*> Q.getLastDequeuedPlus1 inbound

-- | The sequence number one past the last packet dequeued from the inbound
-- queue.
expectingSequenceNumber :: PacketBuffer a b -> STM Word32
expectingSequenceNumber buffer = case buffer of
    PacketBuffer inbound _ -> Q.getLastDequeuedPlus1 inbound

-- | The sequence number one past the last packet enqueued on the outbound
-- buffer.
nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32
nextToSendSequenceNumber buffer = case buffer of
    PacketBuffer _ outbound -> Q.getLastEnqueuedPlus1 outbound

-- | Look up already-sent packets in the outbound buffer by sequence number.
--
-- Each packet found is returned paired with its sequence number, in the order
-- the numbers were requested; numbers with no packet in the buffer are
-- silently omitted.  See 'decompressSequenceNumbers' to obtain the sequence
-- number list from a compressed encoding.  There is no need to call
-- 'grokOutboundPacket' when sending the packets returned from this call.
retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)]
retrieveForResend (PacketBuffer _ outbound) seqnos = do
    found <- mapM fetch seqnos
    return (catMaybes found)
  where
    -- Look up one sequence number and tag a hit with that number.
    fetch no = do
        hit <- Q.lookup outbound no
        return (fmap (\pkt -> (no, pkt)) hit)

-- | Expand a compressed set of sequence numbers.  This is normally used to
-- obtain input for 'retrieveForResend'.
--
-- Decoding keeps a running value that starts at @baseno - 1@.  Each nonzero
-- byte is added to the running value and the sum is emitted as the next
-- sequence number.  A zero byte advances the running value by 255 without
-- emitting anything.  Arithmetic is on 'Word32' and therefore wraps.
decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32]
decompressSequenceNumbers baseno ns = expand (baseno - 1) ns
  where
    expand :: Word32 -> [Word8] -> [Word32]
    expand _    []            = []
    expand prev (0 : rest)    = expand (prev + 255) rest
    expand prev (step : rest) =
        let current = prev + fromIntegral step
        in current : expand current rest

-- | Compress a list of sequence numbers into 8-bit offsets; the inverse of
-- 'decompressSequenceNumbers' for the same @baseno@.
--
-- Encoding keeps a running value that starts at @baseno - 1@.  For each
-- sequence number, the gap from the running value is emitted as a single byte
-- when it lies in the range 1..255.  A larger gap is bridged by emitting zero
-- bytes, each of which advances the running value by 255.
--
-- The input is expected to be in ascending order starting at or after
-- @baseno@.  A number equal to the running value (a duplicate of the previous
-- entry, or @baseno - 1@ at the head of the list) cannot be represented,
-- because a zero byte means "skip 255"; it is omitted from the output.
-- Arithmetic is on 'Word32', so a number lower than the running value wraps
-- around and is encoded with a very long run of zero bytes.
compressSequenceNumbers :: Word32 -> [Word32] -> [Word8]
compressSequenceNumbers baseno xs = foldr doOne (const []) xs (baseno-1)
    where
        doOne :: Word32 -> (Word32 -> [Word8]) -> Word32 -> [Word8]
        doOne y f addend = case y - addend of
            -- No gap: emitting 0 here would be decoded as a 255 skip and
            -- corrupt every following number, so emit nothing.
            0             -> f addend
            -- A gap of exactly 255 is a valid single byte; only 0 is
            -- reserved as the skip marker.
            x | x <= 255  -> fromIntegral x : f y
              | otherwise -> 0 : doOne y f (addend + 255)

{-
compressSequenceNumbers :: Word32 -> [Word32] -> [Word8]
compressSequenceNumbers seqno xs = let r = map fromIntegral (reduceToSums ys >>= makeZeroes)
                                   in dtrace XNetCrypto ("compressSequenceNumbers " ++ show seqno ++ show xs ++ " --> "++show r) r
    where
        ys = Prelude.map (subtract (seqno - 1)) xs
        reduceToSums [] = []
        reduceToSums (x:xs) = x:(reduceToSums $ Prelude.map (subtract x) xs)
        makeZeroes :: Word32 -> [Word32]
    --  makeZeroes 0 = []
        makeZeroes x
            = let (d,m)= x `divMod` 255
                  zeros= Prelude.replicate (fromIntegral d) 0
                  in zeros ++ [m]
-}

-- | Render a one-line, human-readable summary of the buffer for debugging.
--
-- The given label is embedded in the output.  For each of the inbound queue
-- and the outbound buffer, the pair (last dequeued + 1, last enqueued + 1) is
-- shown.
pbReport :: String -> PacketBuffer a b -> STM String
pbReport what (PacketBuffer inb outb) = do
    inbound  <- (,) <$> getLastDequeuedPlus1 inb  <*> getLastEnqueuedPlus1 inb
    outbound <- (,) <$> getLastDequeuedPlus1 outb <*> getLastEnqueuedPlus1 outb
    return $ concat
        [ "PacketBuffer<", what, "> Inbound", show inbound
        , " Outbound", show outbound
        ]