From 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sat, 28 Sep 2019 13:43:29 -0400 Subject: Factor out some new libraries word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search --- src/Network/Lossless.hs | 124 ------------------------------------------------ 1 file changed, 124 deletions(-) delete mode 100644 src/Network/Lossless.hs (limited to 'src/Network/Lossless.hs') diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs deleted file mode 100644 index 861792ab..00000000 --- a/src/Network/Lossless.hs +++ /dev/null @@ -1,124 +0,0 @@ --- | This module uses 'Data.PacketBuffer' appropriately to implement a reliable --- transport over an underlying lossy one. --- --- It was written to be a helper to 'Network.Tox.Session' but it is --- representation-agnostic and so could potentially be used on an unrelated --- lossy transport. -{-# LANGUAGE CPP #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE TupleSections #-} -module Network.Lossless where - -import Control.Concurrent.STM.TChan -import Control.Monad -import Control.Monad.STM -import Data.Function -import Data.Word -import System.IO.Error - -import Data.PacketBuffer as PB -import DPut -import DebugTag -import Network.QueryResponse - -#ifdef THREAD_DEBUG -import Control.Concurrent.Lifted.Instrument -#else -import Control.Concurrent.Lifted -#endif - --- | Sequencing information for a packet. -data SequenceInfo = SequenceInfo - { sequenceNumber :: {-# UNPACK #-} !Word32 -- ^ Packets are ordered by their 'sequenceNumber'. - , sequenceAck :: {-# UNPACK #-} !Word32 -- ^ This is the sender's latest received in-order packet. - } - deriving (Eq,Ord,Show) - -data OutgoingInfo y = OutgoingInfo - { oIsLossy :: Bool -- ^ True if the packet is treated as lossy. - , oEncoded :: y -- ^ The packet. - , oHandleException :: Maybe (IOError -> IO ()) -- ^ Optionally handle send failure. - } - --- | Obtain a reliable transport form an unreliable one. -lossless :: Show addr => - (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. - -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. - -> addr -- ^ The remote address for this session. - -> TransportA String addr x y -- ^ An unreliable lossy transport. - - -> IO ( Transport String addr' x' -- ^ A reliable lossless transport. - , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent. - , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request. - ) -lossless isLossless encode saddr udp = do - pb <- atomically newPacketBuffer - oob <- atomically newTChan -- Out-of-band channel, these packets (or - -- errors) bypass the packet buffer to be - -- received immediately. - rloop <- forkIO $ do - -- This thread enqueues inbound packets or writes them to the oob - -- channel. - myThreadId >>= flip labelThread ("lossless."++show saddr) - fix $ \loop -> do - awaitMessage udp $ \m -> do - m' <- mapM (mapM $ uncurry isLossless) m - case m' of - Nothing -> do - atomically $ writeTChan oob Nothing - -- Quit thread here. - Just (Left e) -> do - atomically $ writeTChan oob (Just $ Left e) - loop - Just (Right event) -> do - atomically $ do - -- x' <- isLossless xaddr x - PB.grokInboundPacket pb event - case event of - PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event) - _ -> do - report <- pbReport "enqueued" pb - writeTChan oob (Just $ Left report) - loop - let tr = Transport - { awaitMessage = \kont -> do - join $ atomically $ orElse - (do x <- readTChan oob - return $ kont $! x) - (do x <- PB.awaitReadyPacket pb - report <- pbReport "dequeued" pb - return $ do - atomically $ writeTChan oob (Just $ Left report) - kont $! Just (Right x)) - , sendMessage = \a' x' -> do - seqno <- atomically $ do - seqno <- PB.nextToSendSequenceNumber pb - ack <- PB.expectingSequenceNumber pb - return $ SequenceInfo seqno ack - OutgoingInfo islossy x oops <- encode seqno x' a' - (isfull,nn) <- - if islossy - then do - dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno - return (False,(0,0)) -- avoid updating seqno on lossy packets. - else do - dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno - atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) - when isfull $ do - dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) - atomically $ do - (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) - when isfull retry - let sendit = sendMessage udp saddr x - maybe sendit (catchIOError sendit) oops - , closeTransport = do - atomically $ writeTChan oob Nothing -- quit rloop thread - closeTransport udp - } - resend ns = do - xs <- atomically $ retrieveForResend pb ns - dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." - forM_ xs $ \x -> do - dput XNetCrypto $ shows saddr $ " <-- Resending packet." - sendMessage udp saddr . snd $ x - return (tr, resend, atomically $ PB.packetNumbersToRequest pb) -- cgit v1.2.3