diff options
author | Joe Crayne <joe@jerkface.net> | 2019-12-14 01:03:07 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 23:26:05 -0500 |
commit | b5a3c7b92e7effcd234037241b00f9f29773d870 (patch) | |
tree | 4047e11c9102585001dd3be95855038a6816a5c2 /dht/src/Network/QueryResponse.hs | |
parent | 97043e1069e172a0f389610610892ca060f395dd (diff) |
STM-based awaitMessage.
Diffstat (limited to 'dht/src/Network/QueryResponse.hs')
-rw-r--r-- | dht/src/Network/QueryResponse.hs | 65 |
1 files changed, 40 insertions, 25 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index f62fbefe..ba686929 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs | |||
@@ -54,15 +54,18 @@ data Arrival err addr x | |||
54 | data TransportA err addr x y = Transport | 54 | data TransportA err addr x y = Transport |
55 | { -- | Blocks until an inbound packet is available. Then calls the provided | 55 | { -- | Blocks until an inbound packet is available. Then calls the provided |
56 | -- continuation with the packet and origin addres or an error condition. | 56 | -- continuation with the packet and origin addres or an error condition. |
57 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a | 57 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) |
58 | -- | Send an /y/ packet to the given destination /addr/. | 58 | -- | Send an /y/ packet to the given destination /addr/. |
59 | , sendMessage :: addr -> y -> IO () | 59 | , sendMessage :: addr -> y -> IO () |
60 | -- | Shutdown and clean up any state related to this 'Transport'. | 60 | -- | Shutdown and clean up any state related to this 'Transport'. |
61 | , closeTransport :: IO () | 61 | , setActive :: Bool -> IO () |
62 | } | 62 | } |
63 | 63 | ||
64 | type Transport err addr x = TransportA err addr x x | 64 | type Transport err addr x = TransportA err addr x x |
65 | 65 | ||
66 | closeTransport :: TransportA err addr x y -> IO () | ||
67 | closeTransport tr = setActive tr False | ||
68 | |||
66 | -- | This function modifies a 'Transport' to use higher-level addresses and | 69 | -- | This function modifies a 'Transport' to use higher-level addresses and |
67 | -- packet representations. It could be used to change UDP 'ByteString's into | 70 | -- packet representations. It could be used to change UDP 'ByteString's into |
68 | -- bencoded syntax trees or to add an encryption layer in which addresses have | 71 | -- bencoded syntax trees or to add an encryption layer in which addresses have |
@@ -110,7 +113,7 @@ layerTransport parse encode tr = | |||
110 | (\x' addr' -> return $ encode x' addr') | 113 | (\x' addr' -> return $ encode x' addr') |
111 | tr | 114 | tr |
112 | 115 | ||
113 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | 116 | -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' |
114 | -- is used to share the same underlying socket, so be sure to fork a thread for | 117 | -- is used to share the same underlying socket, so be sure to fork a thread for |
115 | -- both returned 'Transport's to avoid hanging. | 118 | -- both returned 'Transport's to avoid hanging. |
116 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | 119 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) |
@@ -118,28 +121,28 @@ partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | |||
118 | -> Transport err a b | 121 | -> Transport err a b |
119 | -> IO (Transport err xaddr x, Transport err a b) | 122 | -> IO (Transport err xaddr x, Transport err a b) |
120 | partitionTransportM parse encodex tr = do | 123 | partitionTransportM parse encodex tr = do |
121 | mvar <- newEmptyMVar | 124 | tchan <- atomically newTChan |
122 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | 125 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do |
123 | awaitMessage tr $ \m -> case m of | 126 | awaitMessage tr $ \m -> case m of |
124 | Arrival adr msg -> parse (msg,adr) >>= \case | 127 | Arrival adr msg -> parse (msg,adr) >>= \case |
125 | Left (x,xaddr) -> kont $ Arrival xaddr x | 128 | Left (x,xaddr) -> kont $ Arrival xaddr x |
126 | Right y -> putMVar mvar (Just y) >> again | 129 | Right y -> atomically (writeTChan tchan (Just y)) >> join (atomically again) |
127 | ParseError e -> kont $ ParseError e | 130 | ParseError e -> kont $ ParseError e |
128 | Terminated -> putMVar mvar Nothing >> kont Terminated | 131 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated |
129 | , sendMessage = \addr' msg' -> do | 132 | , sendMessage = \addr' msg' -> do |
130 | msg_addr <- encodex (msg',addr') | 133 | msg_addr <- encodex (msg',addr') |
131 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | 134 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr |
132 | } | 135 | } |
133 | ytr = Transport | 136 | ytr = Transport |
134 | { awaitMessage = \kont -> takeMVar mvar >>= kont . \case | 137 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case |
135 | Nothing -> Terminated | 138 | Nothing -> Terminated |
136 | Just (y,yaddr) -> Arrival yaddr y | 139 | Just (y,yaddr) -> Arrival yaddr y |
137 | , sendMessage = sendMessage tr | 140 | , sendMessage = sendMessage tr |
138 | , closeTransport = return () | 141 | , setActive = const $ return () |
139 | } | 142 | } |
140 | return (xtr, ytr) | 143 | return (xtr, ytr) |
141 | 144 | ||
142 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' | 145 | -- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan' |
143 | -- is used to share the same underlying socket, so be sure to fork a thread for | 146 | -- is used to share the same underlying socket, so be sure to fork a thread for |
144 | -- both returned 'Transport's to avoid hanging. | 147 | -- both returned 'Transport's to avoid hanging. |
145 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | 148 | partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) |
@@ -157,14 +160,14 @@ partitionAndForkTransport :: | |||
157 | -> Transport err a b | 160 | -> Transport err a b |
158 | -> IO (Transport err xaddr x, Transport err a b) | 161 | -> IO (Transport err xaddr x, Transport err a b) |
159 | partitionAndForkTransport forkedSend parse encodex tr = do | 162 | partitionAndForkTransport forkedSend parse encodex tr = do |
160 | mvar <- newEmptyMVar | 163 | tchan <- atomically newTChan |
161 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do | 164 | let xtr = tr { awaitMessage = \kont -> fix $ \again -> do |
162 | awaitMessage tr $ \case | 165 | awaitMessage tr $ \case |
163 | Arrival a b -> parse (b,a) >>= \case | 166 | Arrival a b -> parse (b,a) >>= \case |
164 | Left (x,xaddr) -> kont $ Arrival xaddr x | 167 | Left (x,xaddr) -> kont $ Arrival xaddr x |
165 | Right (b,a) -> putMVar mvar (Arrival a b) >> again | 168 | Right (b,a) -> atomically (writeTChan tchan (Arrival a b)) >> join (atomically again) |
166 | ParseError e -> kont $ ParseError e | 169 | ParseError e -> kont $ ParseError e |
167 | Terminated -> putMVar mvar Terminated >> kont Terminated | 170 | Terminated -> atomically (writeTChan tchan Terminated) >> kont Terminated |
168 | , sendMessage = \addr' msg' -> do | 171 | , sendMessage = \addr' msg' -> do |
169 | msg_addr <- encodex (msg',addr') | 172 | msg_addr <- encodex (msg',addr') |
170 | case msg_addr of | 173 | case msg_addr of |
@@ -173,9 +176,9 @@ partitionAndForkTransport forkedSend parse encodex tr = do | |||
173 | Nothing -> return () | 176 | Nothing -> return () |
174 | } | 177 | } |
175 | ytr = Transport | 178 | ytr = Transport |
176 | { awaitMessage = \kont -> takeMVar mvar >>= kont | 179 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont |
177 | , sendMessage = sendMessage tr | 180 | , sendMessage = sendMessage tr |
178 | , closeTransport = return () | 181 | , setActive = \_ -> return () |
179 | } | 182 | } |
180 | return (xtr, ytr) | 183 | return (xtr, ytr) |
181 | 184 | ||
@@ -186,7 +189,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do | |||
186 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x | 189 | addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x |
187 | addHandler onParseError f tr = tr | 190 | addHandler onParseError f tr = tr |
188 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case | 191 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case |
189 | Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x)) | 192 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) |
190 | ParseError e -> onParseError e >> kont (ParseError e) | 193 | ParseError e -> onParseError e >> kont (ParseError e) |
191 | Terminated -> kont Terminated | 194 | Terminated -> kont Terminated |
192 | } | 195 | } |
@@ -210,14 +213,15 @@ onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return ( | |||
210 | -- > quitServer | 213 | -- > quitServer |
211 | forkListener :: String -> Transport err addr x -> IO (IO ()) | 214 | forkListener :: String -> Transport err addr x -> IO (IO ()) |
212 | forkListener name client = do | 215 | forkListener name client = do |
216 | setActive client True | ||
213 | thread_id <- forkIO $ do | 217 | thread_id <- forkIO $ do |
214 | myThreadId >>= flip labelThread ("listener."++name) | 218 | myThreadId >>= flip labelThread ("listener."++name) |
215 | fix $ \loop -> awaitMessage client $ \case | 219 | fix $ \loop -> join $ atomically $ awaitMessage client $ \case |
216 | Terminated -> return () | 220 | Terminated -> return () |
217 | _ -> loop | 221 | _ -> loop |
218 | dput XMisc $ "Listener died: " ++ name | 222 | dput XMisc $ "Listener died: " ++ name |
219 | return $ do | 223 | return $ do |
220 | closeTransport client | 224 | setActive client False |
221 | -- killThread thread_id | 225 | -- killThread thread_id |
222 | 226 | ||
223 | -- * Implementing a query\/response 'Client'. | 227 | -- * Implementing a query\/response 'Client'. |
@@ -606,11 +610,11 @@ udpTransport' bind_address = do | |||
606 | setSocketOption sock Broadcast 1 | 610 | setSocketOption sock Broadcast 1 |
607 | bind sock bind_address | 611 | bind sock bind_address |
608 | isClosed <- newEmptyMVar | 612 | isClosed <- newEmptyMVar |
613 | udpTChan <- atomically newTChan | ||
609 | let tr = Transport { | 614 | let tr = Transport { |
610 | awaitMessage = \kont -> do | 615 | awaitMessage = \kont -> do |
611 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do | 616 | r <- readTChan udpTChan |
612 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize | 617 | return $ kont $! r |
613 | kont $! r | ||
614 | , sendMessage = case family of | 618 | , sendMessage = case family of |
615 | AF_INET6 -> \case | 619 | AF_INET6 -> \case |
616 | (SockAddrInet port addr) -> \bs -> | 620 | (SockAddrInet port addr) -> \bs -> |
@@ -626,7 +630,8 @@ udpTransport' bind_address = do | |||
626 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) | 630 | addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) |
627 | addr4 -> \bs -> saferSendTo sock bs addr4 | 631 | addr4 -> \bs -> saferSendTo sock bs addr4 |
628 | _ -> \addr bs -> saferSendTo sock bs addr | 632 | _ -> \addr bs -> saferSendTo sock bs addr |
629 | , closeTransport = do | 633 | , setActive = \case |
634 | False -> do | ||
630 | dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address | 635 | dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address |
631 | tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. | 636 | tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. |
632 | #if MIN_VERSION_network (3,1,0) | 637 | #if MIN_VERSION_network (3,1,0) |
@@ -639,6 +644,14 @@ udpTransport' bind_address = do | |||
639 | let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () | 644 | let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () |
640 | -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. | 645 | -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. |
641 | closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) | 646 | closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) |
647 | True -> do | ||
648 | udpThread <- forkIO $ fix $ \again -> do | ||
649 | r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do | ||
650 | uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize | ||
651 | atomically $ writeTChan udpTChan r | ||
652 | case r of Terminated -> return () | ||
653 | _ -> again | ||
654 | labelThread udpThread ("udp.io."++show bind_address) | ||
642 | } | 655 | } |
643 | return (tr, sock) | 656 | return (tr, sock) |
644 | 657 | ||
@@ -652,13 +665,15 @@ udpTransport bind_address = fst <$> udpTransport' bind_address | |||
652 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | 665 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x |
653 | chanTransport chanFromAddr self achan aclosed = Transport | 666 | chanTransport chanFromAddr self achan aclosed = Transport |
654 | { awaitMessage = \kont -> do | 667 | { awaitMessage = \kont -> do |
655 | x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan) | 668 | x <- (uncurry (flip Arrival) <$> readTChan achan) |
656 | `orElse` | 669 | `orElse` |
657 | (readTVar aclosed >>= check >> return Terminated) | 670 | (readTVar aclosed >>= check >> return Terminated) |
658 | kont x | 671 | return $ kont x |
659 | , sendMessage = \them bs -> do | 672 | , sendMessage = \them bs -> do |
660 | atomically $ writeTChan (chanFromAddr them) (bs,self) | 673 | atomically $ writeTChan (chanFromAddr them) (bs,self) |
661 | , closeTransport = atomically $ writeTVar aclosed True | 674 | , setActive = \case |
675 | False -> atomically $ writeTVar aclosed True | ||
676 | True -> return () | ||
662 | } | 677 | } |
663 | 678 | ||
664 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. | 679 | -- | Returns a pair of transports linked together to simulate two computers talking to each other. |