summaryrefslogtreecommitdiff
path: root/dht/src/Network/Lossless.hs
blob: 5a313aedbd644a7444f87fc08d25ce0437c3b432 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
-- | This module uses "Data.PacketBuffer" to implement a reliable transport
-- over an underlying lossy one.
--
-- It was written as a helper for "Network.Tox.Session", but it is
-- representation-agnostic and so could potentially be used on an unrelated
-- lossy transport.
{-# LANGUAGE CPP           #-}
{-# LANGUAGE LambdaCase    #-}
{-# LANGUAGE TupleSections #-}
module Network.Lossless where

import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word
import System.IO.Error

import Data.PacketBuffer as PB
import DPut
import DebugTag
import Network.QueryResponse

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif

-- | Sequencing metadata carried alongside a packet.
data SequenceInfo = SequenceInfo
    { -- | Packets are ordered by their 'sequenceNumber'.
      sequenceNumber :: {-# UNPACK #-} !Word32
    , -- | This is the sender's latest received in-order packet.
      sequenceAck :: {-# UNPACK #-} !Word32
    }
    deriving (Eq, Ord, Show)

-- | The result of encoding an outbound packet: the encoded form plus
-- instructions on how the sender should treat it.
data OutgoingInfo y = OutgoingInfo
    { -- | True if the packet is treated as lossy.
      oIsLossy :: Bool
    , -- | The packet.
      oEncoded :: y
    , -- | Optionally handle send failure.
      oHandleException :: Maybe (IOError -> IO ())
    }

-- | Obtain a reliable transport from an unreliable one.
--
-- A fresh packet buffer and an out-of-band 'TChan' are created, and a reader
-- thread is forked that pulls messages from the lossy transport, classifies
-- each arrival with the first argument and feeds the resulting event to
-- 'PB.grokInboundPacket'.
--
-- The returned transport behaves as follows:
--
-- * @awaitMessage@ prefers anything waiting on the out-of-band channel and
--   otherwise takes the next packet offered by 'PB.awaitReadyPacket'.
--
-- * @sendMessage@ asks the buffer for the next sequence number and the
--   currently expected inbound number, encodes the packet with the second
--   argument, records non-lossy packets with 'PB.grokOutboundPacket'
--   (blocking while the buffer reports it is full) and finally sends over
--   the lossy transport.  Note that the packet is always sent to the session
--   address given to 'lossless'; the destination passed to @sendMessage@ is
--   only forwarded to the encoder.
--
-- * @setActive False@ publishes 'Terminated' on the out-of-band channel and
--   deactivates the lossy transport; @setActive True@ does nothing.
--
-- Buffer status reports (from 'pbReport') are delivered to readers as
-- 'ParseError' values on the out-of-band channel.
lossless :: Show addr =>
    (x -> addr -> IO (PacketInboundEvent (x',addr')))       -- ^ Used to classify newly arrived packets.
    -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets.
    -> addr                                                 -- ^ The remote address for this session.
    -> TransportA String addr x y                           -- ^ An unreliable lossy transport.

    -> IO ( Transport String addr' x' -- ^ A reliable lossless transport.
          , [Word32] -> IO ()         -- ^ Use this to request lost packets be re-sent.
          , IO ([Word32],Word32)      -- ^ Use this to discover missing packets to request.
          )
lossless isLossless encode saddr udp = do
    -- State shared between the reader thread, the returned transport and the
    -- two helper actions.
    pb <- atomically newPacketBuffer
    oob <- atomically newTChan -- Out-of-band channel, these packets (or
                               -- errors) bypass the packet buffer to be
                               -- received immediately.
    rloop <- forkIO $ do
        -- This thread enqueues inbound packets or writes them to the oob
        -- channel.
        fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do
            -- Normalise the raw message: Nothing means the lossy transport
            -- terminated, Left is a parse error, Right is the classified
            -- inbound event.  Despite its name, the isLossless argument is
            -- the general classifier from the type signature.
            m' <- case m of Terminated   -> return Nothing
                            ParseError e -> return $ Just (Left e)
                            Arrival a x  -> Just . Right <$> isLossless x a
            case m' of
                Nothing -> do
                    -- Tell readers of the returned transport that we are done.
                    atomically $ writeTChan oob Terminated
                    -- Quit thread here.
                Just (Left e) -> do
                    -- Parse errors are forwarded to readers unchanged.
                    atomically $ writeTChan oob (ParseError e)
                    loop
                Just (Right event) -> do
                    -- The buffer update and the oob write happen in a single
                    -- transaction.
                    atomically $ do
                        -- x' <- isLossless xaddr x
                        -- Every event, lossy or not, is shown to the buffer.
                        PB.grokInboundPacket pb event
                        case event of
                            -- Lossy payloads are delivered straight away via
                            -- the oob channel.
                            PacketReceivedLossy {} -> writeTChan oob (uncurry (flip Arrival) $ peReceivedPayload event)
                            -- For anything else only a status report is
                            -- published here; the payload itself is obtained
                            -- later through PB.awaitReadyPacket.
                            _                      -> do
                                report <- pbReport "enqueued" pb
                                writeTChan oob (ParseError report)
                    loop
    -- Name the reader thread after the session address for debugging.
    labelThread rloop ("lossless."++show saddr)
    let tr = Transport
            { awaitMessage = \kont ->
                -- Out-of-band messages take priority; the buffered queue is
                -- consulted only when the oob channel has nothing to read.
                orElse
                    (do x <- readTChan oob
                        return $ kont $! x)
                    (do x <- PB.awaitReadyPacket pb
                        report <- pbReport "dequeued" pb
                        return $ do
                            -- Publish the status report first, then hand the
                            -- dequeued packet to the continuation.
                            atomically $ writeTChan oob (ParseError report)
                            kont $! uncurry (flip Arrival) x)
            , sendMessage = \a' x' -> do
                -- Read the outgoing sequence number and the ack value in one
                -- transaction.  NOTE(review): this is a separate transaction
                -- from the one that records the packet below, so concurrent
                -- senders could presumably observe the same number — confirm
                -- against Data.PacketBuffer.
                seqno <- atomically $ do
                    seqno <- PB.nextToSendSequenceNumber pb
                    ack   <- PB.expectingSequenceNumber pb
                    return $ SequenceInfo seqno ack
                OutgoingInfo islossy x oops <- encode seqno x' a'
                -- isfull is the first component of grokOutboundPacket's
                -- result; nn is only used in the log line below (its exact
                -- meaning is defined by Data.PacketBuffer).
                (isfull,nn) <-
                    if islossy
                        then do
                            dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno
                            return (False,(0,0)) -- avoid updating seqno on lossy packets.
                        else do
                            dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno
                            atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
                when isfull $ do
                    dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno)
                    -- Block via STM retry until the buffer accepts the packet.
                    -- The inner isfull shadows the outer binding.
                    atomically $ do
                        (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
                        when isfull retry
                -- The packet always goes to the session address saddr; a' was
                -- only given to the encoder.
                let sendit = sendMessage udp saddr x
                -- With a handler, IOErrors from the send are passed to it;
                -- without one they propagate to the caller.
                maybe sendit (catchIOError sendit) oops
            , setActive = \case
                False -> do
                    -- NOTE(review): the original comment here read "quit
                    -- rloop thread", but rloop never reads oob; this
                    -- Terminated is seen by callers of awaitMessage.  rloop
                    -- presumably exits once udp itself reports Terminated
                    -- after being deactivated — confirm.
                    atomically $ writeTChan oob Terminated
                    setActive udp False
                True -> return ()
            }
        -- Look up the given sequence numbers with retrieveForResend and send
        -- the second component of each returned pair to the session address
        -- again.
        resend ns = do
            xs <- atomically $ retrieveForResend pb ns
            dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets."
            forM_ xs $ \x -> do
                dput XNetCrypto $ shows saddr $ " <-- Resending packet."
                sendMessage udp saddr . snd $ x
    -- The third component queries the buffer for packet numbers to request.
    return (tr, resend, atomically $ PB.packetNumbersToRequest pb)