summaryrefslogtreecommitdiff
path: root/dht/src/Network/Lossless.hs
diff options
context:
space:
mode:
Diffstat (limited to 'dht/src/Network/Lossless.hs')
-rw-r--r--dht/src/Network/Lossless.hs27
1 files changed, 17 insertions, 10 deletions
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs
index 5a313aed..079f4d07 100644
--- a/dht/src/Network/Lossless.hs
+++ b/dht/src/Network/Lossless.hs
@@ -9,6 +9,7 @@
9{-# LANGUAGE TupleSections #-} 9{-# LANGUAGE TupleSections #-}
10module Network.Lossless where 10module Network.Lossless where
11 11
12import Control.Concurrent.STM
12import Control.Concurrent.STM.TChan 13import Control.Concurrent.STM.TChan
13import Control.Monad 14import Control.Monad
14import Control.Monad.STM 15import Control.Monad.STM
@@ -42,7 +43,8 @@ data OutgoingInfo y = OutgoingInfo
42 43
43-- | Obtain a reliable transport form an unreliable one. 44-- | Obtain a reliable transport form an unreliable one.
44lossless :: Show addr => 45lossless :: Show addr =>
45 (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. 46 String -- ^ Label for debugging.
47 -> (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets.
46 -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. 48 -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets.
47 -> addr -- ^ The remote address for this session. 49 -> addr -- ^ The remote address for this session.
48 -> TransportA String addr x y -- ^ An unreliable lossy transport. 50 -> TransportA String addr x y -- ^ An unreliable lossy transport.
@@ -51,7 +53,7 @@ lossless :: Show addr =>
51 , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent. 53 , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent.
52 , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request. 54 , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request.
53 ) 55 )
54lossless isLossless encode saddr udp = do 56lossless lbl isLossless encode saddr udp = do
55 pb <- atomically newPacketBuffer 57 pb <- atomically newPacketBuffer
56 oob <- atomically newTChan -- Out-of-band channel, these packets (or 58 oob <- atomically newTChan -- Out-of-band channel, these packets (or
57 -- errors) bypass the packet buffer to be 59 -- errors) bypass the packet buffer to be
@@ -80,11 +82,14 @@ lossless isLossless encode saddr udp = do
80 report <- pbReport "enqueued" pb 82 report <- pbReport "enqueued" pb
81 writeTChan oob (ParseError report) 83 writeTChan oob (ParseError report)
82 loop 84 loop
83 labelThread rloop ("lossless."++show saddr) 85 labelThread rloop ("lossless."++lbl)
86 term <- newTVarIO retry -- In case awaitMessage is called multiple times beyond termination,
87 -- we will use this STM action stop it from waiting on the oob TChan.
88 -- XXX: This shouldn't be neccessary and might be costly.
84 let tr = Transport 89 let tr = Transport
85 { awaitMessage = \kont -> 90 { awaitMessage = \kont ->
86 orElse 91 orElse
87 (do x <- readTChan oob 92 (do x <- readTChan oob `orElse` join (readTVar term)
88 return $ kont $! x) 93 return $ kont $! x)
89 (do x <- PB.awaitReadyPacket pb 94 (do x <- PB.awaitReadyPacket pb
90 report <- pbReport "dequeued" pb 95 report <- pbReport "dequeued" pb
@@ -100,13 +105,13 @@ lossless isLossless encode saddr udp = do
100 (isfull,nn) <- 105 (isfull,nn) <-
101 if islossy 106 if islossy
102 then do 107 then do
103 dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno 108 dput XNetCrypto $ mappend lbl $ " <-- Lossy packet " ++ show seqno
104 return (False,(0,0)) -- avoid updating seqno on lossy packets. 109 return (False,(0,0)) -- avoid updating seqno on lossy packets.
105 else do 110 else do
106 dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno 111 dput XNetCrypto $ mappend lbl $ " <-- Lossless packet " ++ show seqno
107 atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) 112 atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
108 when isfull $ do 113 when isfull $ do
109 dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) 114 dput XNetCrypto $ mappend lbl $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno)
110 atomically $ do 115 atomically $ do
111 (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) 116 (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
112 when isfull retry 117 when isfull retry
@@ -114,14 +119,16 @@ lossless isLossless encode saddr udp = do
114 maybe sendit (catchIOError sendit) oops 119 maybe sendit (catchIOError sendit) oops
115 , setActive = \case 120 , setActive = \case
116 False -> do 121 False -> do
117 atomically $ writeTChan oob Terminated -- quit rloop thread 122 atomically $ do
123 writeTChan oob Terminated -- quit rloop thread
124 writeTVar term (return Terminated)
118 setActive udp False 125 setActive udp False
119 True -> return () 126 True -> return ()
120 } 127 }
121 resend ns = do 128 resend ns = do
122 xs <- atomically $ retrieveForResend pb ns 129 xs <- atomically $ retrieveForResend pb ns
123 dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." 130 dput XNetCrypto $ mappend lbl $ " <-- Resending " ++ show (length xs) ++ " packets."
124 forM_ xs $ \x -> do 131 forM_ xs $ \x -> do
125 dput XNetCrypto $ shows saddr $ " <-- Resending packet." 132 dput XNetCrypto $ mappend lbl $ " <-- Resending packet."
126 sendMessage udp saddr . snd $ x 133 sendMessage udp saddr . snd $ x
127 return (tr, resend, atomically $ PB.packetNumbersToRequest pb) 134 return (tr, resend, atomically $ PB.packetNumbersToRequest pb)