From d4c209fb9543019461bcf612da67708aeabcdce2 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sat, 25 Jan 2020 01:02:33 -0500 Subject: Ported dhtd to reworked QueryResponse design. --- dht/src/Network/Lossless.hs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'dht/src/Network/Lossless.hs') diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs index 41203ca5..7ccceec1 100644 --- a/dht/src/Network/Lossless.hs +++ b/dht/src/Network/Lossless.hs @@ -61,7 +61,9 @@ lossless lbl isLossless encode saddr udp = do rloop <- forkIO $ do -- This thread enqueues inbound packets or writes them to the oob -- channel. - fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do + fix $ \loop -> do + (m,io) <- atomically $ awaitMessage udp + io m' <- case m of Terminated -> return Nothing ParseError e -> return $ Just (Left e) Arrival a x -> Just . Right <$> isLossless x a @@ -87,15 +89,14 @@ lossless lbl isLossless encode saddr udp = do -- we will use this STM action stop it from waiting on the oob TChan. -- XXX: This shouldn't be neccessary and might be costly. let tr = Transport - { awaitMessage = \kont -> + { awaitMessage = orElse (do x <- readTChan oob `orElse` join (readTVar term) - return $ kont $! x) + return (x, return ())) (do x <- PB.awaitReadyPacket pb report <- pbReport "dequeued" pb - return $ do - atomically $ writeTChan oob (ParseError report) - kont $! uncurry (flip Arrival) x) + return $ (,) (uncurry (flip Arrival) x) $ do + atomically $ writeTChan oob (ParseError report)) , sendMessage = \a' x' -> do seqno <- atomically $ do seqno <- PB.nextToSendSequenceNumber pb -- cgit v1.2.3