summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-24 23:08:14 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-25 19:18:13 -0500
commit9953d0a9ba7e992062ae60ae8e24054b0883b50e (patch)
tree5cbfbdcae7ed6962235055a27c09c78e3a9b0bb9
parentbc93745f7591b3b079d660ca98715911495fceeb (diff)
QueryResponse rework: awaitMessage :: STM (Arrival err addr x, IO ())
-rw-r--r--server/src/Network/QueryResponse.hs127
-rw-r--r--server/src/Network/QueryResponse/TCP.hs5
2 files changed, 63 insertions, 69 deletions
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs
index 69cc6f50..0c200916 100644
--- a/server/src/Network/QueryResponse.hs
+++ b/server/src/Network/QueryResponse.hs
@@ -19,6 +19,7 @@ import Control.Concurrent.Lifted.Instrument
19import Control.Concurrent 19import Control.Concurrent
20import GHC.Conc (labelThread) 20import GHC.Conc (labelThread)
21#endif 21#endif
22import Control.Arrow
22import Control.Concurrent.STM 23import Control.Concurrent.STM
23import Control.Exception 24import Control.Exception
24import Control.Monad 25import Control.Monad
@@ -73,7 +74,7 @@ data Arrival err addr x
73data TransportA err addr x y = Transport 74data TransportA err addr x y = Transport
74 { -- | Blocks until an inbound packet is available. Then calls the provided 75 { -- | Blocks until an inbound packet is available. Then calls the provided
75 -- continuation with the packet and origin address or an error condition. 76 -- continuation with the packet and origin address or an error condition.
76 awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) 77 awaitMessage :: STM (Arrival err addr x, IO ())
77 -- | Send an /y/ packet to the given destination /addr/. 78 -- | Send an /y/ packet to the given destination /addr/.
78 , sendMessage :: addr -> y -> IO () 79 , sendMessage :: addr -> y -> IO ()
79 -- | Shutdown and clean up any state related to this 'Transport'. 80 -- | Shutdown and clean up any state related to this 'Transport'.
@@ -84,7 +85,7 @@ type Transport err addr x = TransportA err addr x x
84 85
85nullTransport :: TransportA err addr x y 86nullTransport :: TransportA err addr x y
86nullTransport = Transport 87nullTransport = Transport
87 { awaitMessage = \_ -> retry 88 { awaitMessage = retry
88 , sendMessage = \_ _ -> return () 89 , sendMessage = \_ _ -> return ()
89 , setActive = \_ -> return () 90 , setActive = \_ -> return ()
90 } 91 }
@@ -97,7 +98,7 @@ closeTransport tr = setActive tr False
97-- bencoded syntax trees or to add an encryption layer in which addresses have 98-- bencoded syntax trees or to add an encryption layer in which addresses have
98-- associated public keys. 99-- associated public keys.
99layerTransportM :: 100layerTransportM ::
100 (x -> addr -> IO (Either err (x', addr'))) 101 (x -> addr -> STM (Either err (x', addr')))
101 -- ^ Function that attempts to transform a low-level address/packet 102 -- ^ Function that attempts to transform a low-level address/packet
102 -- pair into a higher level representation. 103 -- pair into a higher level representation.
103 -> (y' -> addr' -> IO (y, addr)) 104 -> (y' -> addr' -> IO (y, addr))
@@ -107,14 +108,15 @@ layerTransportM ::
107 -- ^ The low-level transport to be transformed. 108 -- ^ The low-level transport to be transformed.
108 -> TransportA err addr' x' y' 109 -> TransportA err addr' x' y'
109layerTransportM parse encode tr = 110layerTransportM parse encode tr =
110 tr { awaitMessage = \kont -> 111 tr { awaitMessage = do
111 awaitMessage tr $ \case 112 (m,io) <- awaitMessage tr
112 Terminated -> kont $ Terminated 113 case m of
113 Discarded -> kont $ Discarded 114 Terminated -> return $ (,) Terminated io
114 ParseError e -> kont $ ParseError e 115 Discarded -> return $ (,) Discarded io
115 Arrival addr x -> parse x addr >>= \case 116 ParseError e -> return $ (ParseError e,io)
116 Left e -> kont $ ParseError e 117 Arrival addr x -> parse x addr >>= \case
117 Right (x',addr') -> kont $ Arrival addr' x' 118 Left e -> return (ParseError e, io)
119 Right (x',addr') -> return (Arrival addr' x', io)
118 , sendMessage = \addr' msg' -> do 120 , sendMessage = \addr' msg' -> do
119 (msg,addr) <- encode msg' addr' 121 (msg,addr) <- encode msg' addr'
120 sendMessage tr addr msg 122 sendMessage tr addr msg
@@ -143,26 +145,26 @@ layerTransport parse encode tr =
143-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' 145-- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan'
144-- is used to share the same underlying socket, so be sure to fork a thread for 146-- is used to share the same underlying socket, so be sure to fork a thread for
145-- both returned 'Transport's to avoid hanging. 147-- both returned 'Transport's to avoid hanging.
146partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) 148partitionTransportM :: ((b,a) -> STM (Either (x,xaddr) (b,a)))
147 -> ((y,xaddr) -> IO (Maybe (c,a))) 149 -> ((y,xaddr) -> IO (Maybe (c,a)))
148 -> TransportA err a b c 150 -> TransportA err a b c
149 -> IO (TransportA err xaddr x y, TransportA err a b c) 151 -> IO (TransportA err xaddr x y, TransportA err a b c)
150partitionTransportM parse encodex tr = do 152partitionTransportM parse encodex tr = do
151 tchan <- atomically newTChan 153 tchan <- atomically newTChan
152 let ytr = tr { awaitMessage = \kont -> 154 let ytr = tr { awaitMessage = do
153 awaitMessage tr $ \m -> case m of 155 (m,io) <- awaitMessage tr
156 case m of
154 Arrival adr msg -> parse (msg,adr) >>= \case 157 Arrival adr msg -> parse (msg,adr) >>= \case
155 Left x -> atomically (writeTChan tchan (Just x)) >> kont Discarded 158 Left x -> return (Discarded, io >> atomically (writeTChan tchan (Just x)))
156 Right (y,yaddr) -> kont $ Arrival yaddr y 159 Right (y,yaddr) -> return (Arrival yaddr y, io)
157 ParseError e -> kont $ ParseError e 160 Terminated -> return (Terminated, io >> atomically (writeTChan tchan Nothing))
158 Discarded -> kont $ Discarded 161 _ -> return (m,io)
159 Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated
160 , sendMessage = sendMessage tr 162 , sendMessage = sendMessage tr
161 } 163 }
162 xtr = Transport 164 xtr = Transport
163 { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case 165 { awaitMessage = readTChan tchan >>= \case
164 Nothing -> Terminated 166 Nothing -> return (Terminated, return ())
165 Just (x,xaddr) -> Arrival xaddr x 167 Just (x,xaddr) -> return (Arrival xaddr x, return ())
166 , sendMessage = \addr' msg' -> do 168 , sendMessage = \addr' msg' -> do
167 msg_addr <- encodex (msg',addr') 169 msg_addr <- encodex (msg',addr')
168 mapM_ (uncurry . flip $ sendMessage tr) msg_addr 170 mapM_ (uncurry . flip $ sendMessage tr) msg_addr
@@ -180,24 +182,21 @@ partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a))
180partitionTransport parse encodex tr = 182partitionTransport parse encodex tr =
181 partitionTransportM (return . parse) (return . encodex) tr 183 partitionTransportM (return . parse) (return . encodex) tr
182 184
183-- | 185addHandler :: (Arrival err addr x -> STM (Arrival err addr x, IO ())) -> TransportA err addr x y -> TransportA err addr x y
184-- * f add x --> Nothing, consume x
185-- --> Just id, leave x to a different handler
186-- --> Just g, apply g to x and leave that to a different handler
187--
188-- Note: If you add a handler to one of the branches before applying a
189-- 'mergeTransports' combinator, then this handler may not block or return
190-- Nothing.
191addHandler :: (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y
192addHandler f tr = tr 186addHandler f tr = tr
193 { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case 187 { awaitMessage = do
194 Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) 188 (m,io1) <- awaitMessage tr
195 m -> kont m 189 (m', io2) <- f m
190 return (m', io1 >> io2)
196 } 191 }
197 192
193forArrival :: Applicative m => (addr -> x -> IO ()) -> Arrival err addr x -> m (Arrival err addr x, IO ())
194forArrival f (Arrival addr x) = pure (Arrival addr x, f addr x)
195forArrival _ m = pure (m, return ())
196
198-- | Modify a 'Transport' to invoke an action upon every received packet. 197-- | Modify a 'Transport' to invoke an action upon every received packet.
199onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x 198onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x
200onInbound f tr = addHandler (\addr x -> f addr x >> return (Just id)) tr 199onInbound f tr = addHandler (forArrival f) tr
201 200
202-- * Using a query\/response client. 201-- * Using a query\/response client.
203 202
@@ -217,10 +216,13 @@ forkListener name onParseError client = do
217 setActive client True 216 setActive client True
218 thread_id <- forkIO $ do 217 thread_id <- forkIO $ do
219 myThreadId >>= flip labelThread ("listener."++name) 218 myThreadId >>= flip labelThread ("listener."++name)
220 fix $ \loop -> join $ atomically $ awaitMessage client $ \case 219 fix $ \loop -> do
221 Terminated -> return () 220 (m,io) <- atomically $ awaitMessage client
222 ParseError e -> onParseError e >> loop 221 io
223 _ -> loop 222 case m of
223 Terminated -> return ()
224 ParseError e -> onParseError e >> loop
225 _ -> loop
224 dput XMisc $ "Listener died: " ++ name 226 dput XMisc $ "Listener died: " ++ name
225 return $ do 227 return $ do
226 setActive client False 228 setActive client False
@@ -539,40 +541,35 @@ transactionMethods methods generate = transactionMethods' id id methods generate
539-- throws an exception. 541-- throws an exception.
540handleMessage :: 542handleMessage ::
541 ClientA err meth tid addr x y 543 ClientA err meth tid addr x y
542 -> addr 544 -> Arrival err addr x
543 -> x 545 -> STM (Arrival err addr x, IO ())
544 -> IO (Maybe (x -> x)) 546handleMessage (Client net d err pending whoami responseID) msg@(Arrival addr plain) = do
545handleMessage (Client net d err pending whoami responseID) addr plain = do
546 -- Just (Left e) -> do reportParseError err e 547 -- Just (Left e) -> do reportParseError err e
547 -- return $! Just id 548 -- return $! Just id
548 -- Just (Right (plain, addr)) -> do 549 -- Just (Right (plain, addr)) -> do
549 case classifyInbound d plain of 550 case classifyInbound d plain of
550 IsQuery meth tid -> case lookupHandler d meth of 551 IsQuery meth tid -> case lookupHandler d meth of
551 Nothing -> do reportMissingHandler err meth addr plain 552 Nothing -> return (msg, reportMissingHandler err meth addr plain)
552 return $! Just id 553 Just m -> return $ (,) Discarded $ do
553 Just m -> do
554 self <- whoami (Just addr) 554 self <- whoami (Just addr)
555 tid' <- responseID tid 555 tid' <- responseID tid
556 either (\e -> do reportParseError err e 556 either (\e -> reportParseError err e)
557 return $! Just id) 557 (\iom -> iom >>= mapM_ (sendMessage net addr))
558 (>>= \m -> do mapM_ (sendMessage net addr) m
559 return $! Nothing)
560 (dispatchQuery m tid' self plain addr) 558 (dispatchQuery m tid' self plain addr)
561 IsUnsolicited action -> do 559 IsUnsolicited action -> return $ (,) Discarded $ do
562 self <- whoami (Just addr) 560 self <- whoami (Just addr)
563 action self addr 561 _ <- action self addr
564 return Nothing 562 return ()
565 IsResponse tid -> do 563 IsResponse tid -> return $ (,) Discarded $ do
566 action <- atomically $ do 564 action <- atomically $ do
567 ts0 <- readTVar pending 565 ts0 <- readTVar pending
568 (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0 566 (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0
569 writeTVar pending ts 567 writeTVar pending ts
570 return action 568 return action
571 action 569 action
572 return $! Nothing 570 IsUnknown e -> return (msg, reportUnknown err addr plain e)
573 IsUnknown e -> do reportUnknown err addr plain e
574 return $! Just id
575 -- Nothing -> return $! id 571 -- Nothing -> return $! id
572handleMessage _ msg = return (msg, return ())
576 573
577-- * UDP Datagrams. 574-- * UDP Datagrams.
578 575
@@ -629,9 +626,7 @@ udpTransport' bind_address = do
629 isClosed <- newEmptyMVar 626 isClosed <- newEmptyMVar
630 udpTChan <- atomically newTChan 627 udpTChan <- atomically newTChan
631 let tr = Transport { 628 let tr = Transport {
632 awaitMessage = \kont -> do 629 awaitMessage = fmap (,return()) $ readTChan udpTChan
633 r <- readTChan udpTChan
634 return $ kont $! r
635 , sendMessage = case family of 630 , sendMessage = case family of
636 AF_INET6 -> \case 631 AF_INET6 -> \case
637 (SockAddrInet port addr) -> \bs -> 632 (SockAddrInet port addr) -> \bs ->
@@ -681,11 +676,9 @@ udpTransport bind_address = fst <$> udpTransport' bind_address
681 676
682chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x 677chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x
683chanTransport chanFromAddr self achan aclosed = Transport 678chanTransport chanFromAddr self achan aclosed = Transport
684 { awaitMessage = \kont -> do 679 { awaitMessage = fmap (, return ()) $
685 x <- (uncurry (flip Arrival) <$> readTChan achan) 680 orElse (uncurry (flip Arrival) <$> readTChan achan)
686 `orElse` 681 (readTVar aclosed >>= check >> return Terminated)
687 (readTVar aclosed >>= check >> return Terminated)
688 return $ kont x
689 , sendMessage = \them bs -> do 682 , sendMessage = \them bs -> do
690 atomically $ writeTChan (chanFromAddr them) (bs,self) 683 atomically $ writeTChan (chanFromAddr them) (bs,self)
691 , setActive = \case 684 , setActive = \case
@@ -720,8 +713,8 @@ mergeTransports tmap = do
720 -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap 713 -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap
721 -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap 714 -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap
722 return Transport 715 return Transport
723 { awaitMessage = \kont -> 716 { awaitMessage =
724 foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n) 717 foldrWithKey (\k (ByAddress tr) n -> (first (decorateAddr k) <$> awaitMessage tr) `orElse` n)
725 retry 718 retry
726 tmap 719 tmap
727 , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of 720 , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of
diff --git a/server/src/Network/QueryResponse/TCP.hs b/server/src/Network/QueryResponse/TCP.hs
index 8b1b432b..24aacd98 100644
--- a/server/src/Network/QueryResponse/TCP.hs
+++ b/server/src/Network/QueryResponse/TCP.hs
@@ -213,8 +213,9 @@ tcpTransport maxcon stream = do
213 msgvar <- atomically newEmptyTMVar 213 msgvar <- atomically newEmptyTMVar
214 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 214 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
215 return $ (,) tcpcache Transport 215 return $ (,) tcpcache Transport
216 { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do 216 { awaitMessage = do
217 f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated) 217 x <- takeTMVar msgvar
218 return (x, return ())
218 , sendMessage = \addr (bDoCon,y) -> do 219 , sendMessage = \addr (bDoCon,y) -> do
219 void . forkLabeled "tcp-send" $ do 220 void . forkLabeled "tcp-send" $ do
220 msock <- acquireConnection msgvar tcpcache stream addr bDoCon 221 msock <- acquireConnection msgvar tcpcache stream addr bDoCon