diff options
Diffstat (limited to 'dht/src/Network/Lossless.hs')
-rw-r--r-- | dht/src/Network/Lossless.hs | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs index 5a313aed..079f4d07 100644 --- a/dht/src/Network/Lossless.hs +++ b/dht/src/Network/Lossless.hs | |||
@@ -9,6 +9,7 @@ | |||
9 | {-# LANGUAGE TupleSections #-} | 9 | {-# LANGUAGE TupleSections #-} |
10 | module Network.Lossless where | 10 | module Network.Lossless where |
11 | 11 | ||
12 | import Control.Concurrent.STM | ||
12 | import Control.Concurrent.STM.TChan | 13 | import Control.Concurrent.STM.TChan |
13 | import Control.Monad | 14 | import Control.Monad |
14 | import Control.Monad.STM | 15 | import Control.Monad.STM |
@@ -42,7 +43,8 @@ data OutgoingInfo y = OutgoingInfo | |||
42 | 43 | ||
43 | -- | Obtain a reliable transport form an unreliable one. | 44 | -- | Obtain a reliable transport form an unreliable one. |
44 | lossless :: Show addr => | 45 | lossless :: Show addr => |
45 | (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. | 46 | String -- ^ Label for debugging. |
47 | -> (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. | ||
46 | -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. | 48 | -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. |
47 | -> addr -- ^ The remote address for this session. | 49 | -> addr -- ^ The remote address for this session. |
48 | -> TransportA String addr x y -- ^ An unreliable lossy transport. | 50 | -> TransportA String addr x y -- ^ An unreliable lossy transport. |
@@ -51,7 +53,7 @@ lossless :: Show addr => | |||
51 | , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent. | 53 | , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent. |
52 | , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request. | 54 | , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request. |
53 | ) | 55 | ) |
54 | lossless isLossless encode saddr udp = do | 56 | lossless lbl isLossless encode saddr udp = do |
55 | pb <- atomically newPacketBuffer | 57 | pb <- atomically newPacketBuffer |
56 | oob <- atomically newTChan -- Out-of-band channel, these packets (or | 58 | oob <- atomically newTChan -- Out-of-band channel, these packets (or |
57 | -- errors) bypass the packet buffer to be | 59 | -- errors) bypass the packet buffer to be |
@@ -80,11 +82,14 @@ lossless isLossless encode saddr udp = do | |||
80 | report <- pbReport "enqueued" pb | 82 | report <- pbReport "enqueued" pb |
81 | writeTChan oob (ParseError report) | 83 | writeTChan oob (ParseError report) |
82 | loop | 84 | loop |
83 | labelThread rloop ("lossless."++show saddr) | 85 | labelThread rloop ("lossless."++lbl) |
86 | term <- newTVarIO retry -- In case awaitMessage is called multiple times beyond termination, | ||
87 | -- we will use this STM action stop it from waiting on the oob TChan. | ||
88 | -- XXX: This shouldn't be neccessary and might be costly. | ||
84 | let tr = Transport | 89 | let tr = Transport |
85 | { awaitMessage = \kont -> | 90 | { awaitMessage = \kont -> |
86 | orElse | 91 | orElse |
87 | (do x <- readTChan oob | 92 | (do x <- readTChan oob `orElse` join (readTVar term) |
88 | return $ kont $! x) | 93 | return $ kont $! x) |
89 | (do x <- PB.awaitReadyPacket pb | 94 | (do x <- PB.awaitReadyPacket pb |
90 | report <- pbReport "dequeued" pb | 95 | report <- pbReport "dequeued" pb |
@@ -100,13 +105,13 @@ lossless isLossless encode saddr udp = do | |||
100 | (isfull,nn) <- | 105 | (isfull,nn) <- |
101 | if islossy | 106 | if islossy |
102 | then do | 107 | then do |
103 | dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno | 108 | dput XNetCrypto $ mappend lbl $ " <-- Lossy packet " ++ show seqno |
104 | return (False,(0,0)) -- avoid updating seqno on lossy packets. | 109 | return (False,(0,0)) -- avoid updating seqno on lossy packets. |
105 | else do | 110 | else do |
106 | dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno | 111 | dput XNetCrypto $ mappend lbl $ " <-- Lossless packet " ++ show seqno |
107 | atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | 112 | atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) |
108 | when isfull $ do | 113 | when isfull $ do |
109 | dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) | 114 | dput XNetCrypto $ mappend lbl $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) |
110 | atomically $ do | 115 | atomically $ do |
111 | (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | 116 | (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) |
112 | when isfull retry | 117 | when isfull retry |
@@ -114,14 +119,16 @@ lossless isLossless encode saddr udp = do | |||
114 | maybe sendit (catchIOError sendit) oops | 119 | maybe sendit (catchIOError sendit) oops |
115 | , setActive = \case | 120 | , setActive = \case |
116 | False -> do | 121 | False -> do |
117 | atomically $ writeTChan oob Terminated -- quit rloop thread | 122 | atomically $ do |
123 | writeTChan oob Terminated -- quit rloop thread | ||
124 | writeTVar term (return Terminated) | ||
118 | setActive udp False | 125 | setActive udp False |
119 | True -> return () | 126 | True -> return () |
120 | } | 127 | } |
121 | resend ns = do | 128 | resend ns = do |
122 | xs <- atomically $ retrieveForResend pb ns | 129 | xs <- atomically $ retrieveForResend pb ns |
123 | dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." | 130 | dput XNetCrypto $ mappend lbl $ " <-- Resending " ++ show (length xs) ++ " packets." |
124 | forM_ xs $ \x -> do | 131 | forM_ xs $ \x -> do |
125 | dput XNetCrypto $ shows saddr $ " <-- Resending packet." | 132 | dput XNetCrypto $ mappend lbl $ " <-- Resending packet." |
126 | sendMessage udp saddr . snd $ x | 133 | sendMessage udp saddr . snd $ x |
127 | return (tr, resend, atomically $ PB.packetNumbersToRequest pb) | 134 | return (tr, resend, atomically $ PB.packetNumbersToRequest pb) |