summaryrefslogtreecommitdiff
path: root/dht/src/Network/QueryResponse.hs
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-12-14 01:03:07 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-01 23:26:05 -0500
commitb5a3c7b92e7effcd234037241b00f9f29773d870 (patch)
tree4047e11c9102585001dd3be95855038a6816a5c2 /dht/src/Network/QueryResponse.hs
parent97043e1069e172a0f389610610892ca060f395dd (diff)
STM-based awaitMessage.
Diffstat (limited to 'dht/src/Network/QueryResponse.hs')
-rw-r--r--dht/src/Network/QueryResponse.hs65
1 files changed, 40 insertions, 25 deletions
diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs
index f62fbefe..ba686929 100644
--- a/dht/src/Network/QueryResponse.hs
+++ b/dht/src/Network/QueryResponse.hs
@@ -54,15 +54,18 @@ data Arrival err addr x
54data TransportA err addr x y = Transport 54data TransportA err addr x y = Transport
55 { -- | Blocks until an inbound packet is available. Then calls the provided 55 { -- | Blocks until an inbound packet is available. Then calls the provided
56 -- continuation with the packet and origin addres or an error condition. 56 -- continuation with the packet and origin addres or an error condition.
57 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a 57 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a)
58 -- | Send an /y/ packet to the given destination /addr/. 58 -- | Send an /y/ packet to the given destination /addr/.
59 , sendMessage :: addr -> y -> IO () 59 , sendMessage :: addr -> y -> IO ()
60 -- | Shutdown and clean up any state related to this 'Transport'. 60 -- | Shutdown and clean up any state related to this 'Transport'.
61 , closeTransport :: IO () 61 , setActive :: Bool -> IO ()
62 } 62 }
63 63
64type Transport err addr x = TransportA err addr x x 64type Transport err addr x = TransportA err addr x x
65 65
66closeTransport :: TransportA err addr x y -> IO ()
67closeTransport tr = setActive tr False
68
66-- | This function modifies a 'Transport' to use higher-level addresses and 69-- | This function modifies a 'Transport' to use higher-level addresses and
67-- packet representations. It could be used to change UDP 'ByteString's into 70-- packet representations. It could be used to change UDP 'ByteString's into
68-- bencoded syntax trees or to add an encryption layer in which addresses have 71-- bencoded syntax trees or to add an encryption layer in which addresses have
@@ -110,7 +113,7 @@ layerTransport parse encode tr =
110 (\x' addr' -> return $ encode x' addr') 113 (\x' addr' -> return $ encode x' addr')
111 tr 114 tr
112 115
113-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' 116-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan'
114-- is used to share the same underlying socket, so be sure to fork a thread for 117-- is used to share the same underlying socket, so be sure to fork a thread for
115-- both returned 'Transport's to avoid hanging. 118-- both returned 'Transport's to avoid hanging.
116partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) 119partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
@@ -118,28 +121,28 @@ partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a)))
118 -> Transport err a b 121 -> Transport err a b
119 -> IO (Transport err xaddr x, Transport err a b) 122 -> IO (Transport err xaddr x, Transport err a b)
120partitionTransportM parse encodex tr = do 123partitionTransportM parse encodex tr = do
121 mvar <- newEmptyMVar 124 tchan <- atomically newTChan
122 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do 125 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
123 awaitMessage tr $ \m -> case m of 126 awaitMessage tr $ \m -> case m of
124 Arrival adr msg -> parse (msg,adr) >>= \case 127 Arrival adr msg -> parse (msg,adr) >>= \case
125 Left (x,xaddr) -> kont $ Arrival xaddr x 128 Left (x,xaddr) -> kont $ Arrival xaddr x
126 Right y -> putMVar mvar (Just y) >> again 129 Right y -> atomically (writeTChan tchan (Just y)) >> join (atomically again)
127 ParseError e -> kont $ ParseError e 130 ParseError e -> kont $ ParseError e
128 Terminated -> putMVar mvar Nothing >> kont Terminated 131 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated
129 , sendMessage = \addr' msg' -> do 132 , sendMessage = \addr' msg' -> do
130 msg_addr <- encodex (msg',addr') 133 msg_addr <- encodex (msg',addr')
131 mapM_ (uncurry . flip $ sendMessage tr) msg_addr 134 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
132 } 135 }
133 ytr = Transport 136 ytr = Transport
134 { awaitMessage = \kont -> takeMVar mvar >>= kont . \case 137 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case
135 Nothing -> Terminated 138 Nothing -> Terminated
136 Just (y,yaddr) -> Arrival yaddr y 139 Just (y,yaddr) -> Arrival yaddr y
137 , sendMessage = sendMessage tr 140 , sendMessage = sendMessage tr
138 , closeTransport = return () 141 , setActive = const $ return ()
139 } 142 }
140 return (xtr, ytr) 143 return (xtr, ytr)
141 144
142-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' 145-- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan'
143-- is used to share the same underlying socket, so be sure to fork a thread for 146-- is used to share the same underlying socket, so be sure to fork a thread for
144-- both returned 'Transport's to avoid hanging. 147-- both returned 'Transport's to avoid hanging.
145partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) 148partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
@@ -157,14 +160,14 @@ partitionAndForkTransport ::
157 -> Transport err a b 160 -> Transport err a b
158 -> IO (Transport err xaddr x, Transport err a b) 161 -> IO (Transport err xaddr x, Transport err a b)
159partitionAndForkTransport forkedSend parse encodex tr = do 162partitionAndForkTransport forkedSend parse encodex tr = do
160 mvar <- newEmptyMVar 163 tchan <- atomically newTChan
161 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do 164 let xtr = tr { awaitMessage = \kont -> fix $ \again -> do
162 awaitMessage tr $ \case 165 awaitMessage tr $ \case
163 Arrival a b -> parse (b,a) >>= \case 166 Arrival a b -> parse (b,a) >>= \case
164 Left (x,xaddr) -> kont $ Arrival xaddr x 167 Left (x,xaddr) -> kont $ Arrival xaddr x
165 Right (b,a) -> putMVar mvar (Arrival a b) >> again 168 Right (b,a) -> atomically (writeTChan tchan (Arrival a b)) >> join (atomically again)
166 ParseError e -> kont $ ParseError e 169 ParseError e -> kont $ ParseError e
167 Terminated -> putMVar mvar Terminated >> kont Terminated 170 Terminated -> atomically (writeTChan tchan Terminated) >> kont Terminated
168 , sendMessage = \addr' msg' -> do 171 , sendMessage = \addr' msg' -> do
169 msg_addr <- encodex (msg',addr') 172 msg_addr <- encodex (msg',addr')
170 case msg_addr of 173 case msg_addr of
@@ -173,9 +176,9 @@ partitionAndForkTransport forkedSend parse encodex tr = do
173 Nothing -> return () 176 Nothing -> return ()
174 } 177 }
175 ytr = Transport 178 ytr = Transport
176 { awaitMessage = \kont -> takeMVar mvar >>= kont 179 { awaitMessage = \kont -> readTChan tchan >>= pure . kont
177 , sendMessage = sendMessage tr 180 , sendMessage = sendMessage tr
178 , closeTransport = return () 181 , setActive = \_ -> return ()
179 } 182 }
180 return (xtr, ytr) 183 return (xtr, ytr)
181 184
@@ -186,7 +189,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do
186addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x 189addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x
187addHandler onParseError f tr = tr 190addHandler onParseError f tr = tr
188 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case 191 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case
189 Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x)) 192 Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x))
190 ParseError e -> onParseError e >> kont (ParseError e) 193 ParseError e -> onParseError e >> kont (ParseError e)
191 Terminated -> kont Terminated 194 Terminated -> kont Terminated
192 } 195 }
@@ -210,14 +213,15 @@ onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (
210-- > quitServer 213-- > quitServer
211forkListener :: String -> Transport err addr x -> IO (IO ()) 214forkListener :: String -> Transport err addr x -> IO (IO ())
212forkListener name client = do 215forkListener name client = do
216 setActive client True
213 thread_id <- forkIO $ do 217 thread_id <- forkIO $ do
214 myThreadId >>= flip labelThread ("listener."++name) 218 myThreadId >>= flip labelThread ("listener."++name)
215 fix $ \loop -> awaitMessage client $ \case 219 fix $ \loop -> join $ atomically $ awaitMessage client $ \case
216 Terminated -> return () 220 Terminated -> return ()
217 _ -> loop 221 _ -> loop
218 dput XMisc $ "Listener died: " ++ name 222 dput XMisc $ "Listener died: " ++ name
219 return $ do 223 return $ do
220 closeTransport client 224 setActive client False
221 -- killThread thread_id 225 -- killThread thread_id
222 226
223-- * Implementing a query\/response 'Client'. 227-- * Implementing a query\/response 'Client'.
@@ -606,11 +610,11 @@ udpTransport' bind_address = do
606 setSocketOption sock Broadcast 1 610 setSocketOption sock Broadcast 1
607 bind sock bind_address 611 bind sock bind_address
608 isClosed <- newEmptyMVar 612 isClosed <- newEmptyMVar
613 udpTChan <- atomically newTChan
609 let tr = Transport { 614 let tr = Transport {
610 awaitMessage = \kont -> do 615 awaitMessage = \kont -> do
611 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do 616 r <- readTChan udpTChan
612 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize 617 return $ kont $! r
613 kont $! r
614 , sendMessage = case family of 618 , sendMessage = case family of
615 AF_INET6 -> \case 619 AF_INET6 -> \case
616 (SockAddrInet port addr) -> \bs -> 620 (SockAddrInet port addr) -> \bs ->
@@ -626,7 +630,8 @@ udpTransport' bind_address = do
626 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) 630 addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr)
627 addr4 -> \bs -> saferSendTo sock bs addr4 631 addr4 -> \bs -> saferSendTo sock bs addr4
628 _ -> \addr bs -> saferSendTo sock bs addr 632 _ -> \addr bs -> saferSendTo sock bs addr
629 , closeTransport = do 633 , setActive = \case
634 False -> do
630 dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address 635 dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address
631 tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. 636 tryPutMVar isClosed () -- signal awaitMessage that the transport is closed.
632#if MIN_VERSION_network (3,1,0) 637#if MIN_VERSION_network (3,1,0)
@@ -639,6 +644,14 @@ udpTransport' bind_address = do
639 let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () 644 let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return ()
640 -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. 645 -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage.
641 closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) 646 closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd)
647 True -> do
648 udpThread <- forkIO $ fix $ \again -> do
649 r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do
650 uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize
651 atomically $ writeTChan udpTChan r
652 case r of Terminated -> return ()
653 _ -> again
654 labelThread udpThread ("udp.io."++show bind_address)
642 } 655 }
643 return (tr, sock) 656 return (tr, sock)
644 657
@@ -652,13 +665,15 @@ udpTransport bind_address = fst <$> udpTransport' bind_address
652chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x 665chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
653chanTransport chanFromAddr self achan aclosed = Transport 666chanTransport chanFromAddr self achan aclosed = Transport
654 { awaitMessage = \kont -> do 667 { awaitMessage = \kont -> do
655 x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan) 668 x <- (uncurry (flip Arrival) <$> readTChan achan)
656 `orElse` 669 `orElse`
657 (readTVar aclosed >>= check >> return Terminated) 670 (readTVar aclosed >>= check >> return Terminated)
658 kont x 671 return $ kont x
659 , sendMessage = \them bs -> do 672 , sendMessage = \them bs -> do
660 atomically $ writeTChan (chanFromAddr them) (bs,self) 673 atomically $ writeTChan (chanFromAddr them) (bs,self)
661 , closeTransport = atomically $ writeTVar aclosed True 674 , setActive = \case
675 False -> atomically $ writeTVar aclosed True
676 True -> return ()
662 } 677 }
663 678
664-- | Returns a pair of transports linked together to simulate two computers talking to each other. 679-- | Returns a pair of transports linked together to simulate two computers talking to each other.