1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
-- | This module uses 'Data.PacketBuffer' appropriately to implement a reliable
-- transport over an underlying lossy one.
--
-- It was written to be a helper to 'Network.Tox.Session' but it is
-- representation-agnostic and so could potentially be used on an unrelated
-- lossy transport.
{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TupleSections #-}
module Network.Lossless where
import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word
import System.IO.Error
import Data.PacketBuffer as PB
import DPut
import DebugTag
import Network.QueryResponse
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif
-- | Sequencing information for a packet.
data SequenceInfo = SequenceInfo
{ sequenceNumber :: {-# UNPACK #-} !Word32 -- ^ Packets are ordered by their 'sequenceNumber'.
, sequenceAck :: {-# UNPACK #-} !Word32 -- ^ This is the sender's latest received in-order packet.
}
deriving (Eq,Ord,Show)
data OutgoingInfo y = OutgoingInfo
{ oIsLossy :: Bool -- ^ True if the packet is treated as lossy.
, oEncoded :: y -- ^ The packet.
, oHandleException :: Maybe (IOError -> IO ()) -- ^ Optionally handle send failure.
}
-- | Obtain a reliable transport form an unreliable one.
lossless :: Show addr =>
(x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets.
-> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets.
-> addr -- ^ The remote address for this session.
-> TransportA String addr x y -- ^ An unreliable lossy transport.
-> IO ( Transport String addr' x' -- ^ A reliable lossless transport.
, [Word32] -> IO () -- ^ Use this to request lost packets be re-sent.
, IO ([Word32],Word32) -- ^ Use this to discover missing packets to request.
)
lossless isLossless encode saddr udp = do
pb <- atomically newPacketBuffer
oob <- atomically newTChan -- Out-of-band channel, these packets (or
-- errors) bypass the packet buffer to be
-- received immediately.
rloop <- forkIO $ do
-- This thread enqueues inbound packets or writes them to the oob
-- channel.
myThreadId >>= flip labelThread ("lossless."++show saddr)
fix $ \loop -> do
awaitMessage udp $ \m -> do
m' <- mapM (mapM $ uncurry isLossless) m
case m' of
Nothing -> do
atomically $ writeTChan oob Nothing
-- Quit thread here.
Just (Left e) -> do
atomically $ writeTChan oob (Just $ Left e)
loop
Just (Right event) -> do
atomically $ do
-- x' <- isLossless xaddr x
PB.grokInboundPacket pb event
case event of
PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event)
_ -> do
report <- pbReport "enqueued" pb
writeTChan oob (Just $ Left report)
loop
let tr = Transport
{ awaitMessage = \kont -> do
join $ atomically $ orElse
(do x <- readTChan oob
return $ kont $! x)
(do x <- PB.awaitReadyPacket pb
report <- pbReport "dequeued" pb
return $ do
atomically $ writeTChan oob (Just $ Left report)
kont $! Just (Right x))
, sendMessage = \a' x' -> do
seqno <- atomically $ do
seqno <- PB.nextToSendSequenceNumber pb
ack <- PB.expectingSequenceNumber pb
return $ SequenceInfo seqno ack
OutgoingInfo islossy x oops <- encode seqno x' a'
(isfull,nn) <-
if islossy
then do
dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno
return (False,(0,0)) -- avoid updating seqno on lossy packets.
else do
dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno
atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
when isfull $ do
dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno)
atomically $ do
(isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
when isfull retry
let sendit = sendMessage udp saddr x
maybe sendit (catchIOError sendit) oops
, closeTransport = do
atomically $ writeTChan oob Nothing -- quit rloop thread
closeTransport udp
}
resend ns = do
xs <- atomically $ retrieveForResend pb ns
dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets."
forM_ xs $ \x -> do
dput XNetCrypto $ shows saddr $ " <-- Resending packet."
sendMessage udp saddr . snd $ x
return (tr, resend, atomically $ PB.packetNumbersToRequest pb)
|