-- | This module can implement any query\/response protocol. It was written -- with Kademlia implementations in mind. {-# LANGUAGE CPP #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} module Network.QueryResponse where #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent import GHC.Conc (labelThread) #endif import Control.Concurrent.STM import Control.Exception import Control.Monad import qualified Data.ByteString as B ;import Data.ByteString (ByteString) import Data.Dependent.Map as DMap import Data.Dependent.Sum import Data.Function import Data.Functor.Contravariant import Data.Functor.Identity import Data.GADT.Show import qualified Data.IntMap.Strict as IntMap ;import Data.IntMap.Strict (IntMap) import qualified Data.Map.Strict as Map ;import Data.Map.Strict (Map) import Data.Time.Clock.POSIX import qualified Data.Word64Map as W64Map ;import Data.Word64Map (Word64Map) import Data.Word import Data.Maybe import GHC.Conc (closeFdWith) import GHC.Event import Network.Socket import Network.Socket.ByteString as B import System.Endian import System.IO import System.IO.Error import System.Timeout import DPut import DebugTag import Data.TableMethods -- | An inbound packet or condition raised while monitoring a connection. data Arrival err addr x = Terminated -- ^ Virtual message that signals EOF. | ParseError !err -- ^ A badly-formed message was recieved. | Arrival { arrivedFrom :: !addr , arrivedMsg :: !x } -- ^ Inbound message. -- | Three methods are required to implement a datagram based query\/response protocol. data TransportA err addr x y = Transport { -- | Blocks until an inbound packet is available. Then calls the provided -- continuation with the packet and origin addres or an error condition. awaitMessage :: forall a. (Arrival err addr x -> IO a) -> STM (IO a) -- | Send an /y/ packet to the given destination /addr/. , sendMessage :: addr -> y -> IO () -- | Shutdown and clean up any state related to this 'Transport'. , setActive :: Bool -> IO () } type Transport err addr x = TransportA err addr x x closeTransport :: TransportA err addr x y -> IO () closeTransport tr = setActive tr False -- | This function modifies a 'Transport' to use higher-level addresses and -- packet representations. It could be used to change UDP 'ByteString's into -- bencoded syntax trees or to add an encryption layer in which addresses have -- associated public keys. layerTransportM :: (x -> addr -> IO (Either err (x', addr'))) -- ^ Function that attempts to transform a low-level address/packet -- pair into a higher level representation. -> (y' -> addr' -> IO (y, addr)) -- ^ Function to encode a high-level address/packet into a lower level -- representation. -> TransportA err addr x y -- ^ The low-level transport to be transformed. -> TransportA err addr' x' y' layerTransportM parse encode tr = tr { awaitMessage = \kont -> awaitMessage tr $ \case Terminated -> kont $ Terminated ParseError e -> kont $ ParseError e Arrival addr x -> parse x addr >>= \case Left e -> kont $ ParseError e Right (x',addr') -> kont $ Arrival addr' x' , sendMessage = \addr' msg' -> do (msg,addr) <- encode msg' addr' sendMessage tr addr msg } -- | This function modifies a 'Transport' to use higher-level addresses and -- packet representations. It could be used to change UDP 'ByteString's into -- bencoded syntax trees or to add an encryption layer in which addresses have -- associated public keys. layerTransport :: (x -> addr -> Either err (x', addr')) -- ^ Function that attempts to transform a low-level address/packet -- pair into a higher level representation. -> (y' -> addr' -> (y, addr)) -- ^ Function to encode a high-level address/packet into a lower level -- representation. -> TransportA err addr x y -- ^ The low-level transport to be transformed. -> TransportA err addr' x' y' layerTransport parse encode tr = layerTransportM (\x addr -> return $ parse x addr) (\x' addr' -> return $ encode x' addr') tr -- | Paritions a 'Transport' into two higher-level transports. Note: A 'TChan' -- is used to share the same underlying socket, so be sure to fork a thread for -- both returned 'Transport's to avoid hanging. partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) -> ((x,xaddr) -> IO (Maybe (c,a))) -> TransportA err a b c -> IO (Transport err xaddr x, TransportA err a b c) partitionTransportM parse encodex tr = do tchan <- atomically newTChan let ytr = tr { awaitMessage = \kont -> fix $ \again -> do awaitMessage tr $ \m -> case m of Arrival adr msg -> parse (msg,adr) >>= \case Left x -> atomically (writeTChan tchan (Just x)) >> join (atomically again) Right (y,yaddr) -> kont $ Arrival yaddr y ParseError e -> kont $ ParseError e Terminated -> atomically (writeTChan tchan Nothing) >> kont Terminated , sendMessage = sendMessage tr } xtr = Transport { awaitMessage = \kont -> readTChan tchan >>= pure . kont . \case Nothing -> Terminated Just (x,xaddr) -> Arrival xaddr x , sendMessage = \addr' msg' -> do msg_addr <- encodex (msg',addr') mapM_ (uncurry . flip $ sendMessage tr) msg_addr , setActive = const $ return () } return (xtr, ytr) -- | Paritions a 'Transport' into two higher-level transports. Note: An 'TChan' -- is used to share the same underlying socket, so be sure to fork a thread for -- both returned 'Transport's to avoid hanging. partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) -> ((x,xaddr) -> Maybe (b,a)) -> Transport err a b -> IO (Transport err xaddr x, Transport err a b) partitionTransport parse encodex tr = partitionTransportM (return . parse) (return . encodex) tr partitionAndForkTransport :: (dst -> msg -> IO ()) -> ((b,a) -> IO (Either (x,xaddr) (b,a))) -> ((x,xaddr) -> IO (Maybe (Either (msg,dst) (b,a)))) -> Transport err a b -> IO (Transport err xaddr x, Transport err a b) partitionAndForkTransport forkedSend parse encodex tr = do tchan <- atomically newTChan let xtr = tr { awaitMessage = \kont -> fix $ \again -> do awaitMessage tr $ \case Arrival a b -> parse (b,a) >>= \case Left (x,xaddr) -> kont $ Arrival xaddr x Right (b,a) -> atomically (writeTChan tchan (Arrival a b)) >> join (atomically again) ParseError e -> kont $ ParseError e Terminated -> atomically (writeTChan tchan Terminated) >> kont Terminated , sendMessage = \addr' msg' -> do msg_addr <- encodex (msg',addr') case msg_addr of Just (Right (b,a)) -> sendMessage tr a b Just (Left (msg,dst)) -> forkedSend dst msg Nothing -> return () } ytr = Transport { awaitMessage = \kont -> readTChan tchan >>= pure . kont , sendMessage = sendMessage tr , setActive = \_ -> return () } return (xtr, ytr) -- | -- * f add x --> Nothing, consume x -- --> Just id, leave x to a different handler -- --> Just g, apply g to x and leave that to a different handler addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x addHandler onParseError f tr = tr { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \case Arrival addr x -> f addr x >>= maybe (join $ atomically eat) (kont . Arrival addr . ($ x)) ParseError e -> onParseError e >> kont (ParseError e) Terminated -> kont Terminated } -- | Modify a 'Transport' to invoke an action upon every received packet. onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr -- * Using a query\/response client. -- | Fork a thread that handles inbound packets. The returned action may be used -- to terminate the thread and clean up any related state. -- -- Example usage: -- -- > -- Start client. -- > quitServer <- forkListener "listener" (clientNet client) -- > -- Send a query q, recieve a response r. -- > r <- sendQuery client method q -- > -- Quit client. -- > quitServer forkListener :: String -> Transport err addr x -> IO (IO ()) forkListener name client = do setActive client True thread_id <- forkIO $ do myThreadId >>= flip labelThread ("listener."++name) fix $ \loop -> join $ atomically $ awaitMessage client $ \case Terminated -> return () _ -> loop dput XMisc $ "Listener died: " ++ name return $ do setActive client False -- killThread thread_id -- * Implementing a query\/response 'Client'. -- | These methods indicate what should be done upon various conditions. Write -- to a log file, make debug prints, or simply ignore them. -- -- [ /addr/ ] Address of remote peer. -- -- [ /x/ ] Incoming or outgoing packet. -- -- [ /meth/ ] Method id of incoming or outgoing request. -- -- [ /tid/ ] Transaction id for outgoing packet. -- -- [ /err/ ] Error information, typically a 'String'. data ErrorReporter addr x meth tid err = ErrorReporter { -- | Incoming: failed to parse packet. reportParseError :: err -> IO () -- | Incoming: no handler for request. , reportMissingHandler :: meth -> addr -> x -> IO () -- | Incoming: unable to identify request. , reportUnknown :: addr -> x -> err -> IO () } ignoreErrors :: ErrorReporter addr x meth tid err ignoreErrors = ErrorReporter { reportParseError = \_ -> return () , reportMissingHandler = \_ _ _ -> return () , reportUnknown = \_ _ _ -> return () } logErrors :: ( Show addr , Show meth ) => ErrorReporter addr x meth tid String logErrors = ErrorReporter { reportParseError = \err -> dput XMisc err , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err } printErrors :: ( Show addr , Show meth ) => Handle -> ErrorReporter addr x meth tid String printErrors h = ErrorReporter { reportParseError = \err -> hPutStrLn h err , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err } -- Change the /err/ type for an 'ErrorReporter'. instance Contravariant (ErrorReporter addr x meth tid) where -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 contramap f (ErrorReporter pe mh unk) = ErrorReporter (\e -> pe (f e)) mh (\addr x e -> unk addr x (f e)) -- | An incoming message can be classified into three cases. data MessageClass err meth tid addr x = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response -- should include the provided /tid/ value. | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked -- with the source and destination address of a message. If it handles the -- message, it should return Nothing. Otherwise, it should return a transform -- (usually /id/) to apply before the next handler examines it. | IsUnknown err -- ^ None of the above. -- | Handler for an inbound query of type /x/ from an address of type _addr_. data MethodHandler err tid addr x = forall a b. MethodHandler { -- | Parse the query into a more specific type for this method. methodParse :: x -> Either err a -- | Serialize the response for transmission, given a context /ctx/ and the origin -- and destination addresses. , methodSerialize :: tid -> addr -> addr -> b -> x -- | Fully typed action to perform upon the query. The remote origin -- address of the query is provided to the handler. , methodAction :: addr -> a -> IO b } -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | forall a. NoReply { -- | Parse the query into a more specific type for this method. methodParse :: x -> Either err a -- | Fully typed action to perform upon the query. The remote origin -- address of the query is provided to the handler. , noreplyAction :: addr -> a -> IO () } -- | To dispatch responses to our outbound queries, we require three -- primitives. See the 'transactionMethods' function to create these -- primitives out of a lookup table and a generator for transaction ids. -- -- The type variable /d/ is used to represent the current state of the -- transaction generator and the table of pending transactions. data TransactionMethods d qid addr x = TransactionMethods { -- | Before a query is sent, this function stores an 'MVar' to which the -- response will be written too. The returned /qid/ is a transaction id -- that can be used to forget the 'MVar' if the remote peer is not -- responding. dispatchRegister :: POSIXTime -- time of expiry -> (Maybe x -> IO ()) -- callback upon response (or timeout) -> addr -> d -> STM (qid, d) -- | This method is invoked when an incoming packet /x/ indicates it is -- a response to the transaction with id /qid/. The returned IO action -- will write the packet to the correct 'MVar' thus completing the -- dispatch. , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) -- | When a timeout interval elapses, this method is called to remove the -- transaction from the table. , dispatchCancel :: qid -> d -> STM d } -- | A set of methods necessary for dispatching incoming packets. data DispatchMethods tbl err meth tid addr x = DispatchMethods { -- | Classify an inbound packet as a query or response. classifyInbound :: x -> MessageClass err meth tid addr x -- | Lookup the handler for a inbound query. , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) -- | Methods for handling incoming responses. , tableMethods :: TransactionMethods tbl tid addr x } -- | All inputs required to implement a query\/response client. data Client err meth tid addr x = forall tbl. Client { -- | The 'Transport' used to dispatch and receive packets. clientNet :: Transport err addr x -- | Methods for handling inbound packets. , clientDispatcher :: DispatchMethods tbl err meth tid addr x -- | Methods for reporting various conditions. , clientErrorReporter :: ErrorReporter addr x meth tid err -- | State necessary for routing inbound responses and assigning unique -- /tid/ values for outgoing queries. , clientPending :: TVar tbl -- | An action yielding this client\'s own address. It is invoked once -- on each outbound and inbound packet. It is valid for this to always -- return the same value. -- -- The argument, if supplied, is the remote address for the transaction. -- This can be used to maintain consistent aliases for specific peers. , clientAddress :: Maybe addr -> IO addr -- | Transform a query /tid/ value to an appropriate response /tid/ -- value. Normally, this would be the identity transformation, but if -- /tid/ includes a unique cryptographic nonce, then it should be -- generated here. , clientResponseId :: tid -> IO tid } -- | These four parameters are required to implement an outgoing query. A -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that -- might be returned by 'lookupHandler'. data MethodSerializer tid addr x meth a b = MethodSerializer { -- | Returns the microseconds to wait for a response to this query being -- sent to the given address. The /addr/ may also be modified to add -- routing information. methodTimeout :: addr -> STM (addr,Int) -- | A method identifier used for error reporting. This needn't be the -- same as the /meth/ argument to 'MethodHandler', but it is suggested. , method :: meth -- | Serialize the outgoing query /a/ into a transmittable packet /x/. -- The /addr/ arguments are, respectively, our own origin address and the -- destination of the request. The /tid/ argument is useful for attaching -- auxiliary notations on all outgoing packets. , wrapQuery :: tid -> addr -> addr -> a -> x -- | Parse an inbound packet /x/ into a response /b/ for this query. , unwrapResponse :: x -> b } microsecondsDiff :: Int -> POSIXTime microsecondsDiff us = fromIntegral us / 1000000 asyncQuery_ :: Client err meth tid addr x -> MethodSerializer tid addr x meth a b -> a -> addr -> (Maybe b -> IO ()) -> IO (tid,POSIXTime,Int) asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do now <- getPOSIXTime (tid,addr,expiry) <- atomically $ do tbl <- readTVar pending (addr,expiry) <- methodTimeout meth addr0 (tid, tbl') <- dispatchRegister (tableMethods d) (now + microsecondsDiff expiry) (withResponse . fmap (unwrapResponse meth)) addr -- XXX: Should be addr0 or addr? tbl -- (addr,expiry) <- methodTimeout meth tid addr0 writeTVar pending tbl' return (tid,addr,expiry) self <- whoami (Just addr) mres <- do sendMessage net addr (wrapQuery meth tid self addr q) return $ Just () `catchIOError` (\e -> return Nothing) return (tid,now,expiry) asyncQuery :: Show meth => Client err meth tid addr x -> MethodSerializer tid addr x meth a b -> a -> addr -> (Maybe b -> IO ()) -> IO () asyncQuery client meth q addr withResponse0 = do tm <- getSystemTimerManager tidvar <- newEmptyMVar timedout <- registerTimeout tm 1000000 $ do dput XMisc $ "async TIMEDOUT " ++ show (method meth) withResponse0 Nothing tid <- takeMVar tidvar dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) case client of Client { clientDispatcher = d, clientPending = pending } -> do atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do unregisterTimeout tm timedout withResponse0 x putMVar tidvar tid updateTimeout tm timedout expiry dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry -- | Send a query to a remote peer. Note that this function will always time -- out if 'forkListener' was never invoked to spawn a thread to receive and -- dispatch the response. sendQuery :: forall err a b tbl x meth tid addr. Client err meth tid addr x -- ^ A query/response implementation. -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. -> a -- ^ The outbound query. -> addr -- ^ Destination address of query. -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. sendQuery c@(Client net d err pending whoami _) meth q addr0 = do mvar <- newEmptyMVar (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) mres <- timeout expiry $ takeMVar mvar case mres of Just b -> return $ Just b Nothing -> do atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending return Nothing contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x contramapAddr f (MethodHandler p s a) = MethodHandler p (\tid src dst result -> s tid (f src) (f dst) result) (\addr arg -> a (f addr) arg) contramapAddr f (NoReply p a) = NoReply p (\addr arg -> a (f addr) arg) -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the -- parse is successful, the returned IO action will construct our reply if -- there is one. Otherwise, a parse err is returned. dispatchQuery :: MethodHandler err tid addr x -- ^ Handler to invoke. -> tid -- ^ The transaction id for this query\/response session. -> addr -- ^ Our own address, to which the query was sent. -> x -- ^ The query packet. -> addr -- ^ The origin address of the query. -> Either err (IO (Maybe x)) dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = fmap (\a -> Just . wrapR tid self addr <$> f addr a) $ unwrapQ x dispatchQuery (NoReply unwrapQ f) tid self x addr = fmap (\a -> f addr a >> return Nothing) $ unwrapQ x -- | Like 'transactionMethods' but allows extra information to be stored in the -- table of pending transactions. This also enables multiple 'Client's to -- share a single transaction table. transactionMethods' :: ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. -> TransactionMethods (g,t a) tid addr x transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods { dispatchCancel = \tid (g,t) -> return (g, delete tid t) , dispatchRegister = \nowPlusExpiry v a (g,t) -> do let (tid,g') = generate g let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t return ( tid, (g',t') ) , dispatchResponse = \tid x (g,t) -> case lookup tid t of Just v -> let t' = delete tid t in return ((g,t'),void $ load v $ Just x) Nothing -> return ((g,t), return ()) } -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a -- function for generating unique transaction ids. transactionMethods :: TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x transactionMethods methods generate = transactionMethods' id id methods generate -- | Handle a single inbound packet and then invoke the given continuation. -- The 'forkListener' function is implemented by passing this function to 'fix' -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or -- throws an exception. handleMessage :: Client err meth tid addr x -> addr -> x -> IO (Maybe (x -> x)) handleMessage (Client net d err pending whoami responseID) addr plain = do -- Just (Left e) -> do reportParseError err e -- return $! Just id -- Just (Right (plain, addr)) -> do case classifyInbound d plain of IsQuery meth tid -> case lookupHandler d meth of Nothing -> do reportMissingHandler err meth addr plain return $! Just id Just m -> do self <- whoami (Just addr) tid' <- responseID tid either (\e -> do reportParseError err e return $! Just id) (>>= \m -> do mapM_ (sendMessage net addr) m return $! Nothing) (dispatchQuery m tid' self plain addr) IsUnsolicited action -> do self <- whoami (Just addr) action self addr return Nothing IsResponse tid -> do action <- atomically $ do ts0 <- readTVar pending (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 writeTVar pending ts return action action return $! Nothing IsUnknown e -> do reportUnknown err addr plain e return $! Just id -- Nothing -> return $! id -- * UDP Datagrams. -- | Access the address family of a given 'SockAddr'. This convenient accessor -- is missing from 'Network.Socket', so I implemented it here. sockAddrFamily :: SockAddr -> Family sockAddrFamily (SockAddrInet _ _ ) = AF_INET sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 sockAddrFamily (SockAddrUnix _ ) = AF_UNIX #if !MIN_VERSION_network(3,0,0) sockAddrFamily _ = AF_CAN -- SockAddrCan constructor deprecated #endif -- | Packets with an empty payload may trigger EOF exception. -- 'udpTransport' uses this function to avoid throwing in that -- case. ignoreEOF :: Socket -> MVar () -> Arrival e a x -> IOError -> IO (Arrival e a x) ignoreEOF sock isClosed def e = do done <- tryReadMVar isClosed case done of Just () -> do close sock dput XMisc "Closing UDP socket." pure Terminated _ -> if isEOFError e then pure def else throwIO e -- | Hard-coded maximum packet size for incoming UDP Packets received via -- 'udpTransport'. udpBufferSize :: Int udpBufferSize = 65536 -- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError. saferSendTo :: Socket -> ByteString -> SockAddr -> IO () saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) `catch` \e -> -- sendTo: does not exist (Network is unreachable) -- Occurs when IPv6 or IPv4 network is not available. -- Currently, we require -threaded to prevent a forever-hang in this case. if isDoesNotExistError e then return () else throw e -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). udpTransport' :: Show err => SockAddr -> IO (Transport err SockAddr ByteString, Socket) udpTransport' bind_address = do let family = sockAddrFamily bind_address sock <- socket family Datagram defaultProtocol when (family == AF_INET6) $ do setSocketOption sock IPv6Only 0 setSocketOption sock Broadcast 1 bind sock bind_address isClosed <- newEmptyMVar udpTChan <- atomically newTChan let tr = Transport { awaitMessage = \kont -> do r <- readTChan udpTChan return $ kont $! r , sendMessage = case family of AF_INET6 -> \case (SockAddrInet port addr) -> \bs -> -- Change IPv4 to 4mapped6 address. saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0 addr6 -> \bs -> saferSendTo sock bs addr6 AF_INET -> \case (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do let host4 = toBE32 raw4 -- Change 4mapped6 to ordinary IPv4. -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4) saferSendTo sock bs (SockAddrInet port host4) addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) addr4 -> \bs -> saferSendTo sock bs addr4 _ -> \addr bs -> saferSendTo sock bs addr , setActive = \case False -> do dput XMisc $ "closeTransport for udpTransport' called. " ++ show bind_address tryPutMVar isClosed () -- signal awaitMessage that the transport is closed. #if MIN_VERSION_network (3,1,0) #elif MIN_VERSION_network(3,0,0) let withFdSocket sock f = fdSocket sock >>= f >>= seq sock . return #else let withFdSocket sock f = f (fdSocket sock) >>= seq sock . return #endif withFdSocket sock $ \fd -> do let sorryGHCButIAmNotFuckingClosingTheSocketYet fd = return () -- This call is necessary to interrupt the blocking recvFrom call in awaitMessage. closeFdWith sorryGHCButIAmNotFuckingClosingTheSocketYet (fromIntegral fd) True -> do udpThread <- forkIO $ fix $ \again -> do r <- handle (ignoreEOF sock isClosed $ Arrival (SockAddrInet 0 0) B.empty) $ do uncurry (flip Arrival) <$!> B.recvFrom sock udpBufferSize atomically $ writeTChan udpTChan r case r of Terminated -> return () _ -> again labelThread udpThread ("udp.io."++show bind_address) } return (tr, sock) -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The -- argument is the listen-address for incoming packets. This is a useful -- low-level 'Transport' that can be transformed for higher-level protocols -- using 'layerTransport'. udpTransport :: Show err => SockAddr -> IO (Transport err SockAddr ByteString) udpTransport bind_address = fst <$> udpTransport' bind_address chanTransport :: (addr -> TChan (x, addr)) -> addr -> TChan (x, addr) -> TVar Bool -> Transport err addr x chanTransport chanFromAddr self achan aclosed = Transport { awaitMessage = \kont -> do x <- (uncurry (flip Arrival) <$> readTChan achan) `orElse` (readTVar aclosed >>= check >> return Terminated) return $ kont x , sendMessage = \them bs -> do atomically $ writeTChan (chanFromAddr them) (bs,self) , setActive = \case False -> atomically $ writeTVar aclosed True True -> return () } -- | Returns a pair of transports linked together to simulate two computers talking to each other. testPairTransport :: IO (Transport err SockAddr ByteString, Transport err SockAddr ByteString) testPairTransport = do achan <- atomically newTChan bchan <- atomically newTChan aclosed <- atomically $ newTVar False bclosed <- atomically $ newTVar False let a = SockAddrInet 1 1 b = SockAddrInet 2 2 return ( chanTransport (const bchan) a achan aclosed , chanTransport (const achan) b bchan bclosed ) newtype ByAddress err x addr = ByAddress (Transport err addr x) newtype Tagged x addr = Tagged x decorateAddr :: tag addr -> Arrival e addr x -> Arrival e (DSum tag Identity) x decorateAddr tag Terminated = Terminated decorateAddr tag (ParseError e) = ParseError e decorateAddr tag (Arrival addr x) = Arrival (tag ==> addr) x mergeTransports :: GCompare tag => DMap tag (ByAddress err x) -> IO (Transport err (DSum tag Identity) x) mergeTransports tmap = do -- vmap <- traverseWithKey (\k v -> Tagged <$> newEmptyMVar) tmap -- foldrWithKey (\k v n -> forkMergeBranch k v >> n) (return ()) vmap return Transport { awaitMessage = \kont -> foldrWithKey (\k (ByAddress tr) n -> awaitMessage tr (kont . decorateAddr k) `orElse` n) retry tmap , sendMessage = \(tag :=> Identity addr) x -> case DMap.lookup tag tmap of Just (ByAddress tr) -> sendMessage tr addr x Nothing -> return () , setActive = \toggle -> foldrWithKey (\_ (ByAddress tr) next -> setActive tr toggle >> next) (return ()) tmap }