summaryrefslogtreecommitdiff
path: root/src/Network/Lossless.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/Lossless.hs')
-rw-r--r--src/Network/Lossless.hs80
1 files changed, 80 insertions, 0 deletions
diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs
new file mode 100644
index 00000000..45241b6d
--- /dev/null
+++ b/src/Network/Lossless.hs
@@ -0,0 +1,80 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE LambdaCase #-}
3{-# LANGUAGE TupleSections #-}
4module Network.Lossless where
5
6import Control.Concurrent.STM.TChan
7import Control.Monad
8import Control.Monad.STM
9import Data.Function
10import Data.Word
11
12import Data.PacketBuffer as PB
13import Network.QueryResponse
14
15#ifdef THREAD_DEBUG
16import Control.Concurrent.Lifted.Instrument
17#else
18import Control.Concurrent.Lifted
19#endif
20
21data SequenceInfo = SequenceInfo
22 { sequenceNumber :: {-# UNPACK #-} !Word32
23 , sequenceAck :: {-# UNPACK #-} !Word32
24 }
25 deriving (Eq,Ord,Show)
26
27lossless :: (x -> addr -> IO (PacketInboundEvent (x',addr')))
28 -> (SequenceInfo -> x' -> addr' -> IO y)
29 -> addr
30 -> TransportA err addr x y
31 -> IO ( Transport err addr' x'
32 , [Word32] -> IO ()
33 , IO ([Word32],Word32)
34 )
35lossless isLossless encode saddr udp = do
36 pb <- atomically newPacketBuffer
37 oob <- atomically newTChan -- Out-of-band channel, these packets (or
38 -- errors) bypass the packet buffer to be
39 -- received immediately.
40 rloop <- forkIO $ fix $ \loop -> do
41 -- This thread enqueues inbound packets or writes them to the oob
42 -- channel.
43 awaitMessage udp $ \m -> do
44 forM_ m $ \raw -> do
45 m' <- mapM (uncurry isLossless) raw
46 case m' of
47 Left e -> do
48 atomically $ writeTChan oob (Left e)
49 loop
50 Right event -> do
51 atomically $ do
52 -- x' <- isLossless xaddr x
53 PB.grokInboundPacket pb event
54 case event of
55 PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event)
56 _ -> return ()
57 loop
58 let tr = Transport
59 { awaitMessage = \kont -> do
60 join $ atomically $ orElse
61 (do x <- readTChan oob
62 return $ kont $! Just x)
63 (do x <- PB.awaitReadyPacket pb
64 return $ kont $! Just (Right x))
65 , sendMessage = \a' x' -> do
66 seqno <- atomically $ do
67 seqno <- PB.nextToSendSequenceNumber pb
68 ack <- PB.expectingSequenceNumber pb
69 return $ SequenceInfo seqno ack
70 x <- encode seqno x' a'
71 atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
72 sendMessage udp saddr x
73 , closeTransport = do
74 killThread rloop
75 closeTransport udp
76 }
77 resend ns = do
78 xs <- atomically $ retrieveForResend pb ns
79 mapM_ (sendMessage udp saddr . snd) xs
80 return (tr, resend, atomically $ PB.packetNumbersToRequest pb)