From b5a3c7b92e7effcd234037241b00f9f29773d870 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sat, 14 Dec 2019 01:03:07 -0500 Subject: STM-based awaitMessage. --- dht/src/Network/QueryResponse.hs | 65 ++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 25 deletions(-) (limited to 'dht/src/Network/QueryResponse.hs') diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index f62fbefe..ba686929 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs @@ -54,15 +54,18 @@ data Arrival err addr x data TransportA err addr x y = Transport { -- | Blocks until an inbound packet is available. Then calls the provided -- continuation with the packet and origin addres or an error condition. - awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a + awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) -- | Send an /y/ packet to the given destination /addr/. , sendMessage :: addr -> y -> IO () -- | Shutdown and clean up any state related to this 'Transport'. - , closeTransport :: IO () + , setActive :: Bool -> IO () } type Transport err addr x = TransportA err addr x x +closeTransport :: TransportA err addr x y -> IO () +closeTransport tr = setActive tr False + -- | This function modifies a 'Transport' to use higher-level addresses and -- packet representations. It could be used to change UDP 'ByteString's into -- bencoded syntax trees or to add an encryption layer in which addresses have @@ -110,7 +113,7 @@ layerTransport parse encode tr = (\x' addr' -> return $ encode x' addr') tr --- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' +-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' -- is used to share the same underlying socket, so be sure to fork a thread for -- both returned 'Transport's to avoid hanging. partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) @@ -118,28 +121,28 @@ partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) -> Transport err a b -> IO (Transport err xaddr x, Transport err a b) partitionTransportM parse encodex tr = do - mvar <- newEmptyMVar + tchan <- atomically newTChan let xtr = tr { awaitMessage = \kont -> fix $ \again -> do awaitMessage tr $ \m -> case m of Arrival adr msg -> parse (msg,adr) >>= \case Left (x,xaddr) -> kont $ Arrival xaddr x - Right y -> putMVar mvar (Just y) >> again + Right y -> atomically (writeTChan tchan (Just y)) >> join (atomically again) ParseError e -> kont $ ParseError e - Terminated -> putMVar mvar Nothing >> kont Terminated + Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated , sendMessage = \addr' msg' -> do msg_addr <- encodex (msg',addr') mapM_ (uncurry . flip $ sendMessage tr) msg_addr } ytr = Transport - { awaitMessage = \kont -> takeMVar mvar >>= kont . \case + { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case Nothing -> Terminated Just (y,yaddr) -> Arrival yaddr y , sendMessage = sendMessage tr - , closeTransport = return () + , setActive = const $ return () } return (xtr, ytr) --- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' +-- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan' -- is used to share the same underlying socket, so be sure to fork a thread for -- both returned 'Transport's to avoid hanging. partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) @@ -157,14 +160,14 @@ partitionAndForkTransport :: -> Transport err a b -> IO (Transport err xaddr x, Transport err a b) partitionAndForkTransport forkedSend parse encodex tr = do - mvar <- newEmptyMVar + tchan <- atomically newTChan let xtr = tr { awaitMessage = \kont -> fix $ \again -> do awaitMessage tr $ \case Arrival a b -> parse (b,a) >>= \case Left (x,xaddr) -> kont $ Arrival xaddr x - Right (b,a) -> putMVar mvar (Arrival a b) >> again + Right (b,a) -> atomically (writeTChan tchan (Arrival a b)) >> join (atomically again) ParseError e -> kont $ ParseError e - Terminated -> putMVar mvar Terminated >> kont Terminated + Terminated -> atomically (writeTChan tchan Terminated) >> kont Terminated , sendMessage = \addr' msg' -> do msg_addr <- encodex (msg',addr') case msg_addr of @@ -173,9 +176,9 @@ partitionAndForkTransport forkedSend parse encodex tr = do Nothing -> return () } ytr = Transport - { awaitMessage = \kont -> takeMVar mvar >>= kont + { awaitMessage = \kont -> readTChan tchan >>= pure . kont , sendMessage = sendMessage tr - , closeTransport = return () + , setActive = \_ -> return () } return (xtr, ytr) @@ -186,7 +189,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x addHandler onParseError f tr = tr { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case - Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x)) + Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) ParseError e -> onParseError e >> kont (ParseError e) Terminated -> kont Terminated } @@ -210,14 +213,15 @@ onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return ( -- > quitServer forkListener :: String -> Transport err addr x -> IO (IO ()) forkListener name client = do + setActive client True thread_id <- forkIO $ do myThreadId >>= flip labelThread ("listener."++name) - fix $ \loop -> awaitMessage client $ \case + fix $ \loop -> join $ atomically $ awaitMessage client $ \case Terminated -> return () _ -> loop dput XMisc $ "Listener died: " ++ name return $ do - closeTransport client + setActive client False -- killThread thread_id -- * Implementing a query\/response 'Client'. @@ -606,11 +610,11 @@ udpTransport' bind_address = do setSocketOption sock Broadcast 1 bind sock bind_address isClosed <- newEmptyMVar + udpTChan <- atomically newTChan let tr = Transport { awaitMessage = \kont -> do - r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do - uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize - kont $! r + r <- readTChan udpTChan + return $ kont $! r , sendMessage = case family of AF_INET6 -> \case (SockAddrInet port addr) -> \bs -> @@ -626,7 +630,8 @@ udpTransport' bind_address = do addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) addr4 -> \bs -> saferSendTo sock bs addr4 _ -> \addr bs -> saferSendTo sock bs addr - , closeTransport = do + , setActive = \case + False -> do dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. #if MIN_VERSION_network (3,1,0) @@ -639,6 +644,14 @@ udpTransport' bind_address = do let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) + True -> do + udpThread <- forkIO $ fix $ \again -> do + r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do + uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize + atomically $ writeTChan udpTChan r + case r of Terminated -> return () + _ -> again + labelThread udpThread ("udp.io."++show bind_address) } return (tr, sock) @@ -652,13 +665,15 @@ udpTransport bind_address = fst <$> udpTransport' bind_address chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x chanTransport chanFromAddr self achan aclosed = Transport { awaitMessage = \kont -> do - x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan) + x <- (uncurry (flip Arrival) <$> readTChan achan) `orElse` - (readTVar aclosed >>= check >> return Terminated) - kont x + (readTVar aclosed >>= check >> return Terminated) + return $ kont x , sendMessage = \them bs -> do atomically $ writeTChan (chanFromAddr them) (bs,self) - , closeTransport = atomically $ writeTVar aclosed True + , setActive = \case + False -> atomically $ writeTVar aclosed True + True -> return () } -- | Returns a pair of transports linked together to simulate two computers talking to each other. -- cgit v1.2.3