{-# LANGUAGE CPP           #-}
{-# LANGUAGE LambdaCase    #-}
{-# LANGUAGE TupleSections #-}
-- | Wraps an existing transport with a "Data.PacketBuffer" so that every
-- outbound message is stamped with a 'SequenceInfo' and every inbound message
-- is run through the buffer before being handed to the consumer.
module Network.Lossless where

import Control.Concurrent.STM.TChan
import Control.Monad
import Control.Monad.STM
import Data.Function
import Data.Word

import Data.PacketBuffer as PB
import Network.QueryResponse

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
#endif

-- | The pair of counters handed to the encoder for each outbound message.
data SequenceInfo = SequenceInfo
    { sequenceNumber :: {-# UNPACK #-} !Word32
      -- ^ Filled from 'PB.nextToSendSequenceNumber' at send time.
    , sequenceAck    :: {-# UNPACK #-} !Word32
      -- ^ Filled from 'PB.expectingSequenceNumber' at send time.
    }
    deriving (Eq,Ord,Show)

-- | Layer a packet buffer over the given transport.
--
-- A background thread is forked that repeatedly awaits a message on the
-- underlying transport, classifies it, and feeds the resulting event to the
-- packet buffer.  Classification errors, and the payload of any
-- 'PacketReceivedLossy' event, are additionally pushed onto an out-of-band
-- channel.
--
-- The result is a triple:
--
-- 1. A transport whose @awaitMessage@ prefers the out-of-band channel and
--    otherwise takes the next packet from 'PB.awaitReadyPacket'; whose
--    @sendMessage@ encodes, records and forwards a message to the fixed
--    remote address; and whose @closeTransport@ kills the background thread
--    and then closes the underlying transport.
--
-- 2. An action that looks up the given sequence numbers with
--    'retrieveForResend' and sends each retrieved packet again over the
--    underlying transport.
--
-- 3. An action that runs 'PB.packetNumbersToRequest' on the buffer
--    (presumably the sequence numbers to ask the peer to resend; see
--    "Data.PacketBuffer" for the exact meaning of the result).
lossless :: (x -> addr -> IO (PacketInboundEvent (x',addr')))
            -- ^ Classifies a raw inbound message and its source address.
         -> (SequenceInfo -> x' -> addr' -> IO y)
            -- ^ Builds the wire form of an outbound message.
         -> addr
            -- ^ Address every outbound message (and resend) is sent to.
         -> TransportA err addr x y
            -- ^ The underlying transport.
         -> IO ( Transport err addr' x'
               , [Word32] -> IO ()
               , IO ([Word32],Word32)
               )
lossless isLossless encode saddr udp = do
    buffer   <- atomically newPacketBuffer
    -- Out-of-band channel: whatever is written here (packets or errors)
    -- skips the packet buffer and is delivered to the consumer right away.
    sideChan <- atomically newTChan

    let -- Receiver loop: classify each arrival, then either report the
        -- failure out-of-band or feed the event to the packet buffer.  The
        -- loop continues only from inside the 'forM_' body, so an empty
        -- arrival ends it.
        receiver = fix $ \again ->
            awaitMessage udp $ \arrival ->
                forM_ arrival $ \raw -> do
                    classified <- traverse (uncurry isLossless) raw
                    case classified of
                        Left failure ->
                            atomically $ writeTChan sideChan (Left failure)
                        Right event ->
                            atomically $ do
                                PB.grokInboundPacket buffer event
                                case event of
                                    PacketReceivedLossy {} ->
                                        writeTChan sideChan (Right (peReceivedPayload event))
                                    _ -> return ()
                    again

    readerThread <- forkIO receiver

    let tr = Transport
            { -- Out-of-band items win over buffered packets; the chosen
              -- continuation call is returned from the transaction and run
              -- by 'join' once it has committed.  This never passes Nothing.
              awaitMessage = \kont ->
                join $ atomically $
                    (do item <- readTChan sideChan
                        return $ kont $! Just item)
                    `orElse`
                    (do ready <- PB.awaitReadyPacket buffer
                        return $ kont $! Just (Right ready))

              -- NOTE(review): the sequence numbers are read in one
              -- transaction and the packet is recorded in a second one, with
              -- the encoder running in between.  Presumably callers do not
              -- send concurrently on the same transport; confirm.
            , sendMessage = \a' x' -> do
                info <- atomically $
                    SequenceInfo <$> PB.nextToSendSequenceNumber buffer
                                 <*> PB.expectingSequenceNumber buffer
                wire <- encode info x' a'
                atomically $
                    PB.grokOutboundPacket buffer (PacketSent (sequenceNumber info) wire)
                sendMessage udp saddr wire

            , closeTransport = killThread readerThread >> closeTransport udp
            }

        -- Re-send stored packets; only the second component of each
        -- retrieved pair goes on the wire.
        resend wanted =
            atomically (retrieveForResend buffer wanted)
                >>= mapM_ (sendMessage udp saddr . snd)

    return (tr, resend, atomically $ PB.packetNumbersToRequest buffer)