summaryrefslogtreecommitdiff
path: root/dht/src/Network/Lossless.hs
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-12-14 01:03:07 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-01 23:26:05 -0500
commitb5a3c7b92e7effcd234037241b00f9f29773d870 (patch)
tree4047e11c9102585001dd3be95855038a6816a5c2 /dht/src/Network/Lossless.hs
parent97043e1069e172a0f389610610892ca060f395dd (diff)
STM-based awaitMessage.
Diffstat (limited to 'dht/src/Network/Lossless.hs')
-rw-r--r--dht/src/Network/Lossless.hs33
1 files changed, 18 insertions, 15 deletions
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs
index 861792ab..5a313aed 100644
--- a/dht/src/Network/Lossless.hs
+++ b/dht/src/Network/Lossless.hs
@@ -59,37 +59,38 @@ lossless isLossless encode saddr udp = do
59 rloop <- forkIO $ do 59 rloop <- forkIO $ do
60 -- This thread enqueues inbound packets or writes them to the oob 60 -- This thread enqueues inbound packets or writes them to the oob
61 -- channel. 61 -- channel.
62 myThreadId >>= flip labelThread ("lossless."++show saddr) 62 fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do
63 fix $ \loop -> do 63 m' <- case m of Terminated -> return Nothing
64 awaitMessage udp $ \m -> do 64 ParseError e -> return $ Just (Left e)
65 m' <- mapM (mapM $ uncurry isLossless) m 65 Arrival a x -> Just . Right <$> isLossless x a
66 case m' of 66 case m' of
67 Nothing -> do 67 Nothing -> do
68 atomically $ writeTChan oob Nothing 68 atomically $ writeTChan oob Terminated
69 -- Quit thread here. 69 -- Quit thread here.
70 Just (Left e) -> do 70 Just (Left e) -> do
71 atomically $ writeTChan oob (Just $ Left e) 71 atomically $ writeTChan oob (ParseError e)
72 loop 72 loop
73 Just (Right event) -> do 73 Just (Right event) -> do
74 atomically $ do 74 atomically $ do
75 -- x' <- isLossless xaddr x 75 -- x' <- isLossless xaddr x
76 PB.grokInboundPacket pb event 76 PB.grokInboundPacket pb event
77 case event of 77 case event of
78 PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event) 78 PacketReceivedLossy {} -> writeTChan oob (uncurry (flip Arrival) $ peReceivedPayload event)
79 _ -> do 79 _ -> do
80 report <- pbReport "enqueued" pb 80 report <- pbReport "enqueued" pb
81 writeTChan oob (Just $ Left report) 81 writeTChan oob (ParseError report)
82 loop 82 loop
83 labelThread rloop ("lossless."++show saddr)
83 let tr = Transport 84 let tr = Transport
84 { awaitMessage = \kont -> do 85 { awaitMessage = \kont ->
85 join $ atomically $ orElse 86 orElse
86 (do x <- readTChan oob 87 (do x <- readTChan oob
87 return $ kont $! x) 88 return $ kont $! x)
88 (do x <- PB.awaitReadyPacket pb 89 (do x <- PB.awaitReadyPacket pb
89 report <- pbReport "dequeued" pb 90 report <- pbReport "dequeued" pb
90 return $ do 91 return $ do
91 atomically $ writeTChan oob (Just $ Left report) 92 atomically $ writeTChan oob (ParseError report)
92 kont $! Just (Right x)) 93 kont $! uncurry (flip Arrival) x)
93 , sendMessage = \a' x' -> do 94 , sendMessage = \a' x' -> do
94 seqno <- atomically $ do 95 seqno <- atomically $ do
95 seqno <- PB.nextToSendSequenceNumber pb 96 seqno <- PB.nextToSendSequenceNumber pb
@@ -111,9 +112,11 @@ lossless isLossless encode saddr udp = do
111 when isfull retry 112 when isfull retry
112 let sendit = sendMessage udp saddr x 113 let sendit = sendMessage udp saddr x
113 maybe sendit (catchIOError sendit) oops 114 maybe sendit (catchIOError sendit) oops
114 , closeTransport = do 115 , setActive = \case
115 atomically $ writeTChan oob Nothing -- quit rloop thread 116 False -> do
116 closeTransport udp 117 atomically $ writeTChan oob Terminated -- quit rloop thread
118 setActive udp False
119 True -> return ()
117 } 120 }
118 resend ns = do 121 resend ns = do
119 xs <- atomically $ retrieveForResend pb ns 122 xs <- atomically $ retrieveForResend pb ns