blob: 45241b6d914b5a4f3a1fbf33df5276683f69d48d (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TupleSections #-}
module Network.Lossless where
import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word
import Data.PacketBuffer as PB
import Network.QueryResponse
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif
data SequenceInfo = SequenceInfo
{ sequenceNumber :: {-# UNPACK #-} !Word32
, sequenceAck :: {-# UNPACK #-} !Word32
}
deriving (Eq,Ord,Show)
lossless :: (x -> addr -> IO (PacketInboundEvent (x',addr')))
-> (SequenceInfo -> x' -> addr' -> IO y)
-> addr
-> TransportA err addr x y
-> IO ( Transport err addr' x'
, [Word32] -> IO ()
, IO ([Word32],Word32)
)
lossless isLossless encode saddr udp = do
pb <- atomically newPacketBuffer
oob <- atomically newTChan -- Out-of-band channel, these packets (or
-- errors) bypass the packet buffer to be
-- received immediately.
rloop <- forkIO $ fix $ \loop -> do
-- This thread enqueues inbound packets or writes them to the oob
-- channel.
awaitMessage udp $ \m -> do
forM_ m $ \raw -> do
m' <- mapM (uncurry isLossless) raw
case m' of
Left e -> do
atomically $ writeTChan oob (Left e)
loop
Right event -> do
atomically $ do
-- x' <- isLossless xaddr x
PB.grokInboundPacket pb event
case event of
PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event)
_ -> return ()
loop
let tr = Transport
{ awaitMessage = \kont -> do
join $ atomically $ orElse
(do x <- readTChan oob
return $ kont $! Just x)
(do x <- PB.awaitReadyPacket pb
return $ kont $! Just (Right x))
, sendMessage = \a' x' -> do
seqno <- atomically $ do
seqno <- PB.nextToSendSequenceNumber pb
ack <- PB.expectingSequenceNumber pb
return $ SequenceInfo seqno ack
x <- encode seqno x' a'
atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
sendMessage udp saddr x
, closeTransport = do
killThread rloop
closeTransport udp
}
resend ns = do
xs <- atomically $ retrieveForResend pb ns
mapM_ (sendMessage udp saddr . snd) xs
return (tr, resend, atomically $ PB.packetNumbersToRequest pb)
|