diff options
Diffstat (limited to 'server/src/Network/QueryResponse.hs')
-rw-r--r-- | server/src/Network/QueryResponse.hs | 127 |
1 files changed, 60 insertions, 67 deletions
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs index 69cc6f50..0c200916 100644 --- a/server/src/Network/QueryResponse.hs +++ b/server/src/Network/QueryResponse.hs | |||
@@ -19,6 +19,7 @@ import Control.Concurrent.Lifted.Instrument | |||
19 | import Control.Concurrent | 19 | import Control.Concurrent |
20 | import GHC.Conc (labelThread) | 20 | import GHC.Conc (labelThread) |
21 | #endif | 21 | #endif |
22 | import Control.Arrow | ||
22 | import Control.Concurrent.STM | 23 | import Control.Concurrent.STM |
23 | import Control.Exception | 24 | import Control.Exception |
24 | import Control.Monad | 25 | import Control.Monad |
@@ -73,7 +74,7 @@ data Arrival err addr x | |||
73 | data TransportA err addr x y = Transport | 74 | data TransportA err addr x y = Transport |
74 | { -- | Blocks until an inbound packet is available. Then calls the provided | 75 | { -- | Blocks until an inbound packet is available. Then calls the provided |
75 | -- continuation with the packet and origin address or an error condition. | 76 | -- continuation with the packet and origin address or an error condition. |
76 | awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) | 77 | awaitMessage :: STM (Arrival err addr x, IO ()) |
77 | -- | Send an /y/ packet to the given destination /addr/. | 78 | -- | Send an /y/ packet to the given destination /addr/. |
78 | , sendMessage :: addr -> y -> IO () | 79 | , sendMessage :: addr -> y -> IO () |
79 | -- | Shutdown and clean up any state related to this 'Transport'. | 80 | -- | Shutdown and clean up any state related to this 'Transport'. |
@@ -84,7 +85,7 @@ type Transport err addr x = TransportA err addr x x | |||
84 | 85 | ||
85 | nullTransport :: TransportA err addr x y | 86 | nullTransport :: TransportA err addr x y |
86 | nullTransport = Transport | 87 | nullTransport = Transport |
87 | { awaitMessage = \_ -> retry | 88 | { awaitMessage = retry |
88 | , sendMessage = \_ _ -> return () | 89 | , sendMessage = \_ _ -> return () |
89 | , setActive = \_ -> return () | 90 | , setActive = \_ -> return () |
90 | } | 91 | } |
@@ -97,7 +98,7 @@ closeTransport tr = setActive tr False | |||
97 | -- bencoded syntax trees or to add an encryption layer in which addresses have | 98 | -- bencoded syntax trees or to add an encryption layer in which addresses have |
98 | -- associated public keys. | 99 | -- associated public keys. |
99 | layerTransportM :: | 100 | layerTransportM :: |
100 | (x -> addr -> IO (Either err (x', addr'))) | 101 | (x -> addr -> STM (Either err (x', addr'))) |
101 | -- ^ Function that attempts to transform a low-level address/packet | 102 | -- ^ Function that attempts to transform a low-level address/packet |
102 | -- pair into a higher level representation. | 103 | -- pair into a higher level representation. |
103 | -> (y' -> addr' -> IO (y, addr)) | 104 | -> (y' -> addr' -> IO (y, addr)) |
@@ -107,14 +108,15 @@ layerTransportM :: | |||
107 | -- ^ The low-level transport to be transformed. | 108 | -- ^ The low-level transport to be transformed. |
108 | -> TransportA err addr' x' y' | 109 | -> TransportA err addr' x' y' |
109 | layerTransportM parse encode tr = | 110 | layerTransportM parse encode tr = |
110 | tr { awaitMessage = \kont -> | 111 | tr { awaitMessage = do |
111 | awaitMessage tr $ \case | 112 | (m,io) <- awaitMessage tr |
112 | Terminated -> kont $ Terminated | 113 | case m of |
113 | Discarded -> kont $ Discarded | 114 | Terminated -> return $ (,) Terminated io |
114 | ParseError e -> kont $ ParseError e | 115 | Discarded -> return $ (,) Discarded io |
115 | Arrival addr x -> parse x addr >>= \case | 116 | ParseError e -> return $ (ParseError e,io) |
116 | Left e -> kont $ ParseError e | 117 | Arrival addr x -> parse x addr >>= \case |
117 | Right (x',addr') -> kont $ Arrival addr' x' | 118 | Left e -> return (ParseError e, io) |
119 | Right (x',addr') -> return (Arrival addr' x', io) | ||
118 | , sendMessage = \addr' msg' -> do | 120 | , sendMessage = \addr' msg' -> do |
119 | (msg,addr) <- encode msg' addr' | 121 | (msg,addr) <- encode msg' addr' |
120 | sendMessage tr addr msg | 122 | sendMessage tr addr msg |
@@ -143,26 +145,26 @@ layerTransport parse encode tr = | |||
143 | -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' | 145 | -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' |
144 | -- is used to share the same underlying socket, so be sure to fork a thread for | 146 | -- is used to share the same underlying socket, so be sure to fork a thread for |
145 | -- both returned 'Transport's to avoid hanging. | 147 | -- both returned 'Transport's to avoid hanging. |
146 | partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) | 148 | partitionTransportM :: ((b,a) -> STM (Either (x,xaddr) (b,a))) |
147 | -> ((y,xaddr) -> IO (Maybe (c,a))) | 149 | -> ((y,xaddr) -> IO (Maybe (c,a))) |
148 | -> TransportA err a b c | 150 | -> TransportA err a b c |
149 | -> IO (TransportA err xaddr x y, TransportA err a b c) | 151 | -> IO (TransportA err xaddr x y, TransportA err a b c) |
150 | partitionTransportM parse encodex tr = do | 152 | partitionTransportM parse encodex tr = do |
151 | tchan <- atomically newTChan | 153 | tchan <- atomically newTChan |
152 | let ytr = tr { awaitMessage = \kont -> | 154 | let ytr = tr { awaitMessage = do |
153 | awaitMessage tr $ \m -> case m of | 155 | (m,io) <- awaitMessage tr |
156 | case m of | ||
154 | Arrival adr msg -> parse (msg,adr) >>= \case | 157 | Arrival adr msg -> parse (msg,adr) >>= \case |
155 | Left x -> atomically (writeTChan tchan (Just x)) >> kont Discarded | 158 | Left x -> return (Discarded, io >> atomically (writeTChan tchan (Just x))) |
156 | Right (y,yaddr) -> kont $ Arrival yaddr y | 159 | Right (y,yaddr) -> return (Arrival yaddr y, io) |
157 | ParseError e -> kont $ ParseError e | 160 | Terminated -> return (Terminated, io >> atomically (writeTChan tchan Nothing)) |
158 | Discarded -> kont $ Discarded | 161 | _ -> return (m,io) |
159 | Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated | ||
160 | , sendMessage = sendMessage tr | 162 | , sendMessage = sendMessage tr |
161 | } | 163 | } |
162 | xtr = Transport | 164 | xtr = Transport |
163 | { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case | 165 | { awaitMessage = readTChan tchan >>= \case |
164 | Nothing -> Terminated | 166 | Nothing -> return (Terminated, return ()) |
165 | Just (x,xaddr) -> Arrival xaddr x | 167 | Just (x,xaddr) -> return (Arrival xaddr x, return ()) |
166 | , sendMessage = \addr' msg' -> do | 168 | , sendMessage = \addr' msg' -> do |
167 | msg_addr <- encodex (msg',addr') | 169 | msg_addr <- encodex (msg',addr') |
168 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr | 170 | mapM_ (uncurry . flip $ sendMessage tr) msg_addr |
@@ -180,24 +182,21 @@ partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) | |||
180 | partitionTransport parse encodex tr = | 182 | partitionTransport parse encodex tr = |
181 | partitionTransportM (return . parse) (return . encodex) tr | 183 | partitionTransportM (return . parse) (return . encodex) tr |
182 | 184 | ||
183 | -- | | 185 | addHandler :: (Arrival err addr x -> STM (Arrival err addr x, IO ())) -> TransportA err addr x y -> TransportA err addr x y |
184 | -- * f add x --> Nothing, consume x | ||
185 | -- --> Just id, leave x to a different handler | ||
186 | -- --> Just g, apply g to x and leave that to a different handler | ||
187 | -- | ||
188 | -- Note: If you add a handler to one of the branches before applying a | ||
189 | -- 'mergeTransports' combinator, then this handler may not block or return | ||
190 | -- Nothing. | ||
191 | addHandler :: (addr -> x -> IO (Maybe (x -> x))) -> TransportA err addr x y -> TransportA err addr x y | ||
192 | addHandler f tr = tr | 186 | addHandler f tr = tr |
193 | { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case | 187 | { awaitMessage = do |
194 | Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) | 188 | (m,io1) <- awaitMessage tr |
195 | m -> kont m | 189 | (m', io2) <- f m |
190 | return (m', io1 >> io2) | ||
196 | } | 191 | } |
197 | 192 | ||
193 | forArrival :: Applicative m => (addr -> x -> IO ()) -> Arrival err addr x -> m (Arrival err addr x, IO ()) | ||
194 | forArrival f (Arrival addr x) = pure (Arrival addr x, f addr x) | ||
195 | forArrival _ m = pure (m, return ()) | ||
196 | |||
198 | -- | Modify a 'Transport' to invoke an action upon every received packet. | 197 | -- | Modify a 'Transport' to invoke an action upon every received packet. |
199 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x | 198 | onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x |
200 | onInbound f tr = addHandler (\addr x -> f addr x >> return (Just id)) tr | 199 | onInbound f tr = addHandler (forArrival f) tr |
201 | 200 | ||
202 | -- * Using a query\/response client. | 201 | -- * Using a query\/response client. |
203 | 202 | ||
@@ -217,10 +216,13 @@ forkListener name onParseError client = do | |||
217 | setActive client True | 216 | setActive client True |
218 | thread_id <- forkIO $ do | 217 | thread_id <- forkIO $ do |
219 | myThreadId >>= flip labelThread ("listener."++name) | 218 | myThreadId >>= flip labelThread ("listener."++name) |
220 | fix $ \loop -> join $ atomically $ awaitMessage client $ \case | 219 | fix $ \loop -> do |
221 | Terminated -> return () | 220 | (m,io) <- atomically $ awaitMessage client |
222 | ParseError e -> onParseError e >> loop | 221 | io |
223 | _ -> loop | 222 | case m of |
223 | Terminated -> return () | ||
224 | ParseError e -> onParseError e >> loop | ||
225 | _ -> loop | ||
224 | dput XMisc $ "Listener died: " ++ name | 226 | dput XMisc $ "Listener died: " ++ name |
225 | return $ do | 227 | return $ do |
226 | setActive client False | 228 | setActive client False |
@@ -539,40 +541,35 @@ transactionMethods methods generate = transactionMethods' id id methods generate | |||
539 | -- throws an exception. | 541 | -- throws an exception. |
540 | handleMessage :: | 542 | handleMessage :: |
541 | ClientA err meth tid addr x y | 543 | ClientA err meth tid addr x y |
542 | -> addr | 544 | -> Arrival err addr x |
543 | -> x | 545 | -> STM (Arrival err addr x, IO ()) |
544 | -> IO (Maybe (x -> x)) | 546 | handleMessage (Client net d err pending whoami responseID) msg@(Arrival addr plain) = do |
545 | handleMessage (Client net d err pending whoami responseID) addr plain = do | ||
546 | -- Just (Left e) -> do reportParseError err e | 547 | -- Just (Left e) -> do reportParseError err e |
547 | -- return $! Just id | 548 | -- return $! Just id |
548 | -- Just (Right (plain, addr)) -> do | 549 | -- Just (Right (plain, addr)) -> do |
549 | case classifyInbound d plain of | 550 | case classifyInbound d plain of |
550 | IsQuery meth tid -> case lookupHandler d meth of | 551 | IsQuery meth tid -> case lookupHandler d meth of |
551 | Nothing -> do reportMissingHandler err meth addr plain | 552 | Nothing -> return (msg, reportMissingHandler err meth addr plain) |
552 | return $! Just id | 553 | Just m -> return $ (,) Discarded $ do |
553 | Just m -> do | ||
554 | self <- whoami (Just addr) | 554 | self <- whoami (Just addr) |
555 | tid' <- responseID tid | 555 | tid' <- responseID tid |
556 | either (\e -> do reportParseError err e | 556 | either (\e -> reportParseError err e) |
557 | return $! Just id) | 557 | (\iom -> iom >>= mapM_ (sendMessage net addr)) |
558 | (>>= \m -> do mapM_ (sendMessage net addr) m | ||
559 | return $! Nothing) | ||
560 | (dispatchQuery m tid' self plain addr) | 558 | (dispatchQuery m tid' self plain addr) |
561 | IsUnsolicited action -> do | 559 | IsUnsolicited action -> return $ (,) Discarded $ do |
562 | self <- whoami (Just addr) | 560 | self <- whoami (Just addr) |
563 | action self addr | 561 | _ <- action self addr |
564 | return Nothing | 562 | return () |
565 | IsResponse tid -> do | 563 | IsResponse tid -> return $ (,) Discarded $ do |
566 | action <- atomically $ do | 564 | action <- atomically $ do |
567 | ts0 <- readTVar pending | 565 | ts0 <- readTVar pending |
568 | (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0 | 566 | (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0 |
569 | writeTVar pending ts | 567 | writeTVar pending ts |
570 | return action | 568 | return action |
571 | action | 569 | action |
572 | return $! Nothing | 570 | IsUnknown e -> return (msg, reportUnknown err addr plain e) |
573 | IsUnknown e -> do reportUnknown err addr plain e | ||
574 | return $! Just id | ||
575 | -- Nothing -> return $! id | 571 | -- Nothing -> return $! id |
572 | handleMessage _ msg = return (msg, return ()) | ||
576 | 573 | ||
577 | -- * UDP Datagrams. | 574 | -- * UDP Datagrams. |
578 | 575 | ||
@@ -629,9 +626,7 @@ udpTransport' bind_address = do | |||
629 | isClosed <- newEmptyMVar | 626 | isClosed <- newEmptyMVar |
630 | udpTChan <- atomically newTChan | 627 | udpTChan <- atomically newTChan |
631 | let tr = Transport { | 628 | let tr = Transport { |
632 | awaitMessage = \kont -> do | 629 | awaitMessage = fmap (,return()) $ readTChan udpTChan |
633 | r <- readTChan udpTChan | ||
634 | return $ kont $! r | ||
635 | , sendMessage = case family of | 630 | , sendMessage = case family of |
636 | AF_INET6 -> \case | 631 | AF_INET6 -> \case |
637 | (SockAddrInet port addr) -> \bs -> | 632 | (SockAddrInet port addr) -> \bs -> |
@@ -681,11 +676,9 @@ udpTransport bind_address = fst <$> udpTransport' bind_address | |||
681 | 676 | ||
682 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x | 677 | chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x |
683 | chanTransport chanFromAddr self achan aclosed = Transport | 678 | chanTransport chanFromAddr self achan aclosed = Transport |
684 | { awaitMessage = \kont -> do | 679 | { awaitMessage = fmap (, return ()) $ |
685 | x <- (uncurry (flip Arrival) <$> readTChan achan) | 680 | orElse (uncurry (flip Arrival) <$> readTChan achan) |
686 | `orElse` | 681 | (readTVar aclosed >>= check >> return Terminated) |
687 | (readTVar aclosed >>= check >> return Terminated) | ||
688 | return $ kont x | ||
689 | , sendMessage = \them bs -> do | 682 | , sendMessage = \them bs -> do |
690 | atomically $ writeTChan (chanFromAddr them) (bs,self) | 683 | atomically $ writeTChan (chanFromAddr them) (bs,self) |
691 | , setActive = \case | 684 | , setActive = \case |
@@ -720,8 +713,8 @@ mergeTransports tmap = do | |||
720 | -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap | 713 | -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap |
721 | -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap | 714 | -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap |
722 | return Transport | 715 | return Transport |
723 | { awaitMessage = \kont -> | 716 | { awaitMessage = |
724 | foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n) | 717 | foldrWithKey (\k (ByAddress tr) n -> (first (decorateAddr k) <$> awaitMessage tr) `orElse` n) |
725 | retry | 718 | retry |
726 | tmap | 719 | tmap |
727 | , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of | 720 | , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of |