summaryrefslogtreecommitdiff
path: root/src/Network/Lossless.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/Lossless.hs')
-rw-r--r--src/Network/Lossless.hs38
1 files changed, 24 insertions, 14 deletions
diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs
index bdbeb3a2..f48dc8fd 100644
--- a/src/Network/Lossless.hs
+++ b/src/Network/Lossless.hs
@@ -27,7 +27,7 @@ data SequenceInfo = SequenceInfo
27 27
28lossless :: Show addr => 28lossless :: Show addr =>
29 (x -> addr -> IO (PacketInboundEvent (x',addr'))) 29 (x -> addr -> IO (PacketInboundEvent (x',addr')))
30 -> (SequenceInfo -> x' -> addr' -> IO y) 30 -> (SequenceInfo -> x' -> addr' -> IO (Bool,y))
31 -> addr 31 -> addr
32 -> TransportA String addr x y 32 -> TransportA String addr x y
33 -> IO ( Transport String addr' x' 33 -> IO ( Transport String addr' x'
@@ -39,44 +39,54 @@ lossless isLossless encode saddr udp = do
39 oob <- atomically newTChan -- Out-of-band channel, these packets (or 39 oob <- atomically newTChan -- Out-of-band channel, these packets (or
40 -- errors) bypass the packet buffer to be 40 -- errors) bypass the packet buffer to be
41 -- received immediately. 41 -- received immediately.
42 rloop <- forkIO $ fix $ \loop -> do 42 rloop <- forkIO $ do
43 -- This thread enqueues inbound packets or writes them to the oob 43 -- This thread enqueues inbound packets or writes them to the oob
44 -- channel. 44 -- channel.
45 myThreadId >>= flip labelThread ("lossless."++show saddr) 45 myThreadId >>= flip labelThread ("lossless."++show saddr)
46 fix $ \loop -> do
46 awaitMessage udp $ \m -> do 47 awaitMessage udp $ \m -> do
47 forM_ m $ \raw -> do 48 m' <- mapM (mapM $ uncurry isLossless) m
48 m' <- mapM (uncurry isLossless) raw
49 case m' of 49 case m' of
50 Left e -> do 50 Nothing -> do
51 atomically $ writeTChan oob (Left e) 51 atomically $ writeTChan oob Nothing
52 -- Quit thread here.
53 Just (Left e) -> do
54 atomically $ writeTChan oob (Just $ Left e)
52 loop 55 loop
53 Right event -> do 56 Just (Right event) -> do
54 atomically $ do 57 atomically $ do
55 -- x' <- isLossless xaddr x 58 -- x' <- isLossless xaddr x
56 PB.grokInboundPacket pb event 59 PB.grokInboundPacket pb event
57 case event of 60 case event of
58 PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event) 61 PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event)
59 _ -> do 62 _ -> do
60 report <- pbReport "enqueued" pb 63 report <- pbReport "enqueued" pb
61 writeTChan oob (Left report) 64 writeTChan oob (Just $ Left report)
62 loop 65 loop
63 let tr = Transport 66 let tr = Transport
64 { awaitMessage = \kont -> do 67 { awaitMessage = \kont -> do
65 join $ atomically $ orElse 68 join $ atomically $ orElse
66 (do x <- readTChan oob 69 (do x <- readTChan oob
67 return $ kont $! Just x) 70 return $ kont $! x)
68 (do x <- PB.awaitReadyPacket pb 71 (do x <- PB.awaitReadyPacket pb
69 report <- pbReport "dequeued" pb 72 report <- pbReport "dequeued" pb
70 return $ do 73 return $ do
71 dput XNetCrypto report 74 atomically $ writeTChan oob (Just $ Left report)
72 kont $! Just (Right x)) 75 kont $! Just (Right x))
73 , sendMessage = \a' x' -> do 76 , sendMessage = \a' x' -> do
74 seqno <- atomically $ do 77 seqno <- atomically $ do
75 seqno <- PB.nextToSendSequenceNumber pb 78 seqno <- PB.nextToSendSequenceNumber pb
76 ack <- PB.expectingSequenceNumber pb 79 ack <- PB.expectingSequenceNumber pb
77 return $ SequenceInfo seqno ack 80 return $ SequenceInfo seqno ack
78 x <- encode seqno x' a' 81 (islossy,x) <- encode seqno x' a'
79 (isfull,nn) <- atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) 82 (isfull,nn) <-
83 if islossy
84 then do
85 dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno
86 return (False,(0,0)) -- avoid updating seqno on lossy packets.
87 else do
88 dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno
89 atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x)
80 when isfull $ do 90 when isfull $ do
81 dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) 91 dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno)
82 atomically $ do 92 atomically $ do
@@ -84,7 +94,7 @@ lossless isLossless encode saddr udp = do
84 when isfull retry 94 when isfull retry
85 sendMessage udp saddr x 95 sendMessage udp saddr x
86 , closeTransport = do 96 , closeTransport = do
87 killThread rloop 97 atomically $ writeTChan oob Nothing -- quit rloop thread
88 closeTransport udp 98 closeTransport udp
89 } 99 }
90 resend ns = do 100 resend ns = do