summaryrefslogtreecommitdiff
path: root/dht/src/Network/Lossless.hs
blob: 41203ca5ccfbac7dbef8fd686600c368c5fc3713 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
-- | This module uses 'Data.PacketBuffer' appropriately to implement a reliable
-- transport over an underlying lossy one.
--
-- It was written to be a helper to 'Network.Tox.Session' but it is
-- representation-agnostic and so could potentially be used on an unrelated
-- lossy transport.
{-# LANGUAGE CPP           #-}
{-# LANGUAGE LambdaCase    #-}
{-# LANGUAGE TupleSections #-}
module Network.Lossless where

import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word
import System.IO.Error

import Data.PacketBuffer as PB
import DPut
import DebugTag
import Network.QueryResponse

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif

-- | Ordering metadata carried alongside a packet.
data SequenceInfo = SequenceInfo
    { -- | Packets are ordered by this number.
      sequenceNumber :: {-# UNPACK #-} !Word32
    , -- | The latest in-order packet that the sender has received.
      sequenceAck :: {-# UNPACK #-} !Word32
    }
    deriving (Eq, Ord, Show)

-- | The result of encoding an outbound packet, together with how it should
-- be treated when sent.
data OutgoingInfo y = OutgoingInfo
    { -- | True if the packet is treated as lossy.
      oIsLossy :: Bool
    , -- | The packet.
      oEncoded :: y
    , -- | Optionally handle send failure.
      oHandleException :: Maybe (IOError -> IO ())
    }

-- | Obtain a reliable transport from an unreliable one.
--
-- A fresh packet buffer and an out-of-band channel are created, and a
-- background thread is forked that reads the underlying transport,
-- classifies each arrival with the supplied classifier and hands the
-- resulting event to the packet buffer.
--
-- The result is a triple:
--
--  * the wrapped transport, whose 'awaitMessage' prefers the out-of-band
--    channel and otherwise waits for the next ready packet in the buffer;
--
--  * an action that retrieves the given sequence numbers from the buffer
--    and re-sends them over the underlying transport;
--
--  * an action that runs 'PB.packetNumbersToRequest' on the buffer.
--
-- Note that the address passed to the wrapped 'sendMessage' is only given
-- to the encoder; the encoded packet is always sent to the session address
-- supplied here.
lossless :: Show addr =>
    String -- ^ Label for debugging.
    -> (x -> addr -> IO (PacketInboundEvent (x',addr')))    -- ^ Used to classify newly arrived packets.
    -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets.
    -> addr                                                 -- ^ The remote address for this session.
    -> TransportA String addr x y                           -- ^ An unreliable lossy transport.

    -> IO ( Transport String addr' x' -- A reliable lossless transport.
          , [Word32] -> IO ()         -- Use this to request lost packets be re-sent.
          , IO ([Word32],Word32)      -- Use this to discover missing packets to request.
          )
lossless lbl isLossless encode saddr udp = do
    pb <- atomically newPacketBuffer
    oob <- atomically newTChan -- Out-of-band channel, these packets (or
                               -- errors) bypass the packet buffer to be
                               -- received immediately.
    rloop <- forkIO $ do
        -- This thread enqueues inbound packets or writes them to the oob
        -- channel.
        --
        -- Each iteration waits (in STM) for one message from the underlying
        -- transport and then runs the IO action built for it.  The loop
        -- continues by calling 'loop' explicitly, so the Terminated case
        -- ends the thread simply by not recursing.
        fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do
            -- Classification runs in IO, outside of any STM transaction.
            m' <- case m of Terminated   -> return Nothing
                            ParseError e -> return $ Just (Left e)
                            Arrival a x  -> Just . Right <$> isLossless x a
            case m' of
                Nothing -> do
                    -- Forward the termination to readers of the wrapped transport.
                    atomically $ writeTChan oob Terminated
                    -- Quit thread here.
                Just (Left e) -> do
                    -- Parse errors bypass the packet buffer entirely.
                    atomically $ writeTChan oob (ParseError e)
                    loop
                Just (Right event) -> do
                    atomically $ do
                        -- x' <- isLossless xaddr x
                        -- Every classified event is shown to the packet buffer.
                        PB.grokInboundPacket pb event
                        case event of
                            -- Lossy payloads are delivered immediately via the
                            -- oob channel; the payload pair is (packet, address).
                            PacketReceivedLossy {} -> writeTChan oob (uncurry (flip Arrival) $ peReceivedPayload event)
                            -- For every other event a buffer report is pushed
                            -- to the oob channel wrapped as a ParseError.
                            -- NOTE(review): presumably a diagnostic side
                            -- channel rather than a real parse failure -- confirm.
                            _                      -> do
                                report <- pbReport "enqueued" pb
                                writeTChan oob (ParseError report)
                    loop
    labelThread rloop ("lossless."++lbl)
    -- The TVar holds an STM action.  It starts out as 'retry' and is replaced
    -- by an action returning Terminated when the transport is deactivated.
    term <- newTVarIO retry -- In case awaitMessage is called multiple times beyond termination,
                            -- we will use this STM action to stop it from waiting on the oob TChan.
                            -- XXX: This shouldn't be necessary and might be costly.
    let tr = Transport
            { -- First choice: a message from the oob channel (or, failing
              -- that, whatever the action stored in term yields).  Second
              -- choice: the next ready packet from the packet buffer.
              awaitMessage = \kont ->
                orElse
                    (do x <- readTChan oob `orElse` join (readTVar term)
                        return $ kont $! x)
                    (do x <- PB.awaitReadyPacket pb
                        report <- pbReport "dequeued" pb
                        return $ do
                            -- The report taken at dequeue time is queued on the
                            -- oob channel before the packet is handed over.
                            atomically $ writeTChan oob (ParseError report)
                            kont $! uncurry (flip Arrival) x)
            , sendMessage = \a' x' -> do
                -- Pair the next outbound sequence number with the sequence
                -- number currently expected from the peer (used as the ack).
                -- NOTE(review): this read happens in a separate transaction
                -- from the one that records the packet below; confirm that
                -- concurrent senders cannot observe the same number.
                seqno <- atomically $ do
                    seqno <- PB.nextToSendSequenceNumber pb
                    ack   <- PB.expectingSequenceNumber pb
                    return $ SequenceInfo seqno ack
                OutgoingInfo islossy x oops <- encode seqno x' a'
                (isfull,nn) <-
                    if islossy
                        then do
                            dput XNetCrypto $ mappend lbl $ " <-- Lossy packet " ++ show seqno
                            return (False,(0,0)) -- avoid updating seqno on lossy packets.
                        else do
                            dput XNetCrypto $ mappend lbl $ " <-- Lossless packet " ++ show seqno
                            atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
                -- When the buffer reported full, block in STM (via retry)
                -- until the same packet is accepted.  The inner isfull
                -- shadows the outer one; nn is only used for the log line.
                when isfull $ do
                    dput XNetCrypto $ mappend lbl $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno)
                    atomically $ do
                        (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
                        when isfull retry
                -- The packet always goes to the session address saddr; the
                -- optional handler from the encoder catches IOErrors.
                let sendit = sendMessage udp saddr x
                maybe sendit (catchIOError sendit) oops
            , setActive = \case
                False -> do
                    atomically $ do
                        -- NOTE(review): the forked reader thread above never
                        -- reads from oob, so this write is seen by callers of
                        -- awaitMessage; the reader presumably stops once the
                        -- underlying transport reports Terminated -- confirm.
                        writeTChan oob Terminated -- quit rloop thread
                        writeTVar term (return Terminated)
                    setActive udp False
                -- Activation is a no-op here.
                True -> return ()
            }
        -- Re-send the buffered packets with the given sequence numbers.
        -- Only the second component of each retrieved pair is sent.
        resend ns = do
            xs <- atomically $ retrieveForResend pb ns
            dput XNetCrypto $ mappend lbl $ " <-- Resending " ++ show (length xs) ++ " packets."
            forM_ xs $ \x -> do
                dput XNetCrypto $ mappend lbl $ " <-- Resending packet."
                sendMessage udp saddr . snd $ x
    return (tr, resend, atomically $ PB.packetNumbersToRequest pb)