summaryrefslogtreecommitdiff
path: root/src/Network/Lossless.hs
blob: 45241b6d914b5a4f3a1fbf33df5276683f69d48d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
{-# LANGUAGE CPP           #-}
{-# LANGUAGE LambdaCase    #-}
{-# LANGUAGE TupleSections #-}
module Network.Lossless where

import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word

import Data.PacketBuffer as PB
import Network.QueryResponse

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif

-- | Sequencing metadata that 'lossless' passes to its packet encoder for
-- every outbound message.
data SequenceInfo = SequenceInfo
    { sequenceNumber :: {-# UNPACK #-} !Word32
        -- ^ Sequence number of the outbound packet.  'lossless' fills this
        -- from @PB.nextToSendSequenceNumber@ and records the sent packet
        -- in the packet buffer under the same number.
    , sequenceAck    :: {-# UNPACK #-} !Word32
        -- ^ Filled by 'lossless' from @PB.expectingSequenceNumber@.
        -- NOTE(review): presumably the next inbound sequence number the
        -- local side is waiting for -- confirm against "Data.PacketBuffer".
    }
 deriving (Eq,Ord,Show)

-- | Wrap an underlying transport with a packet buffer so that outbound
-- packets carry 'SequenceInfo' and can be resent on request.
--
-- A background thread is forked that repeatedly awaits messages on the
-- underlying transport.  Parse errors are forwarded to an out-of-band
-- channel.  Successfully parsed packets are classified with the first
-- argument and handed to @PB.grokInboundPacket@; packets classified as
-- 'PacketReceivedLossy' are additionally written to the out-of-band channel.
-- The thread stops looping once the underlying transport yields 'Nothing'.
--
-- The result is a triple of:
--
--  1. The wrapped transport.  Its 'awaitMessage' delivers whichever is
--     available first from the out-of-band channel or
--     @PB.awaitReadyPacket@, and never passes 'Nothing' to the continuation.
--     Its 'sendMessage' always transmits to the fixed underlying address
--     (third argument); the caller-supplied address is only given to the
--     encoder.  Its 'closeTransport' kills the background thread and then
--     closes the underlying transport.
--
--  2. An action that looks up the given sequence numbers with
--     @retrieveForResend@ and re-sends each retrieved packet on the
--     underlying transport.
--
--  3. An action that runs @PB.packetNumbersToRequest@.
--     NOTE(review): presumably the sequence numbers still missing from the
--     peer plus a reference sequence number -- confirm against
--     "Data.PacketBuffer".
lossless :: (x -> addr -> IO (PacketInboundEvent (x',addr')))
                -- ^ Classify a raw inbound packet and its origin address.
            -> (SequenceInfo -> x' -> addr' -> IO y)
                -- ^ Encode an outbound message for the underlying transport.
            -> addr
                -- ^ Underlying address every outbound packet is sent to.
            -> TransportA err addr x y
                -- ^ The underlying transport.
            -> IO ( Transport err addr' x'
                  , [Word32] -> IO ()
                  , IO ([Word32],Word32)
                  )
lossless isLossless encode saddr udp = do
    pb  <- atomically newPacketBuffer
    -- Out-of-band channel: whatever is written here (lossy payloads and
    -- parse errors) is offered to readers without going through the packet
    -- buffer.
    oob <- atomically newTChan
    -- Receiver thread: routes every inbound message either into the packet
    -- buffer or onto the out-of-band channel, then waits for the next one.
    rloop <- forkIO $ fix $ \again ->
        awaitMessage udp $ \inbound -> case inbound of
            Nothing  -> return ()  -- Nothing more to read; leave the loop.
            Just raw -> do
                case raw of
                    Left e    -> atomically $ writeTChan oob (Left e)
                    Right pkt -> do
                        event <- uncurry isLossless pkt
                        atomically $ do
                            PB.grokInboundPacket pb event
                            case event of
                                PacketReceivedLossy {} ->
                                    writeTChan oob (Right (peReceivedPayload event))
                                _ -> return ()
                again
    let resend ns =
            atomically (retrieveForResend pb ns)
                >>= mapM_ (sendMessage udp saddr . snd)
        tr = Transport
            { awaitMessage = \kont -> do
                -- Prefer the out-of-band channel; fall back to the next
                -- packet the buffer reports as ready.
                next <- atomically $
                    (readTChan oob) `orElse` (fmap Right (PB.awaitReadyPacket pb))
                kont $! Just next
            , sendMessage = \dest payload -> do
                info <- atomically $
                    liftM2 SequenceInfo
                           (PB.nextToSendSequenceNumber pb)
                           (PB.expectingSequenceNumber pb)
                wire <- encode info payload dest
                -- Record the encoded packet before transmitting it so it
                -- can be retrieved for a later resend.
                atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber info) wire)
                sendMessage udp saddr wire
            , closeTransport = killThread rloop >> closeTransport udp
            }
    return (tr, resend, atomically (PB.packetNumbersToRequest pb))