-- | This module uses 'Data.PacketBuffer' appropriately to implement a reliable -- transport over an underlying lossy one. -- -- It was written to be a helper to 'Network.Tox.Session' but it is -- representation-agnostic and so could potentially be used on an unrelated -- lossy transport. {-# LANGUAGE CPP #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE TupleSections #-} module Network.Lossless where import Control.Concurrent.STM import Control.Concurrent.STM.TChan import Control.Monad import Control.Monad.STM import Data.Function import Data.Word import System.IO.Error import Data.PacketBuffer as PB import DPut import DebugTag import Network.QueryResponse #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted #endif -- | Sequencing information for a packet. data SequenceInfo = SequenceInfo { sequenceNumber :: {-# UNPACK #-} !Word32 -- ^ Packets are ordered by their 'sequenceNumber'. , sequenceAck :: {-# UNPACK #-} !Word32 -- ^ This is the sender's latest received in-order packet. } deriving (Eq,Ord,Show) data OutgoingInfo y = OutgoingInfo { oIsLossy :: Bool -- ^ True if the packet is treated as lossy. , oEncoded :: y -- ^ The packet. , oHandleException :: Maybe (IOError -> IO ()) -- ^ Optionally handle send failure. } -- | Obtain a reliable transport form an unreliable one. lossless :: Show addr => String -- ^ Label for debugging. -> (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. -> addr -- ^ The remote address for this session. -> TransportA String addr x y -- ^ An unreliable lossy transport. -> IO ( Transport String addr' x' -- A reliable lossless transport. , [Word32] -> IO () -- Use this to request lost packets be re-sent. , IO ([Word32],Word32) -- Use this to discover missing packets to request. ) lossless lbl isLossless encode saddr udp = do pb <- atomically newPacketBuffer oob <- atomically newTChan -- Out-of-band channel, these packets (or -- errors) bypass the packet buffer to be -- received immediately. rloop <- forkIO $ do -- This thread enqueues inbound packets or writes them to the oob -- channel. fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do m' <- case m of Terminated -> return Nothing ParseError e -> return $ Just (Left e) Arrival a x -> Just . Right <$> isLossless x a case m' of Nothing -> do atomically $ writeTChan oob Terminated -- Quit thread here. Just (Left e) -> do atomically $ writeTChan oob (ParseError e) loop Just (Right event) -> do atomically $ do -- x' <- isLossless xaddr x PB.grokInboundPacket pb event case event of PacketReceivedLossy {} -> writeTChan oob (uncurry (flip Arrival) $ peReceivedPayload event) _ -> do report <- pbReport "enqueued" pb writeTChan oob (ParseError report) loop labelThread rloop ("lossless."++lbl) term <- newTVarIO retry -- In case awaitMessage is called multiple times beyond termination, -- we will use this STM action stop it from waiting on the oob TChan. -- XXX: This shouldn't be neccessary and might be costly. let tr = Transport { awaitMessage = \kont -> orElse (do x <- readTChan oob `orElse` join (readTVar term) return $ kont $! x) (do x <- PB.awaitReadyPacket pb report <- pbReport "dequeued" pb return $ do atomically $ writeTChan oob (ParseError report) kont $! uncurry (flip Arrival) x) , sendMessage = \a' x' -> do seqno <- atomically $ do seqno <- PB.nextToSendSequenceNumber pb ack <- PB.expectingSequenceNumber pb return $ SequenceInfo seqno ack OutgoingInfo islossy x oops <- encode seqno x' a' (isfull,nn) <- if islossy then do dput XNetCrypto $ mappend lbl $ " <-- Lossy packet " ++ show seqno return (False,(0,0)) -- avoid updating seqno on lossy packets. else do dput XNetCrypto $ mappend lbl $ " <-- Lossless packet " ++ show seqno atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) when isfull $ do dput XNetCrypto $ mappend lbl $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) atomically $ do (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) when isfull retry let sendit = sendMessage udp saddr x maybe sendit (catchIOError sendit) oops , setActive = \case False -> do atomically $ do writeTChan oob Terminated -- quit rloop thread writeTVar term (return Terminated) setActive udp False True -> return () } resend ns = do xs <- atomically $ retrieveForResend pb ns dput XNetCrypto $ mappend lbl $ " <-- Resending " ++ show (length xs) ++ " packets." forM_ xs $ \x -> do dput XNetCrypto $ mappend lbl $ " <-- Resending packet." sendMessage udp saddr . snd $ x return (tr, resend, atomically $ PB.packetNumbersToRequest pb)