diff options
author | Joe Crayne <joe@jerkface.net> | 2019-12-14 01:03:07 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 23:26:05 -0500 |
commit | b5a3c7b92e7effcd234037241b00f9f29773d870 (patch) | |
tree | 4047e11c9102585001dd3be95855038a6816a5c2 /dht/src/Network/Lossless.hs | |
parent | 97043e1069e172a0f389610610892ca060f395dd (diff) |
STM-based awaitMessage.
Diffstat (limited to 'dht/src/Network/Lossless.hs')
-rw-r--r-- | dht/src/Network/Lossless.hs | 33 |
1 files changed, 18 insertions, 15 deletions
diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs index 861792ab..5a313aed 100644 --- a/dht/src/Network/Lossless.hs +++ b/dht/src/Network/Lossless.hs | |||
@@ -59,37 +59,38 @@ lossless isLossless encode saddr udp = do | |||
59 | rloop <- forkIO $ do | 59 | rloop <- forkIO $ do |
60 | -- This thread enqueues inbound packets or writes them to the oob | 60 | -- This thread enqueues inbound packets or writes them to the oob |
61 | -- channel. | 61 | -- channel. |
62 | myThreadId >>= flip labelThread ("lossless."++show saddr) | 62 | fix $ \loop -> join $ atomically $ awaitMessage udp $ \m -> do |
63 | fix $ \loop -> do | 63 | m' <- case m of Terminated -> return Nothing |
64 | awaitMessage udp $ \m -> do | 64 | ParseError e -> return $ Just (Left e) |
65 | m' <- mapM (mapM $ uncurry isLossless) m | 65 | Arrival a x -> Just . Right <$> isLossless x a |
66 | case m' of | 66 | case m' of |
67 | Nothing -> do | 67 | Nothing -> do |
68 | atomically $ writeTChan oob Nothing | 68 | atomically $ writeTChan oob Terminated |
69 | -- Quit thread here. | 69 | -- Quit thread here. |
70 | Just (Left e) -> do | 70 | Just (Left e) -> do |
71 | atomically $ writeTChan oob (Just $ Left e) | 71 | atomically $ writeTChan oob (ParseError e) |
72 | loop | 72 | loop |
73 | Just (Right event) -> do | 73 | Just (Right event) -> do |
74 | atomically $ do | 74 | atomically $ do |
75 | -- x' <- isLossless xaddr x | 75 | -- x' <- isLossless xaddr x |
76 | PB.grokInboundPacket pb event | 76 | PB.grokInboundPacket pb event |
77 | case event of | 77 | case event of |
78 | PacketReceivedLossy {} -> writeTChan oob (Just $ Right $ peReceivedPayload event) | 78 | PacketReceivedLossy {} -> writeTChan oob (uncurry (flip Arrival) $ peReceivedPayload event) |
79 | _ -> do | 79 | _ -> do |
80 | report <- pbReport "enqueued" pb | 80 | report <- pbReport "enqueued" pb |
81 | writeTChan oob (Just $ Left report) | 81 | writeTChan oob (ParseError report) |
82 | loop | 82 | loop |
83 | labelThread rloop ("lossless."++show saddr) | ||
83 | let tr = Transport | 84 | let tr = Transport |
84 | { awaitMessage = \kont -> do | 85 | { awaitMessage = \kont -> |
85 | join $ atomically $ orElse | 86 | orElse |
86 | (do x <- readTChan oob | 87 | (do x <- readTChan oob |
87 | return $ kont $! x) | 88 | return $ kont $! x) |
88 | (do x <- PB.awaitReadyPacket pb | 89 | (do x <- PB.awaitReadyPacket pb |
89 | report <- pbReport "dequeued" pb | 90 | report <- pbReport "dequeued" pb |
90 | return $ do | 91 | return $ do |
91 | atomically $ writeTChan oob (Just $ Left report) | 92 | atomically $ writeTChan oob (ParseError report) |
92 | kont $! Just (Right x)) | 93 | kont $! uncurry (flip Arrival) x) |
93 | , sendMessage = \a' x' -> do | 94 | , sendMessage = \a' x' -> do |
94 | seqno <- atomically $ do | 95 | seqno <- atomically $ do |
95 | seqno <- PB.nextToSendSequenceNumber pb | 96 | seqno <- PB.nextToSendSequenceNumber pb |
@@ -111,9 +112,11 @@ lossless isLossless encode saddr udp = do | |||
111 | when isfull retry | 112 | when isfull retry |
112 | let sendit = sendMessage udp saddr x | 113 | let sendit = sendMessage udp saddr x |
113 | maybe sendit (catchIOError sendit) oops | 114 | maybe sendit (catchIOError sendit) oops |
114 | , closeTransport = do | 115 | , setActive = \case |
115 | atomically $ writeTChan oob Nothing -- quit rloop thread | 116 | False -> do |
116 | closeTransport udp | 117 | atomically $ writeTChan oob Terminated -- quit rloop thread |
118 | setActive udp False | ||
119 | True -> return () | ||
117 | } | 120 | } |
118 | resend ns = do | 121 | resend ns = do |
119 | xs <- atomically $ retrieveForResend pb ns | 122 | xs <- atomically $ retrieveForResend pb ns |