From 97043e1069e172a0f389610610892ca060f395dd Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 13 Dec 2019 20:04:01 -0500 Subject: QueryResponse: Minor simplification of awaitMessage. --- dht/src/Network/QueryResponse.hs | 473 ++++++++++++++++++++------------------- 1 file changed, 242 insertions(+), 231 deletions(-) diff --git a/dht/src/Network/QueryResponse.hs b/dht/src/Network/QueryResponse.hs index 5fcd1989..f62fbefe 100644 --- a/dht/src/Network/QueryResponse.hs +++ b/dht/src/Network/QueryResponse.hs @@ -44,13 +44,17 @@ import DPut import DebugTag import Data.TableMethods +-- | An inbound packet or condition raised while monitoring a connection. +data Arrival err addr x + = Terminated -- ^ Virtual message that signals EOF. + | ParseError !err -- ^ A badly-formed message was recieved. + | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. + -- | Three methods are required to implement a datagram based query\/response protocol. data TransportA err addr x y = Transport - { -- | Blocks until an inbound packet is available. Returns 'Nothing' when - -- no more packets are expected due to a shutdown or close event. - -- Otherwise, the packet will be parsed as type /x/ and an origin address - -- /addr/. Parse failure is indicated by the type 'err'. - awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a + { -- | Blocks until an inbound packet is available. Then calls the provided + -- continuation with the packet and origin addres or an error condition. + awaitMessage :: forall a. (Arrival err addr x -> IO a) -> IO a -- | Send an /y/ packet to the given destination /addr/. , sendMessage :: addr -> y -> IO () -- | Shutdown and clean up any state related to this 'Transport'. @@ -75,7 +79,12 @@ layerTransportM :: -> TransportA err addr' x' y' layerTransportM parse encode tr = tr { awaitMessage = \kont -> - awaitMessage tr $ \m -> mapM (mapM $ uncurry parse) m >>= kont . fmap join + awaitMessage tr $ \case + Terminated -> kont $ Terminated + ParseError e -> kont $ ParseError e + Arrival addr x -> parse x addr >>= \case + Left e -> kont $ ParseError e + Right (x',addr') -> kont $ Arrival addr' x' , sendMessage = \addr' msg' -> do (msg,addr) <- encode msg' addr' sendMessage tr addr msg @@ -101,16 +110,6 @@ layerTransport parse encode tr = (\x' addr' -> return $ encode x' addr') tr --- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' --- is used to share the same underlying socket, so be sure to fork a thread for --- both returned 'Transport's to avoid hanging. -partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) - -> ((x,xaddr) -> Maybe (b,a)) - -> Transport err a b - -> IO (Transport err xaddr x, Transport err a b) -partitionTransport parse encodex tr = - partitionTransportM (return . parse) (return . encodex) tr - -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' -- is used to share the same underlying socket, so be sure to fork a thread for -- both returned 'Transport's to avoid hanging. @@ -122,22 +121,35 @@ partitionTransportM parse encodex tr = do mvar <- newEmptyMVar let xtr = tr { awaitMessage = \kont -> fix $ \again -> do awaitMessage tr $ \m -> case m of - Just (Right msg) -> parse msg >>= - either (kont . Just . Right) - (\y -> putMVar mvar (Just y) >> again) - Just (Left e) -> kont $ Just (Left e) - Nothing -> putMVar mvar Nothing >> kont Nothing + Arrival adr msg -> parse (msg,adr) >>= \case + Left (x,xaddr) -> kont $ Arrival xaddr x + Right y -> putMVar mvar (Just y) >> again + ParseError e -> kont $ ParseError e + Terminated -> putMVar mvar Nothing >> kont Terminated , sendMessage = \addr' msg' -> do msg_addr <- encodex (msg',addr') mapM_ (uncurry . flip $ sendMessage tr) msg_addr } ytr = Transport - { awaitMessage = \kont -> takeMVar mvar >>= kont . fmap Right + { awaitMessage = \kont -> takeMVar mvar >>= kont . \case + Nothing -> Terminated + Just (y,yaddr) -> Arrival yaddr y , sendMessage = sendMessage tr , closeTransport = return () } return (xtr, ytr) +-- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' +-- is used to share the same underlying socket, so be sure to fork a thread for +-- both returned 'Transport's to avoid hanging. +partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) + -> ((x,xaddr) -> Maybe (b,a)) + -> Transport err a b + -> IO (Transport err xaddr x, Transport err a b) +partitionTransport parse encodex tr = + partitionTransportM (return . parse) (return . encodex) tr + + partitionAndForkTransport :: (dst -> msg -> IO ()) -> ((b,a) -> IO (Either (x,xaddr) (b,a))) @@ -147,12 +159,12 @@ partitionAndForkTransport :: partitionAndForkTransport forkedSend parse encodex tr = do mvar <- newEmptyMVar let xtr = tr { awaitMessage = \kont -> fix $ \again -> do - awaitMessage tr $ \m -> case m of - Just (Right msg) -> parse msg >>= - either (kont . Just . Right) - (\y -> putMVar mvar (Just y) >> again) - Just (Left e) -> kont $ Just (Left e) - Nothing -> putMVar mvar Nothing >> kont Nothing + awaitMessage tr $ \case + Arrival a b -> parse (b,a) >>= \case + Left (x,xaddr) -> kont $ Arrival xaddr x + Right (b,a) -> putMVar mvar (Arrival a b) >> again + ParseError e -> kont $ ParseError e + Terminated -> putMVar mvar Terminated >> kont Terminated , sendMessage = \addr' msg' -> do msg_addr <- encodex (msg',addr') case msg_addr of @@ -161,7 +173,7 @@ partitionAndForkTransport forkedSend parse encodex tr = do Nothing -> return () } ytr = Transport - { awaitMessage = \kont -> takeMVar mvar >>= kont . fmap Right + { awaitMessage = \kont -> takeMVar mvar >>= kont , sendMessage = sendMessage tr , closeTransport = return () } @@ -173,11 +185,10 @@ partitionAndForkTransport forkedSend parse encodex tr = do -- --> Just g, apply g to x and leave that to a different handler addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x addHandler onParseError f tr = tr - { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do - case m of - Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) - Just (Left e ) -> onParseError e >> kont (Just $ Left e) - Nothing -> kont $ Nothing + { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case + Arrival addr x -> f addr x >>= maybe eat (kont . Arrival addr . ($ x)) + ParseError e -> onParseError e >> kont (ParseError e) + Terminated -> kont Terminated } -- | Modify a 'Transport' to invoke an action upon every received packet. @@ -201,12 +212,186 @@ forkListener :: String -> Transport err addr x -> IO (IO ()) forkListener name client = do thread_id <- forkIO $ do myThreadId >>= flip labelThread ("listener."++name) - fix $ \loop -> awaitMessage client $ maybe (return ()) (const loop) + fix $ \loop -> awaitMessage client $ \case + Terminated -> return () + _ -> loop dput XMisc $ "Listener died: " ++ name return $ do closeTransport client -- killThread thread_id +-- * Implementing a query\/response 'Client'. + +-- | These methods indicate what should be done upon various conditions. Write +-- to a log file, make debug prints, or simply ignore them. +-- +-- [ /addr/ ] Address of remote peer. +-- +-- [ /x/ ] Incoming or outgoing packet. +-- +-- [ /meth/ ] Method id of incoming or outgoing request. +-- +-- [ /tid/ ] Transaction id for outgoing packet. +-- +-- [ /err/ ] Error information, typically a 'String'. +data ErrorReporter addr x meth tid err = ErrorReporter + { -- | Incoming: failed to parse packet. + reportParseError :: err -> IO () + -- | Incoming: no handler for request. + , reportMissingHandler :: meth -> addr -> x -> IO () + -- | Incoming: unable to identify request. + , reportUnknown :: addr -> x -> err -> IO () + } + +ignoreErrors :: ErrorReporter addr x meth tid err +ignoreErrors = ErrorReporter + { reportParseError = \_ -> return () + , reportMissingHandler = \_ _ _ -> return () + , reportUnknown = \_ _ _ -> return () + } + +logErrors :: ( Show addr + , Show meth + ) => ErrorReporter addr x meth tid String +logErrors = ErrorReporter + { reportParseError = \err -> dput XMisc err + , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" + , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err + } + +printErrors :: ( Show addr + , Show meth + ) => Handle -> ErrorReporter addr x meth tid String +printErrors h = ErrorReporter + { reportParseError = \err -> hPutStrLn h err + , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" + , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err + } + +-- Change the /err/ type for an 'ErrorReporter'. +instance Contravariant (ErrorReporter addr x meth tid) where + -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 + contramap f (ErrorReporter pe mh unk) + = ErrorReporter (\e -> pe (f e)) + mh + (\addr x e -> unk addr x (f e)) + +-- | An incoming message can be classified into three cases. +data MessageClass err meth tid addr x + = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response + -- should include the provided /tid/ value. + | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. + | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked + -- with the source and destination address of a message. If it handles the + -- message, it should return Nothing. Otherwise, it should return a transform + -- (usually /id/) to apply before the next handler examines it. + | IsUnknown err -- ^ None of the above. + +-- | Handler for an inbound query of type /x/ from an address of type _addr_. +data MethodHandler err tid addr x = forall a b. MethodHandler + { -- | Parse the query into a more specific type for this method. + methodParse :: x -> Either err a + -- | Serialize the response for transmission, given a context /ctx/ and the origin + -- and destination addresses. + , methodSerialize :: tid -> addr -> addr -> b -> x + -- | Fully typed action to perform upon the query. The remote origin + -- address of the query is provided to the handler. + , methodAction :: addr -> a -> IO b + } + -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. + | forall a. NoReply + { -- | Parse the query into a more specific type for this method. + methodParse :: x -> Either err a + -- | Fully typed action to perform upon the query. The remote origin + -- address of the query is provided to the handler. + , noreplyAction :: addr -> a -> IO () + } + +-- | To dispatch responses to our outbound queries, we require three +-- primitives. See the 'transactionMethods' function to create these +-- primitives out of a lookup table and a generator for transaction ids. +-- +-- The type variable /d/ is used to represent the current state of the +-- transaction generator and the table of pending transactions. +data TransactionMethods d qid addr x = TransactionMethods + { + -- | Before a query is sent, this function stores an 'MVar' to which the + -- response will be written too. The returned /qid/ is a transaction id + -- that can be used to forget the 'MVar' if the remote peer is not + -- responding. + dispatchRegister :: POSIXTime -- time of expiry + -> (Maybe x -> IO ()) -- callback upon response (or timeout) + -> addr + -> d + -> STM (qid, d) + -- | This method is invoked when an incoming packet /x/ indicates it is + -- a response to the transaction with id /qid/. The returned IO action + -- will write the packet to the correct 'MVar' thus completing the + -- dispatch. + , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) + -- | When a timeout interval elapses, this method is called to remove the + -- transaction from the table. + , dispatchCancel :: qid -> d -> STM d + } + +-- | A set of methods necessary for dispatching incoming packets. +data DispatchMethods tbl err meth tid addr x = DispatchMethods + { -- | Classify an inbound packet as a query or response. + classifyInbound :: x -> MessageClass err meth tid addr x + -- | Lookup the handler for a inbound query. + , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) + -- | Methods for handling incoming responses. + , tableMethods :: TransactionMethods tbl tid addr x + } + +-- | All inputs required to implement a query\/response client. +data Client err meth tid addr x = forall tbl. Client + { -- | The 'Transport' used to dispatch and receive packets. + clientNet :: Transport err addr x + -- | Methods for handling inbound packets. + , clientDispatcher :: DispatchMethods tbl err meth tid addr x + -- | Methods for reporting various conditions. + , clientErrorReporter :: ErrorReporter addr x meth tid err + -- | State necessary for routing inbound responses and assigning unique + -- /tid/ values for outgoing queries. + , clientPending :: TVar tbl + -- | An action yielding this client\'s own address. It is invoked once + -- on each outbound and inbound packet. It is valid for this to always + -- return the same value. + -- + -- The argument, if supplied, is the remote address for the transaction. + -- This can be used to maintain consistent aliases for specific peers. + , clientAddress :: Maybe addr -> IO addr + -- | Transform a query /tid/ value to an appropriate response /tid/ + -- value. Normally, this would be the identity transformation, but if + -- /tid/ includes a unique cryptographic nonce, then it should be + -- generated here. + , clientResponseId :: tid -> IO tid + } + +-- | These four parameters are required to implement an outgoing query. A +-- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that +-- might be returned by 'lookupHandler'. +data MethodSerializer tid addr x meth a b = MethodSerializer + { -- | Returns the microseconds to wait for a response to this query being + -- sent to the given address. The /addr/ may also be modified to add + -- routing information. + methodTimeout :: addr -> STM (addr,Int) + -- | A method identifier used for error reporting. This needn't be the + -- same as the /meth/ argument to 'MethodHandler', but it is suggested. + , method :: meth + -- | Serialize the outgoing query /a/ into a transmittable packet /x/. + -- The /addr/ arguments are, respectively, our own origin address and the + -- destination of the request. The /tid/ argument is useful for attaching + -- auxiliary notations on all outgoing packets. + , wrapQuery :: tid -> addr -> addr -> a -> x + -- | Parse an inbound packet /x/ into a response /b/ for this query. + , unwrapResponse :: x -> b + } + +microsecondsDiff :: Int -> POSIXTime +microsecondsDiff us = fromIntegral us / 1000000 + asyncQuery_ :: Client err meth tid addr x -> MethodSerializer tid addr x meth a b -> a @@ -276,64 +461,6 @@ sendQuery c@(Client net d err pending whoami _) meth q addr0 = do atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending return Nothing --- * Implementing a query\/response 'Client'. - --- | All inputs required to implement a query\/response client. -data Client err meth tid addr x = forall tbl. Client - { -- | The 'Transport' used to dispatch and receive packets. - clientNet :: Transport err addr x - -- | Methods for handling inbound packets. - , clientDispatcher :: DispatchMethods tbl err meth tid addr x - -- | Methods for reporting various conditions. - , clientErrorReporter :: ErrorReporter addr x meth tid err - -- | State necessary for routing inbound responses and assigning unique - -- /tid/ values for outgoing queries. - , clientPending :: TVar tbl - -- | An action yielding this client\'s own address. It is invoked once - -- on each outbound and inbound packet. It is valid for this to always - -- return the same value. - -- - -- The argument, if supplied, is the remote address for the transaction. - -- This can be used to maintain consistent aliases for specific peers. - , clientAddress :: Maybe addr -> IO addr - -- | Transform a query /tid/ value to an appropriate response /tid/ - -- value. Normally, this would be the identity transformation, but if - -- /tid/ includes a unique cryptographic nonce, then it should be - -- generated here. - , clientResponseId :: tid -> IO tid - } - --- | An incoming message can be classified into three cases. -data MessageClass err meth tid addr x - = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response - -- should include the provided /tid/ value. - | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. - | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked - -- with the source and destination address of a message. If it handles the - -- message, it should return Nothing. Otherwise, it should return a transform - -- (usually /id/) to apply before the next handler examines it. - | IsUnknown err -- ^ None of the above. - --- | Handler for an inbound query of type /x/ from an address of type _addr_. -data MethodHandler err tid addr x = forall a b. MethodHandler - { -- | Parse the query into a more specific type for this method. - methodParse :: x -> Either err a - -- | Serialize the response for transmission, given a context /ctx/ and the origin - -- and destination addresses. - , methodSerialize :: tid -> addr -> addr -> b -> x - -- | Fully typed action to perform upon the query. The remote origin - -- address of the query is provided to the handler. - , methodAction :: addr -> a -> IO b - } - -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. - | forall a. NoReply - { -- | Parse the query into a more specific type for this method. - methodParse :: x -> Either err a - -- | Fully typed action to perform upon the query. The remote origin - -- address of the query is provided to the handler. - , noreplyAction :: addr -> a -> IO () - } - contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x contramapAddr f (MethodHandler p s a) = MethodHandler @@ -343,7 +470,6 @@ contramapAddr f (MethodHandler p s a) contramapAddr f (NoReply p a) = NoReply p (\addr arg -> a (f addr) arg) - -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the -- parse is successful, the returned IO action will construct our reply if -- there is one. Otherwise, a parse err is returned. @@ -358,65 +484,6 @@ dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = dispatchQuery (NoReply unwrapQ f) tid self x addr = fmap (\a -> f addr a >> return Nothing) $ unwrapQ x --- | These four parameters are required to implement an outgoing query. A --- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that --- might be returned by 'lookupHandler'. -data MethodSerializer tid addr x meth a b = MethodSerializer - { -- | Returns the microseconds to wait for a response to this query being - -- sent to the given address. The /addr/ may also be modified to add - -- routing information. - methodTimeout :: addr -> STM (addr,Int) - -- | A method identifier used for error reporting. This needn't be the - -- same as the /meth/ argument to 'MethodHandler', but it is suggested. - , method :: meth - -- | Serialize the outgoing query /a/ into a transmittable packet /x/. - -- The /addr/ arguments are, respectively, our own origin address and the - -- destination of the request. The /tid/ argument is useful for attaching - -- auxiliary notations on all outgoing packets. - , wrapQuery :: tid -> addr -> addr -> a -> x - -- | Parse an inbound packet /x/ into a response /b/ for this query. - , unwrapResponse :: x -> b - } - - --- | To dispatch responses to our outbound queries, we require three --- primitives. See the 'transactionMethods' function to create these --- primitives out of a lookup table and a generator for transaction ids. --- --- The type variable /d/ is used to represent the current state of the --- transaction generator and the table of pending transactions. -data TransactionMethods d qid addr x = TransactionMethods - { - -- | Before a query is sent, this function stores an 'MVar' to which the - -- response will be written too. The returned /qid/ is a transaction id - -- that can be used to forget the 'MVar' if the remote peer is not - -- responding. - dispatchRegister :: POSIXTime -- time of expiry - -> (Maybe x -> IO ()) -- callback upon response (or timeout) - -> addr - -> d - -> STM (qid, d) - -- | This method is invoked when an incoming packet /x/ indicates it is - -- a response to the transaction with id /qid/. The returned IO action - -- will write the packet to the correct 'MVar' thus completing the - -- dispatch. - , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) - -- | When a timeout interval elapses, this method is called to remove the - -- transaction from the table. - , dispatchCancel :: qid -> d -> STM d - } - --- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a --- function for generating unique transaction ids. -transactionMethods :: - TableMethods t tid -- ^ Table methods to lookup values by /tid/. - -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. - -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x -transactionMethods methods generate = transactionMethods' id id methods generate - -microsecondsDiff :: Int -> POSIXTime -microsecondsDiff us = fromIntegral us / 1000000 - -- | Like 'transactionMethods' but allows extra information to be stored in the -- table of pending transactions. This also enables multiple 'Client's to -- share a single transaction table. @@ -439,69 +506,13 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr Nothing -> return ((g,t), return ()) } --- | A set of methods necessary for dispatching incoming packets. -data DispatchMethods tbl err meth tid addr x = DispatchMethods - { -- | Classify an inbound packet as a query or response. - classifyInbound :: x -> MessageClass err meth tid addr x - -- | Lookup the handler for a inbound query. - , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) - -- | Methods for handling incoming responses. - , tableMethods :: TransactionMethods tbl tid addr x - } - --- | These methods indicate what should be done upon various conditions. Write --- to a log file, make debug prints, or simply ignore them. --- --- [ /addr/ ] Address of remote peer. --- --- [ /x/ ] Incoming or outgoing packet. --- --- [ /meth/ ] Method id of incoming or outgoing request. --- --- [ /tid/ ] Transaction id for outgoing packet. --- --- [ /err/ ] Error information, typically a 'String'. -data ErrorReporter addr x meth tid err = ErrorReporter - { -- | Incoming: failed to parse packet. - reportParseError :: err -> IO () - -- | Incoming: no handler for request. - , reportMissingHandler :: meth -> addr -> x -> IO () - -- | Incoming: unable to identify request. - , reportUnknown :: addr -> x -> err -> IO () - } - -ignoreErrors :: ErrorReporter addr x meth tid err -ignoreErrors = ErrorReporter - { reportParseError = \_ -> return () - , reportMissingHandler = \_ _ _ -> return () - , reportUnknown = \_ _ _ -> return () - } - -logErrors :: ( Show addr - , Show meth - ) => ErrorReporter addr x meth tid String -logErrors = ErrorReporter - { reportParseError = \err -> dput XMisc err - , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" - , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err - } - -printErrors :: ( Show addr - , Show meth - ) => Handle -> ErrorReporter addr x meth tid String -printErrors h = ErrorReporter - { reportParseError = \err -> hPutStrLn h err - , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" - , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err - } - --- Change the /err/ type for an 'ErrorReporter'. -instance Contravariant (ErrorReporter addr x meth tid) where - -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 - contramap f (ErrorReporter pe mh unk) - = ErrorReporter (\e -> pe (f e)) - mh - (\addr x e -> unk addr x (f e)) +-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a +-- function for generating unique transaction ids. +transactionMethods :: + TableMethods t tid -- ^ Table methods to lookup values by /tid/. + -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. + -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x +transactionMethods methods generate = transactionMethods' id id methods generate -- | Handle a single inbound packet and then invoke the given continuation. -- The 'forkListener' function is implemented by passing this function to 'fix' @@ -559,14 +570,14 @@ sockAddrFamily _ = AF_CAN -- SockAddrCan constructor depre -- | Packets with an empty payload may trigger EOF exception. -- 'udpTransport' uses this function to avoid throwing in that -- case. -ignoreEOF :: Socket -> MVar () -> a -> IOError -> IO (Maybe a) +ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x) ignoreEOF sock isClosed def e = do done <- tryReadMVar isClosed case done of Just () -> do close sock dput XMisc "Closing UDP socket." - pure Nothing - _ -> if isEOFError e then pure $ Just def + pure Terminated + _ -> if isEOFError e then pure def else throwIO e -- | Hard-coded maximum packet size for incoming UDP Packets received via @@ -585,13 +596,6 @@ saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) then return () else throw e --- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The --- argument is the listen-address for incoming packets. This is a useful --- low-level 'Transport' that can be transformed for higher-level protocols --- using 'layerTransport'. -udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) -udpTransport bind_address = fst <$> udpTransport' bind_address - -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) udpTransport' bind_address = do @@ -604,8 +608,8 @@ udpTransport' bind_address = do isClosed <- newEmptyMVar let tr = Transport { awaitMessage = \kont -> do - r <- handle (ignoreEOF sock isClosed $ Right (B.empty, SockAddrInet 0 0)) $ do - Just . Right <$!> B.recvFrom sock udpBufferSize + r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do + uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize kont $! r , sendMessage = case family of AF_INET6 -> \case @@ -638,13 +642,20 @@ udpTransport' bind_address = do } return (tr, sock) +-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The +-- argument is the listen-address for incoming packets. This is a useful +-- low-level 'Transport' that can be transformed for higher-level protocols +-- using 'layerTransport'. +udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) +udpTransport bind_address = fst <$> udpTransport' bind_address + chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x chanTransport chanFromAddr self achan aclosed = Transport { awaitMessage = \kont -> do - x <- atomically $ (Just <$> readTChan achan) + x <- atomically $ (uncurry (flip Arrival) <$> readTChan achan) `orElse` - (readTVar aclosed >>= check >> return Nothing) - kont $ Right <$> x + (readTVar aclosed >>= check >> return Terminated) + kont x , sendMessage = \them bs -> do atomically $ writeTChan (chanFromAddr them) (bs,self) , closeTransport = atomically $ writeTVar aclosed True -- cgit v1.2.3