diff options
Diffstat (limited to 'src/Network/Lossless.hs')
-rw-r--r-- | src/Network/Lossless.hs | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs index bdbeb3a2..f48dc8fd 100644 --- a/src/Network/Lossless.hs +++ b/src/Network/Lossless.hs | |||
@@ -27,7 +27,7 @@ data SequenceInfo = SequenceInfo | |||
27 | 27 | ||
28 | lossless :: Show addr => | 28 | lossless :: Show addr => |
29 | (x -> addr -> IO (PacketInboundEvent (x',addr'))) | 29 | (x -> addr -> IO (PacketInboundEvent (x',addr'))) |
30 | -> (SequenceInfo -> x' -> addr' -> IO y) | 30 | -> (SequenceInfo -> x' -> addr' -> IO (Bool,y)) |
31 | -> addr | 31 | -> addr |
32 | -> TransportA String addr x y | 32 | -> TransportA String addr x y |
33 | -> IO ( Transport String addr' x' | 33 | -> IO ( Transport String addr' x' |
@@ -39,44 +39,54 @@ lossless isLossless encode saddr udp = do | |||
39 | oob <- atomically newTChan -- Out-of-band channel, these packets (or | 39 | oob <- atomically newTChan -- Out-of-band channel, these packets (or |
40 | -- errors) bypass the packet buffer to be | 40 | -- errors) bypass the packet buffer to be |
41 | -- received immediately. | 41 | -- received immediately. |
42 | rloop <- forkIO $ fix $ \loop -> do | 42 | rloop <- forkIO $ do |
43 | -- This thread enqueues inbound packets or writes them to the oob | 43 | -- This thread enqueues inbound packets or writes them to the oob |
44 | -- channel. | 44 | -- channel. |
45 | myThreadId >>= flip labelThread ("lossless."++show saddr) | 45 | myThreadId >>= flip labelThread ("lossless."++show saddr) |
46 | fix $ \loop -> do | ||
46 | awaitMessage udp $ \m -> do | 47 | awaitMessage udp $ \m -> do |
47 | forM_ m $ \raw -> do | 48 | m' <- mapM (mapM $ uncurry isLossless) m |
48 | m' <- mapM (uncurry isLossless) raw | ||
49 | case m' of | 49 | case m' of |
50 | Left e -> do | 50 | Nothing -> do |
51 | atomically $ writeTChan oob (Left e) | 51 | atomically $ writeTChan oob Nothing |
52 | -- Quit thread here. | ||
53 | Just (Left e) -> do | ||
54 | atomically $ writeTChan oob (Just $ Left e) | ||
52 | loop | 55 | loop |
53 | Right event -> do | 56 | Just (Right event) -> do |
54 | atomically $ do | 57 | atomically $ do |
55 | -- x' <- isLossless xaddr x | 58 | -- x' <- isLossless xaddr x |
56 | PB.grokInboundPacket pb event | 59 | PB.grokInboundPacket pb event |
57 | case event of | 60 | case event of |
58 | PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event) | 61 | PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event) |
59 | _ -> do | 62 | _ -> do |
60 | report <- pbReport "enqueued" pb | 63 | report <- pbReport "enqueued" pb |
61 | writeTChan oob (Left report) | 64 | writeTChan oob (Just $ Left report) |
62 | loop | 65 | loop |
63 | let tr = Transport | 66 | let tr = Transport |
64 | { awaitMessage = \kont -> do | 67 | { awaitMessage = \kont -> do |
65 | join $ atomically $ orElse | 68 | join $ atomically $ orElse |
66 | (do x <- readTChan oob | 69 | (do x <- readTChan oob |
67 | return $ kont $! Just x) | 70 | return $ kont $! x) |
68 | (do x <- PB.awaitReadyPacket pb | 71 | (do x <- PB.awaitReadyPacket pb |
69 | report <- pbReport "dequeued" pb | 72 | report <- pbReport "dequeued" pb |
70 | return $ do | 73 | return $ do |
71 | dput XNetCrypto report | 74 | atomically $ writeTChan oob (Just $ Left report) |
72 | kont $! Just (Right x)) | 75 | kont $! Just (Right x)) |
73 | , sendMessage = \a' x' -> do | 76 | , sendMessage = \a' x' -> do |
74 | seqno <- atomically $ do | 77 | seqno <- atomically $ do |
75 | seqno <- PB.nextToSendSequenceNumber pb | 78 | seqno <- PB.nextToSendSequenceNumber pb |
76 | ack <- PB.expectingSequenceNumber pb | 79 | ack <- PB.expectingSequenceNumber pb |
77 | return $ SequenceInfo seqno ack | 80 | return $ SequenceInfo seqno ack |
78 | x <- encode seqno x' a' | 81 | (islossy,x) <- encode seqno x' a' |
79 | (isfull,nn) <- atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | 82 | (isfull,nn) <- |
83 | if islossy | ||
84 | then do | ||
85 | dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno | ||
86 | return (False,(0,0)) -- avoid updating seqno on lossy packets. | ||
87 | else do | ||
88 | dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno | ||
89 | atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) | ||
80 | when isfull $ do | 90 | when isfull $ do |
81 | dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) | 91 | dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) |
82 | atomically $ do | 92 | atomically $ do |
@@ -84,7 +94,7 @@ lossless isLossless encode saddr udp = do | |||
84 | when isfull retry | 94 | when isfull retry |
85 | sendMessage udp saddr x | 95 | sendMessage udp saddr x |
86 | , closeTransport = do | 96 | , closeTransport = do |
87 | killThread rloop | 97 | atomically $ writeTChan oob Nothing -- quit rloop thread |
88 | closeTransport udp | 98 | closeTransport udp |
89 | } | 99 | } |
90 | resend ns = do | 100 | resend ns = do |