-- Source: src/Network/Lossless.hs
-- Blob:   bdbeb3a2b5bd41875060bcf45d05719b4a3b62a2
{-# LANGUAGE CPP           #-}
{-# LANGUAGE LambdaCase    #-}
{-# LANGUAGE TupleSections #-}
module Network.Lossless where

import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word

import Data.PacketBuffer as PB
import DPut
import Network.QueryResponse

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif

-- | Sequencing data that 'lossless' passes to its encoder callback for each
-- outbound packet.
data SequenceInfo = SequenceInfo
    { sequenceNumber :: {-# UNPACK #-} !Word32
      -- ^ Sequence number of the packet being sent; 'lossless' fills this
      -- from 'PB.nextToSendSequenceNumber'.
    , sequenceAck    :: {-# UNPACK #-} !Word32
      -- ^ Acknowledgement value sent along with the packet; 'lossless' fills
      -- this from 'PB.expectingSequenceNumber'.
    }
 deriving (Eq,Ord,Show)

-- | Layer a packet buffer with sequence numbers on top of an existing
-- transport.
--
-- Arguments, in order:
--
--  * a classifier, run on every inbound packet together with its source
--    address; the 'PacketInboundEvent' it returns is handed to
--    'PB.grokInboundPacket';
--
--  * an encoder that builds the wire representation of an outbound payload
--    from the 'SequenceInfo' chosen for it, the payload and the destination;
--
--  * the remote address: every outbound packet is sent there on the
--    underlying transport, and it is used to label the receive thread and
--    the log messages;
--
--  * the underlying transport.
--
-- The result is a triple:
--
--  * the wrapped 'Transport';
--
--  * an action that re-sends, on the underlying transport, whatever
--    'retrieveForResend' yields for the given sequence numbers;
--
--  * an action that runs 'PB.packetNumbersToRequest' on the buffer.
--
-- A receive thread is forked immediately.  'closeTransport' on the returned
-- transport kills that thread and then closes the underlying transport.
--
-- Note that the out-of-band channel carries three kinds of items: errors
-- from the underlying transport ('Left'), payloads of
-- 'PacketReceivedLossy' events ('Right'), and, for every other inbound
-- event, the @pbReport "enqueued"@ string (also as 'Left').
lossless :: Show addr =>
            (x -> addr -> IO (PacketInboundEvent (x',addr')))
            -> (SequenceInfo -> x' -> addr' -> IO y)
            -> addr
            -> TransportA String addr x y
            -> IO ( Transport String addr' x'
                  , [Word32] -> IO ()
                  , IO ([Word32],Word32)
                  )
lossless isLossless encode saddr udp = do
    pb <- atomically newPacketBuffer
    oob <- atomically newTChan -- Out-of-band channel, these packets (or
                               -- errors) bypass the packet buffer to be
                               -- received immediately.
    rloop <- forkIO $ do
        -- This thread enqueues inbound packets or writes them to the oob
        -- channel.  The label is loop-invariant, so it is set once here
        -- instead of once per received message.
        myThreadId >>= flip labelThread ("lossless."++show saddr)
        -- The lambda body below is indented past the enclosing block so
        -- that it parses without NondecreasingIndentation.
        fix $ \loop -> awaitMessage udp $ \m ->
            -- When the underlying transport yields Nothing the loop ends.
            forM_ m $ \raw -> do
                m' <- mapM (uncurry isLossless) raw
                case m' of
                    Left e -> do
                        atomically $ writeTChan oob (Left e)
                        loop
                    Right event -> do
                        atomically $ do
                            PB.grokInboundPacket pb event
                            case event of
                                PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event)
                                _                      -> do
                                    report <- pbReport "enqueued" pb
                                    writeTChan oob (Left report)
                        loop
    let tr = Transport
            { awaitMessage = \kont -> do
                -- Out-of-band items take priority; only when that channel
                -- is empty do we wait on the packet buffer.
                join $ atomically $ orElse
                    (do x <- readTChan oob
                        return $ kont $! Just x)
                    (do x <- PB.awaitReadyPacket pb
                        report <- pbReport "dequeued" pb
                        return $ do
                            dput XNetCrypto report
                            kont $! Just (Right x))
            , sendMessage = \a' x' -> do
                sinfo <- atomically $ do
                    seqno <- PB.nextToSendSequenceNumber pb
                    ack   <- PB.expectingSequenceNumber pb
                    return $ SequenceInfo seqno ack
                x <- encode sinfo x' a'
                let outbound = PacketSent (sequenceNumber sinfo) x
                (isfull,nn) <- atomically $ PB.grokOutboundPacket pb outbound
                when isfull $ do
                    dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,sinfo)
                    -- Block in STM until the buffer accepts the packet.
                    atomically $ do
                        (stillFull,_) <- PB.grokOutboundPacket pb outbound
                        when stillFull retry
                sendMessage udp saddr x
            , closeTransport = do
                killThread rloop
                closeTransport udp
            }
        resend ns = do
            xs <- atomically $ retrieveForResend pb ns
            dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets."
            forM_ xs $ \x -> do
                dput XNetCrypto $ shows saddr $ " <-- Resending packet."
                sendMessage udp saddr (snd x)
    return (tr, resend, atomically $ PB.packetNumbersToRequest pb)