diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/Lossless.hs | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs new file mode 100644 index 00000000..45241b6d --- /dev/null +++ b/src/Network/Lossless.hs | |||
@@ -0,0 +1,80 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE LambdaCase #-} | ||
3 | {-# LANGUAGE TupleSections #-} | ||
4 | module Network.Lossless where | ||
5 | |||
6 | import Control.Concurrent.STM.TChan | ||
7 | import Control.Monad | ||
8 | import Control.Monad.STM | ||
9 | import Data.Function | ||
10 | import Data.Word | ||
11 | |||
12 | import Data.PacketBuffer as PB | ||
13 | import Network.QueryResponse | ||
14 | |||
15 | #ifdef THREAD_DEBUG | ||
16 | import Control.Concurrent.Lifted.Instrument | ||
17 | #else | ||
18 | import Control.Concurrent.Lifted | ||
19 | #endif | ||
20 | |||
21 | data SequenceInfo = SequenceInfo | ||
22 | { sequenceNumber :: {-# UNPACK #-} !Word32 | ||
23 | , sequenceAck :: {-# UNPACK #-} !Word32 | ||
24 | } | ||
25 | deriving (Eq,Ord,Show) | ||
26 | |||
27 | lossless :: (x -> addr -> IO (PacketInboundEvent (x',addr'))) | ||
28 | -> (SequenceInfo -> x' -> addr' -> IO y) | ||
29 | -> addr | ||
30 | -> TransportA err addr x y | ||
31 | -> IO ( Transport err addr' x' | ||
32 | , [Word32] -> IO () | ||
33 | , IO ([Word32],Word32) | ||
34 | ) | ||
35 | lossless isLossless encode saddr udp = do | ||
36 | pb <- atomically newPacketBuffer | ||
37 | oob <- atomically newTChan -- Out-of-band channel, these packets (or | ||
38 | -- errors) bypass the packet buffer to be | ||
39 | -- received immediately. | ||
40 | rloop <- forkIO $ fix $ \loop -> do | ||
41 | -- This thread enqueues inbound packets or writes them to the oob | ||
42 | -- channel. | ||
43 | awaitMessage udp $ \m -> do | ||
44 | forM_ m $ \raw -> do | ||
45 | m' <- mapM (uncurry isLossless) raw | ||
46 | case m' of | ||
47 | Left e -> do | ||
48 | atomically $ writeTChan oob (Left e) | ||
49 | loop | ||
50 | Right event -> do | ||
51 | atomically $ do | ||
52 | -- x' <- isLossless xaddr x | ||
53 | PB.grokInboundPacket pb event | ||
54 | case event of | ||
55 | PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event) | ||
56 | _ -> return () | ||
57 | loop | ||
58 | let tr = Transport | ||
59 | { awaitMessage = \kont -> do | ||
60 | join $ atomically $ orElse | ||
61 | (do x <- readTChan oob | ||
62 | return $ kont $! Just x) | ||
63 | (do x <- PB.awaitReadyPacket pb | ||
64 | return $ kont $! Just (Right x)) | ||
65 | , sendMessage = \a' x' -> do | ||
66 | seqno <- atomically $ do | ||
67 | seqno <- PB.nextToSendSequenceNumber pb | ||
68 | ack <- PB.expectingSequenceNumber pb | ||
69 | return $ SequenceInfo seqno ack | ||
70 | x <- encode seqno x' a' | ||
71 | atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | ||
72 | sendMessage udp saddr x | ||
73 | , closeTransport = do | ||
74 | killThread rloop | ||
75 | closeTransport udp | ||
76 | } | ||
77 | resend ns = do | ||
78 | xs <- atomically $ retrieveForResend pb ns | ||
79 | mapM_ (sendMessage udp saddr . snd) xs | ||
80 | return (tr, resend, atomically $ PB.packetNumbersToRequest pb) | ||