{-# LANGUAGE CPP           #-}
{-# LANGUAGE LambdaCase    #-}
{-# LANGUAGE TupleSections #-}
-- | Wraps a 'TransportA' with a "Data.PacketBuffer" so that inbound packets
-- are routed through the buffer and outbound non-lossy packets are recorded
-- in it for later resending.
module Network.Lossless where

import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word

import Data.PacketBuffer as PB
import DPut
import Network.QueryResponse

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif

-- | Pair of counters handed to the encoder for every outgoing message.
data SequenceInfo = SequenceInfo
    { sequenceNumber :: {-# UNPACK #-} !Word32
      -- ^ Filled from 'PB.nextToSendSequenceNumber' at send time.
    , sequenceAck    :: {-# UNPACK #-} !Word32
      -- ^ Filled from 'PB.expectingSequenceNumber' at send time.
    }
 deriving (Eq,Ord,Show)

-- | Build a transport on top of @udp@ that passes traffic through a freshly
-- created packet buffer.
--
-- The result is a triple of:
--
--   * the wrapped transport;
--
--   * a resend action: the given numbers are passed to 'retrieveForResend'
--     and every packet it yields is sent again to @saddr@ over @udp@;
--
--   * an action that runs 'PB.packetNumbersToRequest' on the buffer
--     (presumably the sequence numbers still missing -- confirm against
--     "Data.PacketBuffer").
--
-- A background thread labelled @"lossless." ++ show saddr@ is forked to read
-- from @udp@.
lossless :: Show addr
         => (x -> addr -> IO (PacketInboundEvent (x',addr')))
            -- ^ Classifies each inbound message as a packet-buffer event.
         -> (SequenceInfo -> x' -> addr' -> IO (Bool,y))
            -- ^ Encodes an outbound message.  The 'Bool' is 'True' for a
            --   lossy packet, which is sent without touching the buffer.
         -> addr
            -- ^ Address every outbound (and resent) packet is sent to on
            --   @udp@; also used to tag log lines and the thread label.
         -> TransportA String addr x y
            -- ^ Underlying transport.
         -> IO ( Transport String addr' x'
               , [Word32] -> IO ()
               , IO ([Word32],Word32)
               )
lossless isLossless encode saddr udp = do
    pb  <- atomically newPacketBuffer
    -- Out-of-band channel: whatever is written here (lossy payloads, errors,
    -- buffer reports, end-of-stream) skips the packet buffer and is handed to
    -- the consumer directly.
    oob <- atomically newTChan

    -- Reader thread: feeds each inbound message either to the packet buffer
    -- or to the out-of-band channel.
    _reader <- forkIO $ do
        myThreadId >>= flip labelThread ("lossless."++show saddr)
        fix $ \again ->
            awaitMessage udp $ \inbound ->
                mapM (mapM (uncurry isLossless)) inbound >>= \case
                    -- Underlying transport is finished: forward the
                    -- end-of-stream marker and let this thread return.
                    Nothing ->
                        atomically $ writeTChan oob Nothing
                    Just (Left err) -> do
                        atomically $ writeTChan oob (Just (Left err))
                        again
                    Just (Right event) -> do
                        atomically $ do
                            PB.grokInboundPacket pb event
                            case event of
                                -- Lossy payloads are delivered right away.
                                PacketReceivedLossy {} ->
                                    writeTChan oob (Just (Right (peReceivedPayload event)))
                                -- Anything else only produces a buffer report
                                -- on the (error side of the) oob channel.
                                _ ->
                                    pbReport "enqueued" pb >>= writeTChan oob . Just . Left
                        again

    let -- Debug line prefixed with the peer address.
        logMsg s = dput XNetCrypto (shows saddr s)

        tr = Transport
            { awaitMessage = \kont ->
                -- The out-of-band channel is tried first; the packet buffer
                -- is consulted only when that channel has nothing to read.
                join . atomically $
                    (do msg <- readTChan oob
                        return (kont $! msg))
                    `orElse`
                    (do pkt <- PB.awaitReadyPacket pb
                        report <- pbReport "dequeued" pb
                        return $ do
                            atomically $ writeTChan oob (Just (Left report))
                            kont $! Just (Right pkt))

            , sendMessage = \a' x' -> do
                seqInfo <- atomically $
                    liftM2 SequenceInfo (PB.nextToSendSequenceNumber pb)
                                        (PB.expectingSequenceNumber pb)
                (lossy,payload) <- encode seqInfo x' a'
                let enqueue = PB.grokOutboundPacket pb
                                  (PacketSent (sequenceNumber seqInfo) payload)
                (queueFull,counts) <-
                    if lossy
                        then do
                            logMsg $ " <-- Lossy packet " ++ show seqInfo
                            -- Lossy packets are not recorded in the buffer.
                            return (False,(0,0))
                        else do
                            logMsg $ " <-- Lossless packet " ++ show seqInfo
                            atomically enqueue
                when queueFull $ do
                    logMsg $ " <-- Outbound queue is full! Retrying... " ++ show (counts,seqInfo)
                    -- Block in STM until the buffer accepts the packet.
                    atomically $ do
                        (stillFull,_) <- enqueue
                        when stillFull retry
                sendMessage udp saddr payload

            , closeTransport = do
                -- Push an end-of-stream marker to the consumer side.
                -- NOTE(review): the reader thread never reads this channel;
                -- it appears to stop only once the underlying transport
                -- yields Nothing -- confirm that closing @udp@ does so.
                atomically $ writeTChan oob Nothing
                closeTransport udp
            }

        resend ns = do
            pending <- atomically (retrieveForResend pb ns)
            logMsg $ " <-- Resending " ++ show (length pending) ++ " packets."
            forM_ pending $ \entry -> do
                logMsg " <-- Resending packet."
                sendMessage udp saddr (snd entry)

    return (tr, resend, atomically $ PB.packetNumbersToRequest pb)