diff options
Diffstat (limited to 'src/Network/Lossless.hs')
-rw-r--r-- | src/Network/Lossless.hs | 124 |
1 files changed, 0 insertions, 124 deletions
diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs deleted file mode 100644 index 861792ab..00000000 --- a/src/Network/Lossless.hs +++ /dev/null | |||
@@ -1,124 +0,0 @@ | |||
1 | -- | This module uses 'Data.PacketBuffer' appropriately to implement a reliable | ||
2 | -- transport over an underlying lossy one. | ||
3 | -- | ||
4 | -- It was written to be a helper to 'Network.Tox.Session' but it is | ||
5 | -- representation-agnostic and so could potentially be used on an unrelated | ||
6 | -- lossy transport. | ||
7 | {-# LANGUAGE CPP #-} | ||
8 | {-# LANGUAGE LambdaCase #-} | ||
9 | {-# LANGUAGE TupleSections #-} | ||
10 | module Network.Lossless where | ||
11 | |||
12 | import Control.Concurrent.STM.TChan | ||
13 | import Control.Monad | ||
14 | import Control.Monad.STM | ||
15 | import Data.Function | ||
16 | import Data.Word | ||
17 | import System.IO.Error | ||
18 | |||
19 | import Data.PacketBuffer as PB | ||
20 | import DPut | ||
21 | import DebugTag | ||
22 | import Network.QueryResponse | ||
23 | |||
24 | #ifdef THREAD_DEBUG | ||
25 | import Control.Concurrent.Lifted.Instrument | ||
26 | #else | ||
27 | import Control.Concurrent.Lifted | ||
28 | #endif | ||
29 | |||
30 | -- | Sequencing information for a packet. | ||
31 | data SequenceInfo = SequenceInfo | ||
32 | { sequenceNumber :: {-# UNPACK #-} !Word32 -- ^ Packets are ordered by their 'sequenceNumber'. | ||
33 | , sequenceAck :: {-# UNPACK #-} !Word32 -- ^ This is the sender's latest received in-order packet. | ||
34 | } | ||
35 | deriving (Eq,Ord,Show) | ||
36 | |||
37 | data OutgoingInfo y = OutgoingInfo | ||
38 | { oIsLossy :: Bool -- ^ True if the packet is treated as lossy. | ||
39 | , oEncoded :: y -- ^ The packet. | ||
40 | , oHandleException :: Maybe (IOError -> IO ()) -- ^ Optionally handle send failure. | ||
41 | } | ||
42 | |||
43 | -- | Obtain a reliable transport form an unreliable one. | ||
44 | lossless :: Show addr => | ||
45 | (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. | ||
46 | -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. | ||
47 | -> addr -- ^ The remote address for this session. | ||
48 | -> TransportA String addr x y -- ^ An unreliable lossy transport. | ||
49 | |||
50 | -> IO ( Transport String addr' x' -- ^ A reliable lossless transport. | ||
51 | , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent. | ||
52 | , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request. | ||
53 | ) | ||
54 | lossless isLossless encode saddr udp = do | ||
55 | pb <- atomically newPacketBuffer | ||
56 | oob <- atomically newTChan -- Out-of-band channel, these packets (or | ||
57 | -- errors) bypass the packet buffer to be | ||
58 | -- received immediately. | ||
59 | rloop <- forkIO $ do | ||
60 | -- This thread enqueues inbound packets or writes them to the oob | ||
61 | -- channel. | ||
62 | myThreadId >>= flip labelThread ("lossless."++show saddr) | ||
63 | fix $ \loop -> do | ||
64 | awaitMessage udp $ \m -> do | ||
65 | m' <- mapM (mapM $ uncurry isLossless) m | ||
66 | case m' of | ||
67 | Nothing -> do | ||
68 | atomically $ writeTChan oob Nothing | ||
69 | -- Quit thread here. | ||
70 | Just (Left e) -> do | ||
71 | atomically $ writeTChan oob (Just $ Left e) | ||
72 | loop | ||
73 | Just (Right event) -> do | ||
74 | atomically $ do | ||
75 | -- x' <- isLossless xaddr x | ||
76 | PB.grokInboundPacket pb event | ||
77 | case event of | ||
78 | PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event) | ||
79 | _ -> do | ||
80 | report <- pbReport "enqueued" pb | ||
81 | writeTChan oob (Just $ Left report) | ||
82 | loop | ||
83 | let tr = Transport | ||
84 | { awaitMessage = \kont -> do | ||
85 | join $ atomically $ orElse | ||
86 | (do x <- readTChan oob | ||
87 | return $ kont $! x) | ||
88 | (do x <- PB.awaitReadyPacket pb | ||
89 | report <- pbReport "dequeued" pb | ||
90 | return $ do | ||
91 | atomically $ writeTChan oob (Just $ Left report) | ||
92 | kont $! Just (Right x)) | ||
93 | , sendMessage = \a' x' -> do | ||
94 | seqno <- atomically $ do | ||
95 | seqno <- PB.nextToSendSequenceNumber pb | ||
96 | ack <- PB.expectingSequenceNumber pb | ||
97 | return $ SequenceInfo seqno ack | ||
98 | OutgoingInfo islossy x oops <- encode seqno x' a' | ||
99 | (isfull,nn) <- | ||
100 | if islossy | ||
101 | then do | ||
102 | dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno | ||
103 | return (False,(0,0)) -- avoid updating seqno on lossy packets. | ||
104 | else do | ||
105 | dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno | ||
106 | atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | ||
107 | when isfull $ do | ||
108 | dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) | ||
109 | atomically $ do | ||
110 | (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | ||
111 | when isfull retry | ||
112 | let sendit = sendMessage udp saddr x | ||
113 | maybe sendit (catchIOError sendit) oops | ||
114 | , closeTransport = do | ||
115 | atomically $ writeTChan oob Nothing -- quit rloop thread | ||
116 | closeTransport udp | ||
117 | } | ||
118 | resend ns = do | ||
119 | xs <- atomically $ retrieveForResend pb ns | ||
120 | dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." | ||
121 | forM_ xs $ \x -> do | ||
122 | dput XNetCrypto $ shows saddr $ " <-- Resending packet." | ||
123 | sendMessage udp saddr . snd $ x | ||
124 | return (tr, resend, atomically $ PB.packetNumbersToRequest pb) | ||