summaryrefslogtreecommitdiff
path: root/dht/src/Data/PacketBuffer.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/src/Data/PacketBuffer.hs
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/src/Data/PacketBuffer.hs')
-rw-r--r--dht/src/Data/PacketBuffer.hs148
1 files changed, 148 insertions, 0 deletions
diff --git a/dht/src/Data/PacketBuffer.hs b/dht/src/Data/PacketBuffer.hs
new file mode 100644
index 00000000..17745664
--- /dev/null
+++ b/dht/src/Data/PacketBuffer.hs
@@ -0,0 +1,148 @@
1{-# LANGUAGE TupleSections #-}
2{-# LANGUAGE DeriveFunctor #-}
3module Data.PacketBuffer
4 ( PacketBuffer
5 , newPacketBuffer
6 , PacketOutboundEvent(..)
7 , PacketInboundEvent(..)
8 , grokOutboundPacket
9 , grokInboundPacket
10 , awaitReadyPacket
11 , packetNumbersToRequest
12 , expectingSequenceNumber
13 , nextToSendSequenceNumber
14 , retrieveForResend
15 , decompressSequenceNumbers
16 , compressSequenceNumbers
17 , pbReport
18 ) where
19
20import Data.PacketQueue as Q
21import DPut
22import DebugTag
23
24import Control.Concurrent.STM
25import Control.Monad
26import Data.Maybe
27import Data.Word
28
29data PacketBuffer a b = PacketBuffer
30 { inQueue :: PacketQueue a
31 , outBuffer :: PacketQueue b }
32
33-- | Initialize the packet buffers. Note, the capacity of the inbound packet
34-- queue is currently hardcoded to 200 packets and the capacity of the outbound
35-- buffer is hardcoded to 400 packets.
36newPacketBuffer :: STM (PacketBuffer a b)
37newPacketBuffer = PacketBuffer <$> Q.new 200 0
38 <*> Q.new 400 0
39
40-- | Input for 'grokPacket'.
41data PacketOutboundEvent b
42 = PacketSent { poSeqNum :: Word32 -- ^ Sequence number for payload.
43 , poSentPayload :: b -- ^ Payload packet we sent to them.
44 }
45 deriving Functor
46
47data PacketInboundEvent a
48 = PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload.
49 , peReceivedPayload :: a -- ^ Payload packet they sent to us.
50 , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
51 }
52 | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet.
53 , peReceivedPayload :: a -- ^ Payload packet they sent to us (ignored).
54 , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
55 }
56 deriving Functor
57
58-- | Whenever a packet is received or sent (but not resent) from the network,
59-- this function should be called to update the relevant buffers.
60--
61-- On outgoing packets, if the outbound buffer is full, this will return
62-- True. In this case, the caller may retry to enable blocking until
63-- 'grokInboundPacket' is called in another thread.
64grokOutboundPacket :: PacketBuffer a b -> PacketOutboundEvent b -> STM (Bool,(Word32,Word32))
65grokOutboundPacket (PacketBuffer _ outb) (PacketSent seqno a)
66 = do (n,r) <- Q.enqueue outb seqno a
67 return (n/=0,(n,r))
68
69grokInboundPacket :: PacketBuffer a b -> PacketInboundEvent a -> STM ()
70grokInboundPacket (PacketBuffer inb outb) (PacketReceived seqno a ack)
71 = do Q.enqueue inb seqno a
72 Q.dropPacketsBefore outb ack
73grokInboundPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno _ ack)
74 = do Q.observeOutOfBand inb seqno
75 Q.dropPacketsBefore outb ack
76
77-- | Wait until an inbound packet is ready to process. Any necessary
78-- re-ordering will have been done.
79awaitReadyPacket :: PacketBuffer a b -> STM a
80awaitReadyPacket (PacketBuffer inb _) = Q.dequeue inb
81
82-- | Obtain a list of sequence numbers that may have been dropped. This would
83-- be any number not yet received that is prior to the maxium sequence number
84-- we have received. For convenience, a lowerbound for the missing squence numbers
85-- is also returned as the second item of the pair.
86packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32)
87packetNumbersToRequest (PacketBuffer inb _) = do
88 ns <- Q.getMissing inb
89 lb <- Q.getLastDequeuedPlus1 inb
90 return (ns,lb)
91
92expectingSequenceNumber :: PacketBuffer a b -> STM Word32
93expectingSequenceNumber (PacketBuffer inb _ ) = Q.getLastDequeuedPlus1 inb
94
95nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32
96nextToSendSequenceNumber (PacketBuffer _ outb) = Q.getLastEnqueuedPlus1 outb
97
98-- | Retrieve already-sent packets by their sequence numbers. See
99-- 'decompressSequenceNumbers' to obtain the sequence number list from a
100-- compressed encoding. There is no need to call 'grokPacket' when sending the
101-- packets returned from this call.
102retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)]
103retrieveForResend (PacketBuffer _ outb) seqnos =
104 catMaybes <$> forM seqnos (\no -> fmap (no,) <$> Q.lookup outb no)
105
106-- | Expand a compressed set of sequence numbers. The first sequence number is
107-- given directly and the rest are computed using 8-bit offsets. This is
108-- normally used to obtain input for 'retrieveForResend'.
109decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32]
110decompressSequenceNumbers baseno ns = foldr doOne (const []) ns (baseno-1)
111 where
112 doOne :: Word8 -> (Word32 -> [Word32]) -> Word32 -> [Word32]
113 doOne 0 f addend = f (addend + 255)
114 doOne x f addend = let y = fromIntegral x + addend
115 in y : f y
116
117compressSequenceNumbers :: Word32 -> [Word32] -> [Word8]
118compressSequenceNumbers baseno xs = foldr doOne (const []) xs (baseno-1)
119 where
120 doOne :: Word32 -> (Word32 -> [Word8]) -> Word32 -> [Word8]
121 doOne y f addend = case y - addend of
122 x | x < 255 -> fromIntegral x : f y
123 | otherwise -> 0 : doOne y f (addend + 255)
124
125{-
126compressSequenceNumbers :: Word32 -> [Word32] -> [Word8]
127compressSequenceNumbers seqno xs = let r = map fromIntegral (reduceToSums ys >>= makeZeroes)
128 in dtrace XNetCrypto ("compressSequenceNumbers " ++ show seqno ++ show xs ++ " --> "++show r) r
129 where
130 ys = Prelude.map (subtract (seqno - 1)) xs
131 reduceToSums [] = []
132 reduceToSums (x:xs) = x:(reduceToSums $ Prelude.map (subtract x) xs)
133 makeZeroes :: Word32 -> [Word32]
134 -- makeZeroes 0 = []
135 makeZeroes x
136 = let (d,m)= x `divMod` 255
137 zeros= Prelude.replicate (fromIntegral d) 0
138 in zeros ++ [m]
139-}
140
141pbReport :: String -> PacketBuffer a b -> STM String
142pbReport what (PacketBuffer inb outb) = do
143 inb_seqno <- getLastDequeuedPlus1 inb
144 inb_buffend <- getLastEnqueuedPlus1 inb
145 outb_seqno <- getLastDequeuedPlus1 outb
146 outb_bufend <- getLastEnqueuedPlus1 outb
147 return $ "PacketBuffer<"++what++"> Inbound" ++ show (inb_seqno, inb_buffend)
148 ++" Outbound" ++ show (outb_seqno, outb_bufend)