1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
-- | This module uses "Data.PacketBuffer" to implement a reliable transport
-- over an underlying lossy one.
--
-- It was written as a helper for "Network.Tox.Session", but it is
-- representation-agnostic and so could potentially be used on an unrelated
-- lossy transport.
{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TupleSections #-}
module Network.Lossless where
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word
import System.IO.Error
import Data.PacketBuffer as PB
import DPut
import DebugTag
import Network.QueryResponse
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif
-- | Sequencing metadata carried alongside a packet.
data SequenceInfo = SequenceInfo
    { sequenceNumber :: {-# UNPACK #-} !Word32
      -- ^ Packets are ordered by this number.
    , sequenceAck    :: {-# UNPACK #-} !Word32
      -- ^ The latest in-order packet received by the sender.
    }
    deriving (Eq, Ord, Show)
-- | Describes one outbound packet of representation @y@ together with how it
-- should be treated when sent.
data OutgoingInfo y = OutgoingInfo
    { oIsLossy         :: Bool
      -- ^ 'True' when the packet is to be treated as lossy.
    , oEncoded         :: y
      -- ^ The packet itself.
    , oHandleException :: Maybe (IOError -> IO ())
      -- ^ Optional handler for a failed send.
    }
-- | Obtain a reliable transport from an unreliable one.
--
-- A background thread, labelled @"lossless." ++ lbl@, repeatedly awaits
-- messages on the underlying transport.  Every arrival is classified with the
-- supplied classifier and handed to a freshly created packet buffer.  Lossy
-- arrivals, parse errors, packet-buffer reports and the termination notice are
-- written to an out-of-band channel, which the returned transport consults
-- before it waits on the packet buffer.
--
-- The result is a triple: the reliable transport, an action that re-sends
-- whatever 'retrieveForResend' yields for the given sequence numbers, and an
-- action that runs 'PB.packetNumbersToRequest' on the packet buffer.
lossless :: Show addr =>
       String -- ^ Label for debugging.
    -> (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets.
    -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets.
    -> addr -- ^ The remote address for this session.
    -> TransportA String addr x y -- ^ An unreliable lossy transport.
    -> IO ( Transport String addr' x' -- A reliable lossless transport.
          , [Word32] -> IO ()         -- Use this to request lost packets be re-sent.
          , IO ([Word32],Word32)      -- Use this to discover missing packets to request.
          )
lossless lbl isLossless encode saddr udp = do
    pb  <- atomically newPacketBuffer
    -- Out-of-band channel: packets (or errors) written here bypass the packet
    -- buffer and are received immediately.
    oob <- atomically newTChan

    -- Receive loop.  Each round takes one message from the underlying
    -- transport, runs the action that came with it, and then either enqueues
    -- the packet or writes to the out-of-band channel.
    let pump = do
            (msg, finish) <- atomically (awaitMessage udp)
            finish
            case msg of
                Terminated ->
                    -- Forward the notice; not recursing ends the thread.
                    atomically (writeTChan oob Terminated)
                ParseError err -> do
                    atomically (writeTChan oob (ParseError err))
                    pump
                Arrival src raw -> do
                    event <- isLossless raw src
                    atomically $ do
                        PB.grokInboundPacket pb event
                        case event of
                            PacketReceivedLossy {} ->
                                writeTChan oob (toArrival (peReceivedPayload event))
                            _ ->
                                -- The buffer report is delivered to the
                                -- reader wrapped in a 'ParseError'.
                                pbReport "enqueued" pb >>= writeTChan oob . ParseError
                    pump
    rloop <- forkIO pump
    labelThread rloop ("lossless." ++ lbl)

    -- Holds the STM action to run when the out-of-band channel is empty.
    -- It starts out as 'retry' and is replaced on deactivation so that calls
    -- to awaitMessage made after termination do not wait on the channel.
    -- XXX: This shouldn't be necessary and might be costly.
    term <- newTVarIO retry

    let -- Out-of-band traffic first, then the termination action, and only
        -- then the next ready packet from the buffer.
        receive =
            ((, return ()) <$> (readTChan oob `orElse` join (readTVar term)))
            `orElse`
            (do ready  <- PB.awaitReadyPacket pb
                report <- pbReport "dequeued" pb
                return ( toArrival ready
                       , atomically (writeTChan oob (ParseError report)) ))

        transmit dest payload = do
            info <- atomically $
                SequenceInfo <$> PB.nextToSendSequenceNumber pb
                             <*> PB.expectingSequenceNumber pb
            OutgoingInfo islossy wire oops <- encode info payload dest
            let enqueue = PB.grokOutboundPacket pb (PacketSent (sequenceNumber info) wire)
            (full, counts) <-
                if islossy
                    then do
                        say (" <-- Lossy packet " ++ show info)
                        return (False, (0, 0)) -- avoid updating seqno on lossy packets.
                    else do
                        say (" <-- Lossless packet " ++ show info)
                        atomically enqueue
            when full $ do
                say (" <-- Outbound queue is full! Retrying... " ++ show (counts, info))
                -- Block in STM until the buffer accepts the packet.
                atomically $ do
                    (stillFull, _) <- enqueue
                    when stillFull retry
            let sendit = sendMessage udp saddr wire
            case oops of
                Nothing      -> sendit
                Just handler -> sendit `catchIOError` handler

        toggle True  = return ()
        toggle False = do
            atomically $ do
                -- The original note on this write read "quit rloop thread".
                -- NOTE(review): the receive loop above never reads this
                -- channel; it stops when 'udp' yields 'Terminated',
                -- presumably as a result of the 'setActive' call below.
                writeTChan oob Terminated
                writeTVar term (return Terminated)
            setActive udp False

        resend ns = do
            xs <- atomically (retrieveForResend pb ns)
            say (" <-- Resending " ++ show (length xs) ++ " packets.")
            forM_ xs $ \entry -> do
                say " <-- Resending packet."
                sendMessage udp saddr (snd entry)

        tr = Transport
            { awaitMessage = receive
            , sendMessage  = transmit
            , setActive    = toggle
            }

    return (tr, resend, atomically (PB.packetNumbersToRequest pb))
  where
    -- Debug output on the XNetCrypto tag, prefixed with the session label.
    say msg = dput XNetCrypto (lbl ++ msg)
    -- Turn a (payload, address) pair into an 'Arrival'.
    toArrival = uncurry (flip Arrival)
|