summaryrefslogtreecommitdiff
path: root/dht/src/Network/Lossless.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/src/Network/Lossless.hs
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/src/Network/Lossless.hs')
-rw-r--r--dht/src/Network/Lossless.hs124
1 files changed, 124 insertions, 0 deletions
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs
new file mode 100644
index 00000000..861792ab
--- /dev/null
+++ b/dht/src/Network/Lossless.hs
@@ -0,0 +1,124 @@
1-- | This module uses 'Data.PacketBuffer' appropriately to implement a reliable
2-- transport over an underlying lossy one.
3--
4-- It was written to be a helper to 'Network.Tox.Session' but it is
5-- representation-agnostic and so could potentially be used on an unrelated
6-- lossy transport.
7{-# LANGUAGE CPP #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE TupleSections #-}
10module Network.Lossless where
11
12import Control.Concurrent.STM.TChan
13import Control.Monad
14import Control.Monad.STM
15import Data.Function
16import Data.Word
17import System.IO.Error
18
19import Data.PacketBuffer as PB
20import DPut
21import DebugTag
22import Network.QueryResponse
23
24#ifdef THREAD_DEBUG
25import Control.Concurrent.Lifted.Instrument
26#else
27import Control.Concurrent.Lifted
28#endif
29
30-- | Sequencing information for a packet.
31data SequenceInfo = SequenceInfo
32 { sequenceNumber :: {-# UNPACK #-} !Word32 -- ^ Packets are ordered by their 'sequenceNumber'.
33 , sequenceAck :: {-# UNPACK #-} !Word32 -- ^ This is the sender's latest received in-order packet.
34 }
35 deriving (Eq,Ord,Show)
36
37data OutgoingInfo y = OutgoingInfo
38 { oIsLossy :: Bool -- ^ True if the packet is treated as lossy.
39 , oEncoded :: y -- ^ The packet.
40 , oHandleException :: Maybe (IOError -> IO ()) -- ^ Optionally handle send failure.
41 }
42
43-- | Obtain a reliable transport form an unreliable one.
44lossless :: Show addr =>
45 (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets.
46 -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets.
47 -> addr -- ^ The remote address for this session.
48 -> TransportA String addr x y -- ^ An unreliable lossy transport.
49
50 -> IO ( Transport String addr' x' -- ^ A reliable lossless transport.
51 , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent.
52 , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request.
53 )
54lossless isLossless encode saddr udp = do
55 pb <- atomically newPacketBuffer
56 oob <- atomically newTChan -- Out-of-band channel, these packets (or
57 -- errors) bypass the packet buffer to be
58 -- received immediately.
59 rloop <- forkIO $ do
60 -- This thread enqueues inbound packets or writes them to the oob
61 -- channel.
62 myThreadId >>= flip labelThread ("lossless."++show saddr)
63 fix $ \loop -> do
64 awaitMessage udp $ \m -> do
65 m' <- mapM (mapM $ uncurry isLossless) m
66 case m' of
67 Nothing -> do
68 atomically $ writeTChan oob Nothing
69 -- Quit thread here.
70 Just (Left e) -> do
71 atomically $ writeTChan oob (Just $ Left e)
72 loop
73 Just (Right event) -> do
74 atomically $ do
75 -- x' <- isLossless xaddr x
76 PB.grokInboundPacket pb event
77 case event of
78 PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event)
79 _ -> do
80 report <- pbReport "enqueued" pb
81 writeTChan oob (Just $ Left report)
82 loop
83 let tr = Transport
84 { awaitMessage = \kont -> do
85 join $ atomically $ orElse
86 (do x <- readTChan oob
87 return $ kont $! x)
88 (do x <- PB.awaitReadyPacket pb
89 report <- pbReport "dequeued" pb
90 return $ do
91 atomically $ writeTChan oob (Just $ Left report)
92 kont $! Just (Right x))
93 , sendMessage = \a' x' -> do
94 seqno <- atomically $ do
95 seqno <- PB.nextToSendSequenceNumber pb
96 ack <- PB.expectingSequenceNumber pb
97 return $ SequenceInfo seqno ack
98 OutgoingInfo islossy x oops <- encode seqno x' a'
99 (isfull,nn) <-
100 if islossy
101 then do
102 dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno
103 return (False,(0,0)) -- avoid updating seqno on lossy packets.
104 else do
105 dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno
106 atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
107 when isfull $ do
108 dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno)
109 atomically $ do
110 (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
111 when isfull retry
112 let sendit = sendMessage udp saddr x
113 maybe sendit (catchIOError sendit) oops
114 , closeTransport = do
115 atomically $ writeTChan oob Nothing -- quit rloop thread
116 closeTransport udp
117 }
118 resend ns = do
119 xs <- atomically $ retrieveForResend pb ns
120 dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets."
121 forM_ xs $ \x -> do
122 dput XNetCrypto $ shows saddr $ " <-- Resending packet."
123 sendMessage udp saddr . snd $ x
124 return (tr, resend, atomically $ PB.packetNumbersToRequest pb)